Handler是什么
在Android中表示一种消息处理机制或者叫消息处理方法,用来循环处理应用程序主线程各种消息,比如UI的更新,按键、触摸消息事件等。
为什么Android要用Handler机制
Android应用程序启动时,系统会创建一个主线程,负责与UI组件(widget、view)进行交互,比如控制UI界面界面显示、更新等;分发事件给UI界面处理,比如按键事件、触摸事件、屏幕绘图事件等,因此,Android主线程也称为UI线程。
由此可知,UI线程只能处理一些简单的、短暂的操作,如果要执行繁重的任务或者耗时很长的操作,比如访问网络、数据库、下载等,这种单线程模型会导致线程运行性能大大降低,甚至阻塞UI线程,如果被阻塞超过5秒,系统会提示应用程序无相应对话框,缩写为ANR,导致退出整个应用程序或者短暂杀死应用程序。
除此之外,单线程模型的UI主线程也是不安全的,会造成不可确定的结果。线程不安全简单理解为:多线程访问资源时,有可能出现多个线程先后更改数据造成数据不一致。比如,A工作线程(也称为子线程)访问某个公共UI资源,B工作线程在某个时候也访问了该公共资源,当B线程正访问时,公共资源的属性已经被A改变了,这样B得到的结果不是所需要的的,造成了数据不一致的混乱情况。
线程安全简单理解为:当一个线程访问功能资源时,对该资源进程了保护,比如加了锁机制,当前线程在没有访问结束释放锁之前,其他线程只能等待直到释放锁才能访问,这样的线程就是安全的。
基于以上原因,Android的单线程模型必须遵守两个规则:
1. 不要阻塞UI线程;
2. 不要在UI线程之外访问UI组件,即不能在子线程访问UI组件,只能在UI线程访问。
因此,Android系统将大部分耗时、繁重任务交给子线程完成,不会在主线程中完成,解决了第一个难题;同时,Android只允许主线程更新UI界面,子线程处理后的结果无法和主线程交互,即无法直接访问主线程,这就要用到Handler机制来解决此问题。基于Handler机制,在子线程先获得Handler对象,该对象将数据发送到主线程消息队列,主线程通过Loop循环获取消息交给Handler处理。
案例用法
先给出2个案例,大概看看Handler如何工作的,再给出源码分析。下面两个案例都实现一个定时器,每隔一秒显示数字0-9
案例1:sendMenssage方法更新UI界面
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | packagecom.example.testhandler; importandroid.app.Activity; importandroid.os.Bundle; importandroid.os.Handler; importandroid.os.Message; importandroid.view.View; importandroid.view.View.OnClickListener; importandroid.widget.Button; importandroid.widget.TextView; publicclassTestHandlerActivityextendsActivity{ protectedstaticfinalStringTAG="TestHandlerActivity"; privateButton sendButton; privateTextView displayText; Handler mHandler=newHandler(){ @Override publicvoidhandleMessage(Message msg){ // TODO Auto-generated method stub if(msg.what==99){ inti=msg.getData().getInt("displayKey"); displayText.setText(i+""); } super.handleMessage(msg); } }; privateButton exitButton; @Override protectedvoidonCreate(Bundle savedInstanceState){ super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); initViews(); } privatevoidinitViews(){ displayText=(TextView)findViewById(R.id.display_text); sendButton=(Button)findViewById(R.id.send_button); sendButton.setOnClickListener(buttonOnClickListener); exitButton=(Button)findViewById(R.id.exit_button); exitButton.setOnClickListener(buttonOnClickListener); } OnClickListener buttonOnClickListener=newOnClickListener(){ @Override publicvoidonClick(Viewv){ // TODO Auto-generated method stub if(v.getId()==R.id.send_button){ newThread(newWorkRunnable()).start(); }elseif(v.getId()==R.id.exit_button){ finish(); } } }; classWorkRunnableimplementsRunnable{ @Override publicvoidrun(){ // TODO Auto-generated method stub inti=0; while(i<10){ Message msg=mHandler.obtainMessage(99); Bundle bundle=newBundle(); bundle.putInt("displayKey",i); msg.setData(bundle); mHandler.sendMessage(msg); try{ Thread.sleep(1000); }catch(InterruptedExceptione){ // TODO Auto-generated catch block e.printStackTrace(); } i++; } } } @Override protectedvoidonDestroy(){ // TODO Auto-generated method stub if(mHandler!=null){ if(mHandler.hasMessages(99)){ mHandler.removeMessages(99); } mHandler=null; } super.onDestroy(); } } |
WorkRunnable的run方法中进行了耗时操作,要把结果反馈给UI,需要Handler发送消息给UI线程,在Handler的handleMessage对消息进行处理
案例1下载地址:http://yunpan.cn/cLXSTdc8c5gTs 访问密码 e67b
案例2: postDelayed更新UI
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | packagecom.example.postdelaydemo; importandroid.app.Activity; importandroid.os.Bundle; importandroid.os.Handler; importandroid.util.Log; importandroid.view.View; importandroid.view.View.OnClickListener; importandroid.widget.Button; importandroid.widget.TextView; publicclassPostDelayActivityextendsActivity{ protectedstaticfinalStringTAG="PostDelayActivity"; privateButton sendButton; privateTextView displayText; privateButton exitButton; Handler mHandler=newHandler(); @Override protectedvoidonCreate(Bundle savedInstanceState){ super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); initViews(); } privatevoidinitViews(){ displayText=(TextView)findViewById(R.id.display_text); sendButton=(Button)findViewById(R.id.send_button); sendButton.setOnClickListener(buttonOnClickListener); exitButton=(Button)findViewById(R.id.exit_button); exitButton.setOnClickListener(buttonOnClickListener); } OnClickListener buttonOnClickListener=newOnClickListener(){ @Override publicvoidonClick(Viewv){ // TODO Auto-generated method stub if(v.getId()==R.id.send_button){ mHandler.postDelayed(newPostDelayRunnable(),1000); }elseif(v.getId()==R.id.exit_button){ finish(); } } }; classPostDelayRunnableimplementsRunnable{ inti=0; @Override publicvoidrun(){ // TODO Auto-generated method stub if(i>=10){ mHandler.removeCallbacks(this); }else{ displayText.setText(getResources().getString( R.string.display_label) +i); mHandler.postDelayed(this,1000); } i++; } } @Override protectedvoidonDestroy(){ // TODO Auto-generated method stub if(mHandler!=null){ mHandler=null; } super.onDestroy(); } } |
postDelayed方法把runnable对象作为消息放到队列中等待执行,run方法中就是具体执行过程。
案例2下载地址:http://yunpan.cn/cLDacQBStiRdx 访问密码 58e6
Hanlder机制的架构
msg.target是Handler的一个对象引用,handler对象发送消息暂存到消息队列,Looper取出消息分发给相应的handler处理。
Handler源码浅析及其原理
为了描述清楚,借用本文最普通的案例1,从Handler创建、发送、处理三个方面来分析,分析过程中会涉及到消息队列、Looper等。
1. Handler创建
new Handler()时会调用Handler的构造函数
| publicHandler(){ this(null,false); } publicHandler(Callback callback,booleanasync){ mLooper=Looper.myLooper(); if(mLooper==null){ thrownewRuntimeException("Can't create handler inside thread that has not called Looper.prepare()"); } mQueue=mLooper.mQueue; mCallback=callback; mAsynchronous=async; } |
myLooper方法返回当前线程的Looper对象赋给mLooper,此处当前对象线程就是主线程,不为空;如果是工作线程,就为空。mQueue为mLooper对应的消息队列,Handler构造方法进行了简单的初始化,没有多余的额外工作。
虽然构造方法及其简单,但有两个问题需要弄明白:1. 当前线程的Looper对象mLooper、MessagQueue对象mQueue是什么时候创建的?2. 在创建过程中还做了哪些重要的事情?
要回到这个问题,得从主线程ActivityThread对象着手。在这篇文章:启动Activity的流程(Launcher中点击图标启动) 的过程14、15中,首次启动Activity时通过Process.start创建应用层程序的主线程,创建成功后进入到主线程ActivityThread的main方法中开始执行,main方法有这句:
| Looper.prepareMainLooper(); ActivityThread thread=newActivityThread(); Looper.loop(); |
prepareMainLooper方法创建了Looper对象,在创建创建Looper对象mLooper的同时也创建了消息队列MessageQueue对象mQueue,这就回答了第一个问题;
new ActivityThread()创建了主线程对象,在变量声明开头也通过new H()创建了Handler对象;loop()方法循环取出消息队列中的消息交给Handler处理。这回到了第二个问题。不过,这样回答太过于简单,还需要详细研究源码。
相关源码路径:
| frameworks\base\core\java\android\app\ActivityThread.java frameworks\base\core\java\android\os\Handler.java frameworks\base\core\java\androidos\Looper.java frameworks\base\core\java\android\os\MessageQueue.java frameworks\base\core\jni\android_os_MessageQueue.cpp system\core\libutils\Looper.cpp bionic\libc\include\sys\Epoll.h |
从prepareMainLooper方法开始,Looper.prepareMainLooper() —-> prepare(false) —-> sThreadLocal.set(new Looper(quitAllowed)) —-> Looper对象的构造方法
1.1 Looper类的构造方法
| privateLooper(booleanquitAllowed){ mQueue=newMessageQueue(quitAllowed); mThread=Thread.currentThread(); } |
创建Looper对象时,在其构造方法中也new了一个MessageQueue对象,参数quitAllowed传递过来为false,currentThread返回当前线程对象赋给mThread变量
| MessageQueue(booleanquitAllowed){ mQuitAllowed=quitAllowed; mPtr=nativeInit(); } |
nativeInit是本地方法,其本地实现在C++层如下所示
1.2 nativeInit的本地实现
| staticjlong android_os_MessageQueue_nativeInit(JNIEnv*env,jclass clazz){ NativeMessageQueue*nativeMessageQueue=newNativeMessageQueue(); if(!nativeMessageQueue){ jniThrowRuntimeException(env,"Unable to allocate native queue"); return0; } nativeMessageQueue->incStrong(env); returnreinterpret_cast<jlong>(nativeMessageQueue); } |
此方法名由java层类的包名+类名+方法名组成,这不是标准,是习惯写法,也可以采用其他名称组合,具体是什么名称由JNINativeMethod方法中Java对象与c++对象的映射决定,此处是JNI方面的内容,不作过多解释。
第一句创建了c++层的本地消息队列NativeMessageQueue对象,如果不为空,调用reinterpret_cast把对象指针转换为jlong类型返回给java层,到Java层就转换成了long类型,返回一个long类型的指针给java层并赋给mPtr对象变量,于是,java层就得到了c++层NativeMessageQueue对象的句柄或指针,java层获得该对象是有用的,参考2.4节的nativeWake(mPtr)
NativeMessageQueue类的构造方法:
| NativeMessageQueue::NativeMessageQueue():mInCallback(false),mExceptionObj(NULL){ mLooper=Looper::getForThread(); if(mLooper==NULL){ mLooper=newLooper(false); Looper::setForThread(mLooper); } } |
在NativeMessageQueue的构造方法中也创建了c++层Looper对象,java层和c++层都有相应的Looper、MessageQueue对象。Looper对象的构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | Looper::Looper(boolallowNonCallbacks): mAllowNonCallbacks(allowNonCallbacks),mSendingMessage(false), mResponseIndex(0),mNextMessageUptime(LLONG_MAX){ intwakeFds[2]; intresult=pipe(wakeFds); LOG_ALWAYS_FATAL_IF(result!=0,"Could not create wake pipe. errno=%d",errno); mWakeReadPipeFd=wakeFds[0]; mWakeWritePipeFd=wakeFds[1]; result=fcntl(mWakeReadPipeFd,F_SETFL,O_NONBLOCK); LOG_ALWAYS_FATAL_IF(result!=0,"Could not make wake read pipe non-blocking. errno=%d", errno); result=fcntl(mWakeWritePipeFd,F_SETFL,O_NONBLOCK); LOG_ALWAYS_FATAL_IF(result!=0,"Could not make wake write pipe non-blocking. errno=%d", errno); mIdling=false; // Allocate the epoll instance and register the wake pipe. mEpollFd=epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd<0,"Could not create epoll instance. errno=%d",errno); structepoll_event eventItem; memset(&eventItem,0,sizeof(epoll_event));// zero out unused members of data field union eventItem.events=EPOLLIN; eventItem.data.fd=mWakeReadPipeFd; result=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,mWakeReadPipeFd,&eventItem); LOG_ALWAYS_FATAL_IF(result!=0,"Could not add wake read pipe to epoll instance. errno=%d", errno); } |
pipe系统调用创建了一个管道,wakeFds[0]是管道的读取端,赋值给mWakeReadPipeFd,wakeFds[1]是管道的写入端,赋值给mWakeWritePipeFd;如果管道创建成功,返回为0,否则为-1
fcntl系统调用将已打开文件的状态标志修改为O_NONBLOCK,表示如果open打开文件成功,后续对文件的I/O操作不会阻塞,如果open打开文件失败,也不会陷入阻塞状态,只会返回错误。F_SETFL命令用来设置打开文件的状态,获取文件的状态用F_GETFL命令
创建好管道后,系统采用了Linux中的epoll机制监控就绪列表中的文件描述符,当某文件描述符上发生I/O事件时,epoll机制会通知应用程序处理该事件。采用epoll机制的优点之一是在面对大批量文件描述符时也能表现优越的性能。
epoll机制有三个系统调用构成:epoll_create、epoll_ctl、epoll_wait。
| mEpollFd=epoll_create(EPOLL_SIZE_HINT); |
epoll_create创建了一个epoll对象,参数EPOLL_SIZE_HINT表示该epoll对象监控的文件描述符个数,如果成功创建epoll对象,返回一个文件描述符赋给mEpollFd,这个描述符就表示新创建的epoll对象,该对象在epoll_ctl、epoll_wait都会用到;如果创建失败,返回-1
| result=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,mWakeReadPipeFd,&eventItem); |
epoll_ctl用来注册文件描述符mWakeReadPipeFd上的I/O事件,第一个参数mEpollFd是epoll对象,由epoll_create创建;第二个参数表示动作,可以把文件描述符增加到epoll对象的兴趣列表中,也可以从兴趣列表中删除、修改文件描述符,EPOLL_CTL_ADD的意思就是增加,也就是注册的意思,把文件描述符注册到epoll对象的兴趣列表中;第三个参数就是被注册的文件描述符,此处是注册管道描述符的读取端mWakeReadPipeFd;第四个参数表示待监听的I/O事件及相关信息。
| structepoll_event eventItem; memset(&eventItem,0,sizeof(epoll_event));// zero out unused members of data field union eventItem.events=EPOLLIN; eventItem.data.fd=mWakeReadPipeFd; |
| typedefunionepoll_data{ void*ptr; intfd; uint32_t u32; uint64_t u64; }epoll_data_t; structepoll_event{ uint32_t events; epoll_data_t data; } |
epoll_event结构体中,events表示待监听事件,EPOLLIN表示文件描述符可读数据;data是epoll_data_t联合体,表示回传给调用者进程的信息,fd就是待注册文件描述符。创建完了一系列对象后,开始进入到Looper对象的loop方法:
1.3 Looper.Java的loop方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | publicstaticvoidloop(){ finalLooper me=myLooper(); if(me==null){ thrownewRuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } finalMessageQueue queue=me.mQueue; // Make sure the identity of this thread is that of the local process, // and keep track of what that identity token actually is. Binder.clearCallingIdentity(); finallongident=Binder.clearCallingIdentity(); for(;;){ Message msg=queue.next();// might block if(msg==null){ // No message indicates that the message queue is quitting. return; } // This must be in a local variable, in case a UI event sets the logger Printer logging=me.mLogging; if(logging!=null){ logging.println(">>>>> Dispatching to "+msg.target+" "+ msg.callback+": "+msg.what); } msg.target.dispatchMessage(msg); if(logging!=null){ logging.println("<<<<< Finished to "+msg.target+" "+msg.callback); } // Make sure that during the course of dispatching the // identity of the thread wasn't corrupted. finallongnewIdent=Binder.clearCallingIdentity(); if(ident!=newIdent){ Log.wtf(TAG,"Thread identity changed from 0x" +Long.toHexString(ident)+" to 0x" +Long.toHexString(newIdent)+" while dispatching to " +msg.target.getClass().getName()+" " +msg.callback+" what="+msg.what); } msg.recycleUnchecked(); } } |
for循环中调用消息队列的next方法获取消息保存到msg变量中,next方法:
1.4 MessageQueue.java的next方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | Message next(){ // Return here if the message loop has already quit and been disposed. // This can happen if the application tries to restart a looper after quit // which is not supported. finallongptr=mPtr; if(ptr==0){ returnnull; } intpendingIdleHandlerCount=-1;// -1 only during first iteration intnextPollTimeoutMillis=0; for(;;){ if(nextPollTimeoutMillis!=0){ Binder.flushPendingCommands(); } nativePollOnce(ptr,nextPollTimeoutMillis); synchronized(this){ // Try to retrieve the next message. Return if found. finallongnow=SystemClock.uptimeMillis(); Message prevMsg=null; Message msg=mMessages; if(msg!=null&&msg.target==null){ // Stalled by a barrier. Find the next asynchronous message in the queue. do{ prevMsg=msg; msg=msg.next; }while(msg!=null&&!msg.isAsynchronous()); } if(msg!=null){ if(now<msg.when){ // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis=(int)Math.min(msg.when-now,Integer.MAX_VALUE); }else{ // Got a message. mBlocked=false; if(prevMsg!=null){ prevMsg.next=msg.next; }else{ mMessages=msg.next; } msg.next=null; if(false)Log.v("MessageQueue","Returning message: "+msg); returnmsg; } }else{ // No more messages. nextPollTimeoutMillis=-1; } // Process the quit message now that all pending messages have been handled. if(mQuitting){ dispose(); returnnull; } // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if(pendingIdleHandlerCount<0 &&(mMessages==null||now<mMessages.when)){ pendingIdleHandlerCount=mIdleHandlers.size(); } if(pendingIdleHandlerCount<=0){ // No idle handlers to run. Loop and wait some more. mBlocked=true; continue; } if(mPendingIdleHandlers==null){ mPendingIdleHandlers=newIdleHandler[Math.max(pendingIdleHandlerCount,4)]; } mPendingIdleHandlers=mIdleHandlers.toArray(mPendingIdleHandlers); } // Run the idle handlers. // We only ever reach this code block during the first iteration. for(inti=0;i<pendingIdleHandlerCount;i++){ finalIdleHandler idler=mPendingIdleHandlers[i]; mPendingIdleHandlers[i]=null;// release the reference to the handler booleankeep=false; try{ keep=idler.queueIdle(); }catch(Throwablet){ Log.wtf("MessageQueue","IdleHandler threw exception",t); } if(!keep){ synchronized(this){ mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount=0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis=0; } } |
mPtr代表c++层NativeMessageQueue对象,赋给ptr;在for无限循环中,先执行:
| nativePollOnce(ptr,nextPollTimeoutMillis); |
nativePollOnce本地方法的调用过程是:nativePollOnce —-> android_os_MessageQueue_nativePollOnce —-> nativeMessageQueue->pollOnce(env, timeoutMillis) —-> mLooper->pollOnce(timeoutMillis)
中间过程较为简单不作分析,直接看mLooper对象的pollOnce方法,mLooper是c++层Looper对象,pollOnce方法源码:
1.5 c++层Looper对象的pollOnce方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | intLooper::pollOnce(inttimeoutMillis,int*outFd,int*outEvents,void**outData){ intresult=0; for(;;){ while(mResponseIndex<mResponses.size()){ constResponse&response=mResponses.itemAt(mResponseIndex++); intident=response.request.ident; if(ident>=0){ intfd=response.request.fd; intevents=response.events; void*data=response.request.data; #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this,ident,fd,events,data); #endif if(outFd!=NULL)*outFd=fd; if(outEvents!=NULL)*outEvents=events; if(outData!=NULL)*outData=data; returnident; } } if(result!=0){ #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d",this,result); #endif if(outFd!=NULL)*outFd=0; if(outEvents!=NULL)*outEvents=0; if(outData!=NULL)*outData=NULL; returnresult; } result=pollInner(timeoutMillis); } } |
timeoutMillis传递过来为0,其他三个参数在Looper.h中初始化为null
继续执行到pollInner方法:
1.6 c++层Looper对象的pollInner方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | intLooper::pollInner(inttimeoutMillis){ #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d",this,timeoutMillis); #endif // Adjust the timeout based on when the next message is due. if(timeoutMillis!=0&&mNextMessageUptime!=LLONG_MAX){ nsecs_t now=systemTime(SYSTEM_TIME_MONOTONIC); intmessageTimeoutMillis=toMillisecondTimeoutDelay(now,mNextMessageUptime); if(messageTimeoutMillis>=0 &&(timeoutMillis<0||messageTimeoutMillis<timeoutMillis)){ timeoutMillis=messageTimeoutMillis; } #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d", this,mNextMessageUptime-now,timeoutMillis); #endif } // Poll. intresult=POLL_WAKE; mResponses.clear(); mResponseIndex=0; // We are about to idle. mIdling=true; structepoll_event eventItems[EPOLL_MAX_EVENTS]; inteventCount=epoll_wait(mEpollFd,eventItems,EPOLL_MAX_EVENTS,timeoutMillis); // No longer idling. mIdling=false; // Acquire lock. mLock.lock(); // Check for poll error. if(eventCount<0){ if(errno==EINTR){ gotoDone; } ALOGW("Poll failed with an unexpected error, errno=%d",errno); result=POLL_ERROR; gotoDone; } // Check for poll timeout. if(eventCount==0){ #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout",this); #endif result=POLL_TIMEOUT; gotoDone; } // Handle all events. #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - handling events from %d fds",this,eventCount); #endif for(inti=0;i<eventCount;i++){ intfd=eventItems[i].data.fd; uint32_t epollEvents=eventItems[i].events; if(fd==mWakeReadPipeFd){ if(epollEvents&EPOLLIN){ awoken(); }else{ ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.",epollEvents); } }else{ ssize_t requestIndex=mRequests.indexOfKey(fd); if(requestIndex>=0){ intevents=0; if(epollEvents&EPOLLIN)events|=EVENT_INPUT; if(epollEvents&EPOLLOUT)events|=EVENT_OUTPUT; if(epollEvents&EPOLLERR)events|=EVENT_ERROR; if(epollEvents&EPOLLHUP)events|=EVENT_HANGUP; pushResponse(events,mRequests.valueAt(requestIndex)); }else{ ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.",epollEvents,fd); } } } Done:; // Invoke pending message callbacks. mNextMessageUptime=LLONG_MAX; while(mMessageEnvelopes.size()!=0){ nsecs_t now=systemTime(SYSTEM_TIME_MONOTONIC); constMessageEnvelope&messageEnvelope=mMessageEnvelopes.itemAt(0); if(messageEnvelope.uptime<=now){ // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. {// obtain handler sp<MessageHandler>handler=messageEnvelope.handler; Message message=messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage=true; mLock.unlock(); #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", this,handler.get(),message.what); #endif handler->handleMessage(message); }// release handler mLock.lock(); mSendingMessage=false; result=POLL_CALLBACK; }else{ // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime=messageEnvelope.uptime; break; } } // Release lock. mLock.unlock(); // Invoke all response callbacks. for(size_ti=0;i<mResponses.size();i++){ Response&response=mResponses.editItemAt(i); if(response.request.ident==POLL_CALLBACK){ intfd=response.request.fd; intevents=response.events; void*data=response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this,response.request.callback.get(),fd,events,data); #endif intcallbackResult=response.request.callback->handleEvent(fd,events,data); if(callbackResult==0){ removeFd(fd); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result=POLL_CALLBACK; } } returnresult; } |
参数timeoutMillis传递过来为0,第一个if语句不成立,跳过
| structepoll_event eventItems[EPOLL_MAX_EVENTS]; inteventCount=epoll_wait(mEpollFd,eventItems,EPOLL_MAX_EVENTS,timeoutMillis); |
声明了epoll_event结构体数组eventItems,最大值为EPOLL_MAX_EVENTS即16,表示监听最大文件描述符个数;epoll_wait是epoll机制的第三个系统调用接口,返回epoll对象mEpollFd中处于就绪状态的文件描述符个数,或者说返回发生I/O事件的个数,返回的个数赋给eventCount;第二个参数就是发生I/O事件的文件描述符的信息集合,其最大值由第三个参数确定,如果有n个(n<=EPOLL_MAX_EVENTS)事件发生,返回值eventCount就等于n;第四个参数为0,表示执行一次非阻塞式的检查,看看兴趣列表中的文件描述符产生了哪个事件;如果为-1,此调用将一直阻塞,直到有事件产生;如果大于0,调用将阻塞timeoutMills秒,直到有事件产生。
epoll_wait将一直监控兴趣列表中的管道描述符读取端,直到管道有数据时即发生了I/O事件时才返回,假设当前管道还没有发送I/O事件,因此暂时分析到此处,待有消息时再继续。
关于epoll机制,参考书籍:《LINUX/UNIX系统编程手册》第63.4节
2 消息发送
2.1 Handler的sendEmptyMessage方法
sendEmptyMessage源码:
| publicfinalbooleansendEmptyMessage(intwhat){ returnsendEmptyMessageDelayed(what,0); } publicfinalbooleansendEmptyMessageDelayed(intwhat,longdelayMillis){ Message msg=Message.obtain(); msg.what=what; returnsendMessageDelayed(msg,delayMillis); } |
sendEmptyMessage是sendEmptyMessageDelayed的特殊形式,当延迟发送时间为0时的特殊情况。
2.2 Message的obtain方法
| publicstaticMessage obtain(){ synchronized(sPoolSync){ if(sPool!=null){ Messagem=sPool; sPool=m.next; m.next=null; m.flags=0;// clear in-use flag sPoolSize--; returnm; } } returnnewMessage(); } |
obtain从消息池中返回一个已存在Message对象,避免重新创建Message对象,浪费内存;如果消息池是空的,就new一个Message对象,然后代码一直进行到enqueueMessage:
2.3 Handler.java的enqueueMessage方法
| privatebooleanenqueueMessage(MessageQueue queue,Message msg,longuptimeMillis){ msg.target=this; if(mAsynchronous){ msg.setAsynchronous(true); } returnqueue.enqueueMessage(msg,uptimeMillis); } |
this表示当前Handler对象,保存到消息的target变量中,这类似于ip路由的目的地址一样,在发送前指明了目的地,待处理消息时就知道由谁来处理,继续:
2.4 MessageQueue.java的enqueueMessage方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | booleanenqueueMessage(Message msg,longwhen){ if(msg.target==null){ thrownewIllegalArgumentException("Message must have a target."); } if(msg.isInUse()){ thrownewIllegalStateException(msg+" This message is already in use."); } synchronized(this){ if(mQuitting){ IllegalStateExceptione=newIllegalStateException( msg.target+" sending message to a Handler on a dead thread"); Log.w("MessageQueue",e.getMessage(),e); msg.recycle(); returnfalse; } msg.markInUse(); msg.when=when; Messagep=mMessages; booleanneedWake; if(p==null||when==0||when<p.when){ // New head, wake up the event queue if blocked. msg.next=p; mMessages=msg; needWake=mBlocked; }else{ // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake=mBlocked&&p.target==null&&msg.isAsynchronous(); Message prev; for(;;){ prev=p; p=p.next; if(p==null||when<p.when){ break; } if(needWake&&p.isAsynchronous()){ needWake=false; } } msg.next=p;// invariant: p == prev.next prev.next=msg; } // We can assume mPtr != 0 because mQuitting is false. if(needWake){ nativeWake(mPtr); } } returntrue; } |
msg.when是该消息执行的时间,就是说当时间走到msg.when时,就会放到队列中;p是消息队列mMessages首指针。
if语句的含义:如果当前消息队列为空,或当前消息等待时间为0,或当前消息等待时间比队列中第一个消息的执行时间要小(时间上小于就是说时间要早些),就把当前消息插入到消息队列首位置,并把标致needWake置为mBlocked,mBlocked有可能是true(在MessageQueue的next方法中会提到),表示队列中有消息了,就可以唤醒阻塞中的主线程;
else语句的含义:通过for无线循环在队列中查找一个适当的位置,把新消息插入到此处,具体插到什么位置?根据待插入消息的执行时间和队列中的消息的处理时间大小决定:如果when >= p.when,意味着待插入消息的执行时间大于(换句话说晚于)等于队列中的某个消息,就继续循环,直到待插入消息的执行时间小于(早于)队列中的某个消息时为止,跳出循环语句,这就找到了插入的位置,执行:
| msg.next=p;// invariant: p == prev.next prev.next=msg; |
把待插入消息插入此处;假如循环到最后一个元素都没有找到合适的位置(队列中所有消息的执行时间都小于或等于待插入消息),就把待插入消息直接插到队列尾部,这就是下面if语句的含义。
可以看到,消息队列是按照消息处理时间从小到大顺序排列的。
当前线程继续执行:
| if(needWake){ nativeWake(mPtr); } |
needWake什么时候为true?与mBlocked有关,mBlocked在next()方法中在某个时候会被置为true,当消息队列没有消息时,nextPollTimeoutMillis为-1,执行到:
| if(pendingIdleHandlerCount<=0){ // No idle handlers to run. Loop and wait some more. mBlocked=true; continue; } |
此时mBlocked为true,继续for无限循环,主线程处于等待状态。
nativeWake是本地方法,其调用过程为:
nativeWake(mPtr) —-> return nativeMessageQueue->wake() —-> mLooper->wake()
2.4 Looper.cpp的wake方法
| voidLooper::wake(){ #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake",this); #endif ssize_t nWrite; do{ nWrite=write(mWakeWritePipeFd,"W",1); }while(nWrite==-1&&errno==EINTR); if(nWrite!=1){ if(errno!=EAGAIN){ ALOGW("Could not write wake signal, errno=%d",errno); } } } |
write系统调用接口往管道写入端文件描述符mWakeWritePipeFd写入字符w,管道中有数据了,写入端进入阻塞状态,而管道读端被唤醒,也就是主线程被唤醒,此时epollo机制检测到兴趣列表中有I/O事件发生,epoll_wait就返回发生I/O事件的个数,接着1.6节pollInner方法继续分析:
| if(eventCount<0){ if(errno==EINTR){ gotoDone; } ALOGW("Poll failed with an unexpected error, errno=%d",errno); result=POLL_ERROR; gotoDone; } |
首先对返回值进行验证,如果epoll_wait返回的发生I/O事件的描述符个数小于0,如果有错误码EINTR发生,说明epoll_wait在执行期间被一个信号中断了,然后通过信号恢复执行,就会发生这个错误;如果没有错误码EINTR,说明发生了未知错误POLL_ERROR。
| // Check for poll timeout. if(eventCount==0){ #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout",this); #endif result=POLL_TIMEOUT; gotoDone; } |
如果返回值为0,表明在timeoutMills时间内没有I/O事件发生,发生超时错误;如果返回值大于0,说明肯定有I/O事件发生了,于是执行到这个if循环:
| for(inti=0;i<eventCount;i++){ intfd=eventItems[i].data.fd; uint32_t epollEvents=eventItems[i].events; if(fd==mWakeReadPipeFd){ if(epollEvents&EPOLLIN){ awoken(); }else{ ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.",epollEvents); } } ...... |
取出发生I/O事件的管道文件描述符赋给fd,再取出I/O事件赋给epollEvents,如果管道读取端设置了EPOLLIN属性,表明管道中有数据可读了,换句话说,应用层消息队列中有消息可读。进程继续调用awoken函数:
| voidLooper::awoken(){ #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ awoken",this); #endif charbuffer[16]; ssize_t nRead; do{ nRead=read(mWakeReadPipeFd,buffer,sizeof(buffer)); }while((nRead==-1&&errno==EINTR)||nRead==sizeof(buffer)); } |
awoken函数中调用了read系统调用接口,该接口的作用就是从管道读取端描述符mWakeReadPipeFd中读取至多sizeof(buffer)大小的字节放到缓冲区buffer中。
注:管道通信原理:如果管道中没有消息的话,读取端进程(本文中就是主线程)就会处于阻塞状态,直到至少有一个字节从写入端写入到管道中为止;当管道中有数据时,读取端被唤醒,写入端开始阻塞。
管道涵义:管道是进程之间的一个单向数据流,一个进程写入管道的所有数据都由内核定向到另外一个进程,另外一个进程由此就可以从管道中读取数据。即两个进程把一个管道看作一个文件,一个进程往管道中写数据,另外一个进程从管道中读数据,每个管道都是半双工通信方式,同一时刻只能有一个方向传输。
关于管道参考:《深入理解linux内核中文第三版》第19章,《Linux内核源代码情景分析》第6章),《LINUX/UNIX系统编程手册》第44章。
awoken调用完成后,继续执行后面的语句:
| while(mMessageEnvelopes.size()!=0){ |
与
| for(size_ti=0;i<mResponses.size();i++){ |
都不成立,pollInner执行结束,一直返回到1.4节MessageQueue的nativePollOnce方法后面继续执行:
2.5 MessageQueue.java的next方法
| if(msg!=null&&msg.target==null){ // Stalled by a barrier. Find the next asynchronous message in the queue. do{ prevMsg=msg; msg=msg.next; }while(msg!=null&&!msg.isAsynchronous()); } |
这段话的意思是如果当前消息被阻塞了,就从该消息后面找到一个异步消息。这段话的作用与enqueueSyncBarrier、removeSyncBarrier有关,用来解决当消息发生同步阻塞时,依然能够处理后面有异步消息的情况。是否有异步消息取决于很多原因,具体在Message的setAsynchronous方法中设置,本文不考虑异步消息,不作重点分析。
| if(now<msg.when){ // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis=(int)Math.min(msg.when-now,Integer.MAX_VALUE); } |
如果当前时间还没有到消息的执行时间,就计算距离消息执行的时间间隔,保存到nextPollTimeoutMillis中。
| else{ // Got a message. mBlocked=false; if(prevMsg!=null){ prevMsg.next=msg.next; }else{ mMessages=msg.next; } msg.next=null; if(false)Log.v("MessageQueue","Returning message: "+msg); returnmsg; } |
先把mBlocked置为false,表明主线程正在执行中,不能阻塞,再从队列中取出一个消息返回,一直返回到1.3节Loop.java的loop方法的这句:
| Message msg=queue.next(); if(msg==null){ // No message indicates that the message queue is quitting. return; } |
从队列中取出一个消息赋值给msg,如果为空,表示队列中没有消息,直接返回退出循环。进程继续执行到:
| msg.target.dispatchMessage(msg); |
开始处理消息,这一步放到第3节分析
3. 消息处理
消息什么时候处理?由前文可知,发送到队列中的消息不一定立即处理,因为主线程可能正在处理其他消息,或者管道中还有很多未处理的消息,此时管道写入端正在等待状态。
从案例1得知,处理消息的方法是handleMessage,系统是如何找到handleMessage的?2.5节已经解释清楚。继续2.5节分析,进程执行到:
| msg.target.dispatchMessage(msg); |
msg.target就是发送该消息的Handler对象,在Handler的enqueueMessage中赋值的。
3.1 Handler.java的dispatchMessage方法
| publicvoiddispatchMessage(Message msg){ if(msg.callback!=null){ handleCallback(msg); }else{ if(mCallback!=null){ if(mCallback.handleMessage(msg)){ return; } } handleMessage(msg); } } |
| publicvoidhandleMessage(Message msg){ } |
msg.callback是否为空?在2.2节Message.obtain方法中,首次new时,初始化了Message的callback对象,为空,跳过if语句进入到else,mCallback在第1节创建Handler时也为空,进程执行到了handleMessage,handleMessage由开发者重写,直接调用重写的handleMessage;如果mCallback不为空,就调用mCallback对象的handleMessage处理。
Handler的优缺点
用到Handler,需要对比一下近似方法:
Hanlder机制包含:
a.
Activity.runOnUiThread(Runnable)
View.post(Runnable)
View.postDelayed(Runnable, long)
b. Handler类
c. AsyncTask
这三种方法实际上都是基于Handler类演变而来,只是表现形式不一样,比如AsyncTask是对Handler和Thread的一个封装。三种方式区别:
a中三个方法代码较复杂,难以维护,结构不清晰,容易出错;
而AsyncTask在单个异步操作时较为简单,使用起来简单快捷,但在多个异步操作和UI进行交互时逻辑控制较困难,代码维护不易;
Handler类结构清晰,功能明确,多个异步执行和UI交互也容易控制,缺点是单个异步操作时相对AsyncTask代码较多。
因此,在单个异步操作时选择AsyncTask较好,在多个异步操作或者特殊操作时(比如定时器等)选择Handler较好。但这并不是标准推荐,具体视情形而定。
Handler.post(new Runnable())和sendmessage(msg)区别
(1) 都是把消息放到消息队列等待执行,前者放的是一个runnable对象,后者是一个message对象;
(2) 前者最终还是会转化成sendMessage,只不过最终的处理方式不一样,前者会执行runnable的run方法;后者可以被安排到线程中执行。
从loop调用开始看,loop从消息队列取出消息,然后调用handler的dispatchMessage方法处理:
| publicvoiddispatchMessage(Message msg){ if(msg.callback!=null){ handleCallback(msg); }else{ if(mCallback!=null){ if(mCallback.handleMessage(msg)){ return; } } handleMessage(msg); } } |
对于前者,msg.callback就是runnable对象,肯定不为空,后者则为空,进而进入到handleCallback:
| privatestaticvoidhandleCallback(Message message){ message.callback.run(); } |
调用runnable对象的run方法进行执行,此时还是在主线程中,因为整个过程并没有开辟一个新线程。
(3) 两者本质没有区别,都可以更新UI,区别在于是否易于维护等。
HandlerThread是什么
HandlerThread继承了Thread,是一个包含有looper的线程类。正常情况下,除了主线程,工作线程是没有looper的,但是为了像主线程那样也能循环处理消息,Android也自定义一个包含looper的工作线程——HandlerThread类。
HandlerThread的run方法:
| publicvoidrun(){ mTid=Process.myTid(); Looper.prepare(); synchronized(this){ mLooper=Looper.myLooper(); notifyAll(); } Process.setThreadPriority(mPriority); onLooperPrepared(); Looper.loop(); mTid=-1; } |
prepare方法创建了looper对象,创建looper对象的同时也创建了mesageQueue对象,然后像主线程一样,也包含了loop方法,循环取消息,并且分发给相应的handler对象处理。
问题
线程有没有Looper?
答:要分两种情况来回答:如果是Android应用程序主线程,默认有一个Looper对象;如果是工作线程,默认没有。如果想要有,在工作线程run方法中通过Looper.prepare()创建looper对象和MessageQueue对象,HandlerThread就是一个案例。
建议继续学习:
- 各消息队列软件产品大比拼 (阅读:5152)
- 浅析手机消息推送设计 (阅读:3234)
- Feed消息队列架构分析 (阅读:2614)
- storm入门教程 第四章 消息的可靠处理 (阅读:2057)
- Chaos网络库(三)- 主循环及异步消息的实现 (阅读:1453)
- ECS 中的消息发布订阅机制 (阅读:1035)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习