Kotlin中引入Coroutine(协程)的概念,可以帮助编写异步代码。
在使用和分析协程前,首先要了解一下:
协程是什么?
为什么需要协程?
协程最为人称道的就是可以用看起来同步的方式写出异步的代码,极大提高了代码的可读性。在实际开发中最常见的异步操作莫过于网络请求。通常我们需要通过各种回调的方式去处理网络请求,很容易就陷入到地狱回调中。
WalletHttp.target(VCoinTradeSubmitResult.class).setTag(tag).setFullUrl(Constants.VCOIN_TRADE_SUBMIT_URL).setParams(params).callback(newHttpCallback
请求操作,根据支付返回数据再去查询支付结果,这种情况通过回调就可能演变为“地狱回调”。
//获取账号tokenWalletHttp.target(Account.class).setTag(tag).setFullUrl(Constants.ACCOUNT_URL).setParams(params).callback(newHttpCallback
funpostItem(tag:String,params:Map
如果想要将原先的网络回调请求也改写成这种同步模式呢,只需要对原先请求回调用协程提供的suspendCancellableCoroutine等方法进行封装处理,即可让早期的异步代码也享受上述“同步代码”的丝滑。
协程:
一种非抢占式或者协作式的计算机程序并发调度实现,程序可以主动挂起或者恢复执行,其核心点是函数或一段程序能够被挂起,稍后再在挂起的位置恢复,通过主动让出运行权来实现协作,程序自己处理挂起和恢复来实现程序执行流程的协作调度。
协程本质上是轻量级线程。
协程的特点有:
Kotlin协程实现层次:
基础设施层:标准库的协程API,主要对协程提供了概念和语义上最基本的支持;
业务框架层:协程的上层框架支持,基于标准库实现的封装,也是我们日常开发使用的协程扩展库。
具体在使用协程前,首先要配置对Kotlin协程的依赖。
(1)项目根目录build.gradle
buildscript{...ext.kotlin_coroutines='xxx'...}(2)Module下build.gradle
dependencies{...//协程标准库implementation"org.jetbrains.kotlin:kotlin-stdlib:$kotlin_coroutines"//依赖协程核心库,包含协程公共API部分implementation"org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines"//依赖android支持库,协程Android平台的具体实现方式implementation"org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines"...}2.1Thread启动在Java中,可以通过Thread开启并发操作:
newThread(newRunnable(){@Overridepublicvoidrun(){//...dowhatyouwant}}).start();在Kotlin中,使用线程更为便捷:
valmyThread=thread{//.......}这个Thread方法有个参数start默认为true,即创造出来的线程默认启动,你可以自定义启动时机:
valmyThread=thread(start=false){//......}myThread.start()2.2协程启动动协程需要三部分:上下文、启动模式、协程体。
启动方式一般有三种,其中最简单的启动协程的方式为:
GlobalScope.launch{//......}GlobalScope.launch()属于协程构建器Coroutinebuilders,Kotlin中还有其他几种Builders,负责创建协程:
runBlocking:T
使用runBlocking顶层函数创建,会创建一个新的协程同时阻塞当前线程,直到协程结束。适用于main函数和单元测试
launch
创建一个新的协程,不会阻塞当前线程,必须在协程作用域中才可以调用。它返回的是一个该协程任务的引用,即Job对象。这是最常用的启动协程的方式。
async
创建一个新的协程,不会阻塞当前线程,必须在协程作用域中才可以调用,并返回Deffer对象。可通过调用Deffer.await()方法等待该子协程执行完成并获取结果。常用于并发执行-同步等待和获取返回值的情况。
runBlocking是一个顶层函数,可以在任意地方独立使用。它能创建一个新的协程同时阻塞当前线程,直到其内部所有逻辑以及子协程所有逻辑全部执行完成。常用于main函数和测试中。
//main函数中应用funmain()=runBlocking{launch{//创建一个新协程,runBlocking会阻塞线程,但内部运行的协程是非阻塞的delay(1000L)println("World!")}println("Hello,")delay(2000L)//延时2s,保证JVM存活}//测试中应用classMyTest{@TestfuntestMySuspendingFunction()=runBlocking{//......}}2.2.2launchlaunch是最常用的用于启动协程的方式,会在不阻塞当前线程的情况下启动一个协程,并返回对该协程任务的引用,即Job对象。
协程需要运行在协程上下文环境中,在非协程环境中的launch有两种:GlobalScope与CoroutineScope。
在应用范围内启动一个新协程,不会阻塞调用线程,协程的生命周期与应用程序一致。
funlaunchTest(){print("start")GlobalScope.launch{delay(1000)//1秒无阻塞延迟print("GlobalScope.launch")}print("end")}/**打印结果startendGlobalScope.launch*/这种启动的协程存在组件被销毁但协程还存在的情况,一般不推荐。其中GlobalScope本身就是一个作用域,launch属于其子作用域。
启动一个新的协程而不阻塞当前线程,并返回对协程的引用作为一个Job。
funlaunchTest2(){print("start")valjob=CoroutineScope(Dispatchers.IO).launch{delay(1000)print("CoroutineScope.launch")}print("end")}协程上下文控制协程生命周期和线程调度,使得协程和该组件生命周期绑定,组件销毁时,协程一并销毁,从而实现安全可靠地协程调用。这是在应用中最推荐的协程使用方式。
关于launch,根据业务需求需要创建一个或多个协程,则可能就需要在一个协程中启动子协程。
funlaunchTest3(){print("start")GlobalScope.launch{delay(1000)print("CoroutineScope.launch")//在协程内创建子协程launch{delay(1500)print("launch子协程")}}print("end")}/****打印结果startendCoroutineScope.launchlaunch子协程*/2.2.3asyncasync类似于launch,都是创建一个不会阻塞当前线程的新的协程。区别在于:async的返回是Deferred对象,可通过Deffer.await()等待协程执行完成并获取结果,而launch不行。常用于并发执行-同步等待和获取返回值的情况。
注意:
在叙述协程启动内容,涉及到了Job、Deferred、启动模式、作用域等概念,这里补充介绍一下上述概念。
Job是协程的句柄,赋予协程可取消,赋予协程以生命周期,赋予协程以结构化并发的能力。
Job是launch构建协程返回的一个协程任务,完成时是没有返回值的。可以把Job看成协程对象本身,封装了协程中需要执行的代码逻辑,协程的操作方法都在Job身上。Job具有生命周期并且可以取消,它也是上下文元素,继承自CoroutineContext。
在日常Android开发过程中,协程配合Lifecycle可以做到自动取消。
Job生命周期
Job的生命周期分为6种状态,分为
通常外界会持有Job接口作为引用被协程调用者所持有。Job接口提供isActive、isCompleted、isCancelled3个变量使外界可以感知Job内部的状态。
valjob=launch(start=CoroutineStart.LAZY){println("Active")}println("New")job.join()println("Completed")/**打印结果**/NewActiveCompleted/***********1.以lazy方式创建出来的协程state为New*2.对应的job调用join函数后,协程进入Active状态,并开始执行协程对应的具体代码*3.当协程执行完毕后,由于没有需要等待的子协程,协程直接进入Completed状态*/关于Job,常用的方法有:
//活跃的,是否仍在执行publicvalisActive:Boolean//启动协程,如果启动了协程,则为true;如果协程已经启动或完成,则为falsepublicfunstart():Boolean//取消Job,可通过传入Exception说明具体原因publicfuncancel(cause:CancellationException=null)//挂起协程直到此Job完成publicsuspendfunjoin()//取消任务并等待任务完成,结合了[cancel]和[join]的调用publicsuspendfunJob.cancelAndJoin()//给Job设置一个完成通知,当Job执行完成的时候会同步执行这个函数publicfuninvokeOnCompletion(handler:CompletionHandler):DisposableHandleJob父子层级
Deferred继承自Job,具有与Job相同的状态机制。
它是async构建协程返回的一个协程任务,可通过调用await()方法等待协程执行完成并获取结果。其中Job没有结果值,Deffer有结果值。
publicinterfaceDeferred
确保所有的协程都会被追踪,Kotlin不允许在没有使用CoroutineScope的情况下启动新的协程。CoroutineScope可被看作是一个具有超能力的ExecutorService的轻量级版本。它能启动新的协程,同时这个协程还具备suspend和resume的优势。
每个协程生成器launch、async等都是CoroutineScope的扩展,并继承了它的coroutineContext,自动传播其所有元素和取消。
启动协程需要作用域,但是作用域又是在协程创建过程中产生的。
publicinterfaceCoroutineScope{/***此域的上下文。Context被作用域封装,用于在作用域上扩展的协程构建器的实现。*/publicvalcoroutineContext:CoroutineContext}官方提供的常用作用域:
顶层函数,可启动协程,但会阻塞当前线程
全局协程作用域。通过GlobalScope创建的协程不会有父协程,可以把它称为根协程。它启动的协程的生命周期只受整个应用程序的生命周期的限制,且不能取消,在运行时会消耗一些内存资源,这可能会导致内存泄露,不适用于业务开发。
创建一个独立的协程作用域,直到所有启动的协程都完成后才结束自身。
它是一个挂起函数,需要运行在协程内或挂起函数内。当这个作用域中的任何一个子协程失败时,这个作用域失败,所有其他的子协程都被取消。
与coroutineScope类似,不同的是子协程的异常不会影响父协程,也不会影响其他子协程。(作用域本身的失败(在block或取消中抛出异常)会导致作用域及其所有子协程失败,但不会取消父协程。)
为UI组件创建主作用域。一个顶层函数,上下文是SupervisorJob()+Dispatchers.Main,说明它是一个在主线程执行的协程作用域,通过cancel对协程进行取消。
LifecycleKtx库提供的具有生命周期感知的协程作用域,与Lifecycle绑定生命周期,生命周期被销毁时,此作用域将被取消。会与当前的UI组件绑定生命周期,界面销毁时该协程作用域将被取消,不会造成协程泄漏,推荐使用。
与lifecycleScope类似,与ViewModel绑定生命周期,当ViewModel被清除时,这个作用域将被取消。推荐使用。
前述进行协程创建启动时涉及到了启动模式CoroutineStart,其是一个枚举类,为协程构建器定义启动选项。在协程构建的start参数中使用。
DEFAULT模式
DEFAULT是饿汉式启动,launch调用后,会立即进入待调度状态,一旦调度器OK就可以开始执行。
suspendfunmain(){log(1)valjob=GlobalScope.launch{log(2)}log(3)Thread.sleep(5000)//防止程序退出}funlog(o:Any){println("[${Thread.currentThread().name}]:$o")}前述示例代码采用默认的启动模式和默认的调度器,,运行结果取决于当前线程与后台线程的调度顺序。
/**可能的运行结果一****/[main]:1[main]:3[main]:2/**可能的运行结果二****/[main]:1[main]:2[main]:3LAZY模式
LAZY是懒汉式启动,launch后并不会有任何调度行为,协程体不会进入执行状态,直到我们需要他的运行结果时进行执行,其launch调用后会返回一个Job实例。
对于这种情况,可以:
suspendfunmain(){log(1)valjob=GlobalScope.launch(start=CoroutineStart.LAZY){log(2)}log(3)job.join()log(4)}funlog(o:Any){println("[${Thread.currentThread().name}]:$o")}对于join,一定要等待协程执行完毕,所以其运行结果一定为:
[main]:1[main]:3[DefaultDispatcher-worker-1]:2[main]:4如果把join()换为start(),则输出结果不一定。
ATOMIC模式
ATOMIC只有涉及cancel的时候才有意义。调用cancel的时机不同,结果也有差异。
suspendfunmain(){log(1)valjob=GlobalScope.launch(start=CoroutineStart.ATOMIC){log(2)}job.cancel()log(3)Thread.sleep(2000)}funlog(o:Any){println("[${Thread.currentThread().name}]:$o")}前述代码示例创建协程后立即cancel,由于是ATOMIC模式,因此协程一定会被调度,则log1、2、3一定都会被打印输出。如果将模式改为DEFAULT模式,则log2有可能打印输出,也可能不会。
其实cancel调用一定会将该job的状态置为cancelling,只不过ATOMIC模式的协程在启动时无视了这一状态。
suspendfunmain(){log(1)valjob=GlobalScope.launch(start=CoroutineStart.ATOMIC){log(2)delay(1000)log(3)}job.cancel()log(4)job.join()Thread.sleep(2000)}funlog(o:Any){println("[${Thread.currentThread().name}]:$o")}/**打印输出结果可能如下****/[main]:1[DefaultDispatcher-worker-1]:2[main]:4前述代码中,2和3中加了一个delay,delay会使得协程体的执行被挂起,1000ms之后再次调度后面的部分。对于ATOMIC模式,它一定会被启动,实际上在遇到第一个挂起点之前,它的执行是不会停止的,而delay是一个suspend函数,这时我们的协程迎来了自己的第一个挂起点,恰好delay是支持cancel的,因此后面的3将不会被打印。
UNDISPATCHED模式
协程在这种模式下会直接开始在当前线程下执行,直到第一个挂起点。
与ATOMIC的不同之处在于UNDISPATCHED不经过任何调度器即开始执行协程体。遇到挂起点之后的执行就取决于挂起点本身的逻辑以及上下文当中的调度器了。
suspendfunmain(){log(1)valjob=GlobalScope.launch(start=CoroutineStart.UNDISPATCHED){log(2)delay(100)log(3)}log(4)job.join()log(5)Thread.sleep(2000)}funlog(o:Any){println("[${Thread.currentThread().name}]:$o")}协程启动后会立即在当前线程执行,因此1、2会连续在同一线程中执行,delay是挂起点,因此3会等100ms后再次调度,这时候4执行,join要求等待协程执行完,因此等3输出后再执行5。
结果如下:
[main]:1[main]:2[main]:4[DefaultDispatcher-worker-1]:3[DefaultDispatcher-worker-1]:53.5withContextwithContext{}不会创建新的协程。在指定协程上运行挂起代码块,放在该块内的任何代码都始终通过IO调度器执行,并挂起该协程直至代码块运行完成。
withContext会使用新指定的上下文的dispatcher,将block的执行转移到指定的线程中。
它会返回结果,可以和当前协程的父协程存在交互关系,主要作用为了来回切换调度器。
在协程启动部分提到,启动协程需要三个部分,其中一个部分就是上下文,其接口类型是CoroutineContext,通常所见的上下文类型是CombinedContext或者EmptyCoroutineContext,一个表示上下文组合,另一个表示空。
协程上下文是Kotlin协程的基本结构单元,主要承载着资源获取,配置管理等工作,是执行环境的通用数据资源的统一管理者。除此之外,也包括携带参数,拦截协程执行等,是实现正确的线程行为、生命周期、异常以及调试的关键。
协程使用以下几种元素集定义协程行为,他们均继承自CoroutineContext:
这里回顾一下launch和async两个函数签名。
publicfunCoroutineScope.launch(context:CoroutineContext=EmptyCoroutineContext,start:CoroutineStart=CoroutineStart.DEFAULT,block:suspendCoroutineScope.()->Unit):Jobpublicfun
所有协程构建函数都是以CoroutineScope的扩展函数的形式被定义的,而CoroutineScope的接口唯一成员就是CoroutineContext类型。
publicinterfaceCoroutineScope{publicvalcoroutineContext:CoroutineContext}简而言之,协程上下文是协程必备组成部分,管理了协程的线程绑定、生命周期、异常处理和调试。
看一下CoroutineContext的接口方法:
publicinterfaceCoroutineContext{//操作符[]重载,可以通过CoroutineContext[Key]这种形式来获取与Key关联的Elementpublicoperatorfun
可知,Element本身也实现了CoroutineContext接口。
这里我们再看一下官方解释:
/**
从官方解释可知,CoroutineContext是一个Element的集合,这种集合被称为indexedset,介于set和map之间的一种结构。set意味着其中的元素有唯一性,map意味着每个元素都对应一个键。
如果将协程上下文内部的一系列上下文称为子上下文,上下文为每个子上下文分配了一个Key,它是一个带有类型信息的接口。
这个接口通常被实现为companionobject。
在一个类似map的结构中,每个键必须是唯一的,因为对相同的键put两次值,新值会代替旧值。通过上述方式,通过键的唯一性保证了上下文中的所有子上下文实例都是唯一的。
我们按照这个格式仿写一下然后反编译。
classMyElement:AbstractCoroutineContextElement(MyElement){companionobjectKey:CoroutineContext.Key
Key是静态变量,全局唯一,为Element提供唯一性保障。
前述内容总结如下:
CoroutineContext.get()获取元素
关于CoroutineContext,我们先看一下其是如何取元素的。
这里看一下Element、CombinedContext、EmptyCoroutineContext的内部实现,其中CombinedContext就是CoroutineContext集合结构的实现,EmptyCoroutineContext就表示一个空的CoroutineContext,它里面是空实现。
@SinceKotlin("1.3")internalclassCombinedContext(//左上下文privatevalleft:CoroutineContext,//右元素privatevalelement:Element):CoroutineContext,Serializable{overridefun
//使用示例println(coroutineContext[CoroutineName])println(Dispatchers.Main[CoroutineName])CoroutineContext.minusKey()删除元素
同理看一下Element、CombinedContext、EmptyCoroutineContext的内部实现。
其结构类似链表,left就是指向下一个结点的指针,get、minusKey操作大体逻辑都是先访问当前element,不满足,再访问left的element,顺序都是从right到left。
CoroutineContext.fold()元素遍历
internalclassCombinedContext(//左上下文privatevalleft:CoroutineContext,//右元素privatevalelement:Element):CoroutineContext,Serializable{//先对left做fold操作,把left做完fold操作的的返回结果和element做operation操作publicoverridefun
CoroutineContext.plus()添加元素
关于CoroutineContext的元素添加方法,直接看其plus()实现,也是唯一没有被重写的方法。
这个覆盖操作就在fold方法的参数operation代码块中完成,通过minusKey方法删除掉重复元素。
plus方法中可以看到里面有个对ContinuationInterceptor的处理,目的是让ContinuationInterceptor在每次相加后都能变成CoroutineContext中的最后一个元素。
ContinuationInterceptor继承自Element,称为协程上下文拦截器,作用是在协程执行前拦截它,从而在协程执行前做出一些其他的操作。通过把ContinuationInterceptor放在最后面,协程在查找上下文的element时,总能最快找到拦截器,避免了递归查找,从而让拦截行为前置执行。
publicdataclassCoroutineName(valname:String):AbstractCoroutineContextElement(CoroutineName){CoroutineName是用户用来指定的协程名称的,用于方便调试和定位问题。
GlobalScope.launch(CoroutineName("GlobalScope")){launch(CoroutineName("CoroutineA")){//指定协程名称valcoroutineName=coroutineContext[CoroutineName]//获取协程名称print(coroutineName)}}/**打印结果CoroutineName(CoroutineA)*/协程内部可以通过coroutineContext这个全局属性直接获取当前协程的上下文。
如果要传递多个上下文元素,CoroutineContext可以使用"+"运算符进行合并。由于CoroutineContext是由一组元素组成的,所以加号右侧的元素会覆盖加号左侧的元素,进而组成新创建的CoroutineContext。
CoroutineScope实际上是一个CoroutineContext的封装,当我们需要启动一个协程时,会在CoroutineScope的实例上调用构建函数,如async和launch。
在构建函数中,一共出现了3个CoroutineContext。
查看协程构建函数async和launch的源码,其第一行都是如下代码:
valnewContext=newCoroutineContext(context)进一步查看:
@ExperimentalCoroutinesApipublicactualfunCoroutineScope.newCoroutineContext(context:CoroutineContext):CoroutineContext{valcombined=coroutineContext+context//CoroutineContext拼接组合valdebug=if(DEBUG)combined+CoroutineId(COROUTINE_ID.incrementAndGet())elsecombinedreturnif(combined!==Dispatchers.Default&&combined[ContinuationInterceptor]==null)debug+Dispatchers.Defaultelsedebug}构建器内部进行了一个CoroutineContext拼接操作,plus左值是CoroutineScope内部的CoroutineContext,右值是作为构建函数参数的CoroutineContext。
抽象类AbstractCoroutineScope实现了CoroutineScope和Job接口。大部分CoroutineScope的实现都继承自AbstractCoroutineScope,意味着他们同时也是一个Job。
publicabstractclassAbstractCoroutine
全限定Context
launch(Dispatchers.Main+Job()+CoroutineName("HelloCoroutine")+CoroutineExceptionHandler{_,_->/*...*/}){/*...*/}全限定Context,即全部显式指定具体值的Elements。不论你用哪一个CoroutineScope构建该协程,它都具有一致的表现,不会受到CoroutineScope任何影响。
CoroutineScopeContext
基于Activity生命周期实现一个CoroutineScope
abstractclassScopedAppActivity:AppCompatActivity(),CoroutineScope{protectedlateinitvarjob:JoboverridevalcoroutineContext:CoroutineContextget()=job+Dispatchers.Main//注意这里使用+拼接CoroutineContextoverridefunonCreate(savedInstanceState:Bundle){super.onCreate(savedInstanceState)job=Job()}overridefunonDestroy(){super.onDestroy()job.cancel()}}Dispatcher:使用Dispatcher.Main,以在UI线程进行绘制
Job:在onCreate时构建,在onDestroy时销毁,所有基于该CoroutineContext创建的协程,都会在Activity销毁时取消,从而避免Activity泄露的问题
临时指定参数
withContext(NonCancellable){/*...*/}读取协程上下文参数
通过顶级挂起只读属性coroutineContext获取协程上下文参数,它位于kotlin-stdlib/kotlin.coroutines/coroutineContext
println("Runningin${coroutineContext[CoroutineName]}")NestedContext内嵌上下文
内嵌上下文切换:在协程A内部构建协程B时,B会自动继承A的Dispatcher。
可以在调用async时加入Dispatcher参数,切换到工作线程
//错误的做法,在主线程中直接调用async,若耗时过长则阻塞UIGlobalScope.launch(Dispatchers.Main){valdeferred=async{/*...*/}/*...*/}//正确的做法,在工作线程执行协程任务GlobalScope.launch(Dispatchers.Main){valdeferred=async(Dispatchers.Default){/*...*/}/*...*/}4.2协程拦截器@SinceKotlin("1.3")publicinterfaceContinuationInterceptor:CoroutineContext.Element{companionobjectKey:CoroutineContext.Key
我们通过Dispatchers来指定协程发生的线程,Dispatchers实现了ContinuationInterceptor接口。
这里我们自定义一个拦截器放到协程上下文,看一下会发生什么。
原因一:CombinedContext的结构决定。
其有两个元素,left是一个前驱集合,element为一个纯粹CoroutineContext,它的get方法每次都是从element开始进行查找对应Key的CoroutineContext对象;没有匹配到才会去left集合中进行递归查找。为了加快查找ContinuationInterceptor类型的实例,才将它加入到拼接链的尾部,对应的就是element。
原因二:ContinuationInterceptor使用很频繁
每次创建协程都会去尝试查找当前协程的CoroutineContext中是否存在ContinuationInterceptor。这里我们用launch来验证一下
publicfunCoroutineScope.launch(context:CoroutineContext=EmptyCoroutineContext,start:CoroutineStart=CoroutineStart.DEFAULT,block:suspendCoroutineScope.()->Unit):Job{valnewContext=newCoroutineContext(context)valcoroutine=if(start.isLazy)LazyStandaloneCoroutine(newContext,block)elseStandaloneCoroutine(newContext,active=true)coroutine.start(start,coroutine,block)returncoroutine}如果使用的launch使用的是默认参数,此时Coroutine就是StandaloneCoroutine,然后调用start方法启动协程。
start(block,receiver,this)}publicoperatorfun
publicfun
publicactualfun
publicfunintercepted():Continuation
下面再看一个ContinuationInterceptor的典型示例
publicabstractclassCoroutineDispatcher:AbstractCoroutineContextElement(ContinuationInterceptor),ContinuationInterceptor{//将可运行块的执行分派到给定上下文中的另一个线程上publicabstractfundispatch(context:CoroutineContext,block:Runnable)//返回一个continuation,它封装了提供的[continuation],拦截了所有的恢复publicfinaloverridefun
调度的本质就是解决挂起点恢复之后的协程逻辑在哪里运行的问题。调度器也属于协程上下文一类,它继承自拦截器。
IO仅在Jvm上有定义,它基于Default调度器背后的线程池,并实现了独立的队列和限制,因此协程调度器从Default切换到IO并不会触发线程切换
关于调度器介绍到这里,还没有详细解释前述协程拦截器中的withContext为什么拦截失败。这里针对这个详细看一下源码实现。
publicactualvalMain:MainCoroutineDispatcherget()=MainDispatcherLoader.dispatcher其返回类型为MainCoroutineDispatcher,继承自CoroutineDispatcher。
publicabstractclassMainCoroutineDispatcher:CoroutineDispatcher()publicabstractclassCoroutineDispatcher:AbstractCoroutineContextElement(ContinuationInterceptor),ContinuationInterceptor{publicopenfunisDispatchNeeded(context:CoroutineContext):Boolean=truepublicabstractfundispatch(context:CoroutineContext,block:Runnable)publicopenfundispatchYield(context:CoroutineContext,block:Runnable):Unit=dispatch(context,block)publicfinaloverridefun
CoroutineDispatch中的isDispatchNeeded就是判断是否需要分发,然后dispatch就是执行分发。
ContinuationInterceptor重要的方法就是interceptContinuation,在CoroutineDispatcher中直接返回了DispatchedContinuation对象,它是一个Continuation类型,看一下其resumeWith实现。
overridefunresumeWith(result:Result
如果我们拿的是Dispatchers.Main,其dispatcher为HandlerContext。
internalclassHandlerContextprivateconstructor(privatevalhandler:Handler,privatevalname:String,privatevalinvokeImmediately:Boolean):HandlerDispatcher(),Delay{overridefunisDispatchNeeded(context:CoroutineContext):Boolean{return!invokeImmediately||Looper.myLooper()!=handler.looper}overridefundispatch(context:CoroutineContext,block:Runnable){if(!handler.post(block)){cancelOnRejection(context,block)}}......其中HandlerContext继承于HandlerDispatcher,而HandlerDispatcher继承于MainCoroutineDispatcher
Dispatcher的基本实现原理大致为:
这里再简单看一下WithContext,我们都知道其不仅可以接受CoroutineDispatcher来帮助我们切换线程,同时在执行完毕之后还会帮助我们将之前切换掉的线程进恢复,保证协程运行的连贯性。那这是怎么实现的呢?
withContext的线程恢复原理是它内部生成了一个DispatchedCoroutine,保存切换线程时的CoroutineContext与切换之前的Continuation,最后在onCompletionInternal进行恢复。我们简单翻一翻其源码实现。
getUserBtn.setOnClickListener{getUser{user->handler.post{userNameView.text=user.name}}}typealiasCallback=(User)->UnitfungetUser(callback:Callback){...}由于getUser函数需要切到其他线程执行,因此回调通常也会在这个非UI的线程中调用,所以为了确保UI正确被刷新,我们需要用handler.post切换到UI线程。
如果要用协程实现呢?
suspendfungetUserCoroutine()=suspendCoroutine
调度器的目的就是切线程,我们只要提供线程,调度器就应该很方便的创建出来。
suspendfunmain(){valmyDispatcher=Executors.newSingleThreadExecutor{r->Thread(r,"MyThread")}.asCoroutineDispatcher()GlobalScope.launch(myDispatcher){log(1)}.join()log(2)}由于这个线程池是我们自己创建的,因此我们需要在合适的时候关闭它。
除了上述的方法,kotlin协程还给出了更简单的api,如下:
GlobalScope.launch(newSingleThreadContext("Dispather")){//......}.join()前述我们是通过线程的方式,同理可以通过线程池转为调度器实现。
Executors.newScheduledThreadPool(10).asCoroutineDispatcher().use{dispatcher->GlobalScope.launch(dispatcher){//......}.join五、协程挂起在前述协程时,经常会出现suspend关键字和挂起的说法,其含义和用法是什么?一起深入看一下。
挂起函数只能在协程或另一个挂起函数中被调用,如果你在非协程中使用到了挂起函数,会报错。
阻塞:
函数A必须在函数B之前完成执行,线程被锁定以便函数A能够完成其执行
挂起:
函数A虽然已经启动,但可以暂停,让函数B执行,然后只在稍后恢复。线程没有被函数A锁定。
“挂起”是指协程从它当前线程脱离,切换到另一个线程运行。当线程运行到suspend函数时,会暂时挂起这个函数及后续代码的执行。简而言之,挂起函数是一个可以启动、暂停和恢复的函数。
协程运行的时候每遇到被suspend修饰的方法时,都可能会挂起当前协程,不是必会挂起,例如如下方法就不会被挂起。
privatesuspendfuna(){println("aa")}这是因为这种方法不会返回COROUTINE_SUSPENDED类型,这在后面详细解释。
Kotlin使用堆栈帧来管理要运行哪个函数以及所有局部变量。
【suspend】:挂起或暂停,用于挂起执行当前协程,并保存所有局部变量
【resume】:恢复,用于让已挂起的协程从挂起处恢复继续执行
挂起(暂停)协程时,会复制并保存当前的堆栈帧以供稍后使用,将信息保存到Continuation对象中。
恢复协程时,会将堆栈帧从其保存位置复制回来,对应的Continuation通过调用resumeWith函数才会恢复协程的执行,然后函数再次开始运行。同时返回Result类型的成功或者异常的结果。
publicinterfaceContinuation
这个函数只能在协程或者挂起函数中执行,说明Continuation很有可能是从协程中传入来的,查看协程构建源码。
publicfunCoroutineScope.launch(context:CoroutineContext=EmptyCoroutineContext,start:CoroutineStart=CoroutineStart.DEFAULT,block:suspendCoroutineScope.()->Unit):Job{valnewContext=newCoroutineContext(context)valcoroutine=if(start.isLazy)LazyStandaloneCoroutine(newContext,block)elseStandaloneCoroutine(newContext,active=true)coroutine.start(start,coroutine,block)returncoroutine}通过launch启动一个协程时,其通过coroutine的start方法启动协程:
publicfun
//CoroutineStart的invoke方法出现了Continuationpublicoperatorfun
publicfun
协程实际挂起是如何实现的?
这里首先通过一个示例来演示一下状态机。
suspendfunmain(){log(1)log(returnSuspended())log(2)delay(1000)log(3)log(returnImmediately())log(4)}suspendfunreturnSuspended()=suspendCoroutineUninterceptedOrReturn
[main]:1[Thread-2]:Returnsuspended.[Thread-2]:2[kotlinx.coroutines.DefaultExecutor]:3[kotlinx.coroutines.DefaultExecutor]:5[kotlinx.coroutines.DefaultExecutor]:Returnimmediately.[kotlinx.coroutines.DefaultExecutor]:4前述代码的实际实现情况大致如下:
publicstaticvoidmain(String...args)throwsThrowable{RunSuspendrunSuspend=newRunSuspend();ContinuationImpltable=newContinuationImpl(runSuspend);table.resumeWith(Unit.INSTANCE);runSuspend.await();}作为completion传入的RunSuspend实例的resumeWith实际上是在ContinuationImpl的resumeWtih的最后才会被调用,因此它的await()一旦进入阻塞态,直到ContinuationImpl的整体状态流转完毕才会停止阻塞,此时进程也就运行完毕正常退出了。
这段代码的运行结果为:
首先创建一个协程并启动,最常见的莫过于CoroutineScope.launch{},其源码实现为:
publicfunCoroutineScope.launch(context:CoroutineContext=EmptyCoroutineContext,start:CoroutineStart=CoroutineStart.DEFAULT,block:suspendCoroutineScope.()->Unit):Job{valnewContext=newCoroutineContext(context)valcoroutine=if(start.isLazy)LazyStandaloneCoroutine(newContext,block)elseStandaloneCoroutine(newContext,active=true)coroutine.start(start,coroutine,block)returncoroutine}我们如果不指定start参数,所以它会使用默认的CoroutineStart.DEFAULT,最终coroutine会得到一个StandaloneCoroutine。其实现自AbstractCoroutine,实现了Continuation。
前述分析suspend本质时已知,其最终会调用到createCoroutineUnintercepted,主要是创建了一个新的可挂起计算,通过调用resume(Unit)启动协程,返回值为Continuation,Continuation提供了resumeWith恢复协程的接口,用以实现协程恢复,Continuation封装了协程的代码运行逻辑和恢复接口。
将协程代码进行反编译,再看一下其字节码和java实现,例如
suspendfuntest(){CoroutineScope(Dispatchers.IO).launch{delay(11)}}查看其字节码实现时,可知其编译生成内部类。
协程的计算逻辑封装在invokeSuspend方法中,而SuspendLambda的继承关系为,
SuspendLambda->ContinuationImpl->BaseContinuationImpl->Continuation
其中BaseContinuationImpl部分关键源码如下:
协程的挂起通过suspend挂起函数实现,协程的恢复通过Continuation.resumeWith实现。
协程的线程调度是通过拦截器实现的,前面提到了协程启动调用到了startCoroutineCancellable,关于协程调度在前述的协程调度器部分已详细介绍了,这里再简单过一下。
@InternalCoroutinesApipublicfun
@SinceKotlin("1.3")publicactualfun
DispatchedContinuation拦截了协程的启动和恢复,分别是resumeCancellableWith和重写的resumeWith(Result)。
internalfun
publicfun
funmain(args:Array
所以协程的挂起在代码层面来说就是跳出协程执行的方法体,或者说跳出协程当前状态机下的对应状态,然后等待下一个状态来临时在进行执行。
关于协程挂起有三点注意事项:
看完invokeSuspend,我们再次回到startCoroutineCancellable函数中,其调用的createCoroutineUnintercepted方法中创建的SuspendLambda实例是BaseContinuationImpl的子类对象,其completion参数为下:
launch:if(isLazy)LazyStandaloneCoroutineelseStandaloneCoroutine
async:if(isLazy)LazyDeferredCoroutineelseDeferredCoroutine
上面这几个类都是AbstractCoroutine的子类。而根据completion的类型会执行不同的逻辑:
BaseContinuationImpl:执行协程逻辑
其它:调用resumeWith方法,处理协程的状态,协程挂起后的恢复即与它有关
前述的示例中async启动的协程,也会调用其invokeSuspend方法执行async协程,假设async返回的结果已经可用时,即非COROUTINE_SUSPENDED值,此时completion是DeferredCoroutine对象,因此就会调用DeferredCoroutine.resumeWith方法,然后返回,父协程的恢复逻辑便是在这里。
publicfinaloverridefunresumeWith(result:Result
privateclassResumeAwaitOnCompletion
launch协程恢复的过程,从async协程的SuspendLambda的子类的completion.resumeWith(outcome)->AbstractCoroutine.resumeWith(result)..->JobSupport.tryFinalizeSimpleState()->JobSupport.completeStateFinalization()->state.list.notifyCompletion(cause)->node.invoke,最后handler节点里面通过调用resume(result)恢复协程。
await()挂起函数恢复协程的原理:
通过前述的一系列分析可知,协程有三层封装:
协程其实就是一段可以挂起和恢复执行的运算逻辑,而协程的挂起通过挂起函数实现,挂起函数用状态机的方式用挂起点将协程的运算逻辑拆分成不同的片段,每次运行协程执行不同的逻辑片段。
所以协程有两个很大的好处:
本文通过为什么使用协程,协程如何创建启动,协程的调度原理和协程的挂起原理几个方面对协程进行了初步剖析,下面一起回顾一下全文重点内容,对全文内容进行一个总结