一丶我们在哪里会使用到FutureTask 基本上工作中和Future接口打交道比较多,比如线程池ThreadPoolExecutorsumbit方法,返回值就是一个Future(实际上基本上就是一个FutureTask)。ThreadPoolExecutorsumbit需要传入一个Callable,我们作为调用方法,在其中的call方法编写业务逻辑,然后ThreadPoolExecutor会将其包装为一个FutureTask提交到线程池中(异步),并立马返回FutureTask,从而让调用方可以取消任务,查看任务是否运行结束,获取异步任务结果FutureTask就如同一个纽带,连接了任务和任务的结果二丶何为FutureTask FutureTask源码注释中写到: FutureTask是可取消的异步任务。此类提供Future的基本实现,包括启动,取消、查询以查看任务是否完成以及获取任务结果的方法。只有在任务完成后才能检索结果,如果尚未完成,get方法将阻塞。任务完成后,无法重新启动或取消任务(除非使用runAndReset调用任务)。 FutureTask可用于包装Callable或Runnable对象。因为FutureTask实现了Runnable,所以FutureTask可以提交给Executor执行。三丶FutureTask继承关系 1。Future Future表示异步任务的运行结果,它定义了取消、查询以查看任务是否完成以及获取任务结果的方法。只有在任务完成后才能检索结果,如果尚未完成,get方法将阻塞。任务完成后,无法重新启动或取消任务(除非使用runAndReset调用任务) 方法 解释 booleancancel(booleanmayInterruptIfRunning) 尝试取消此任务的执行。如果任务已完成、已被取消或由于某些其他原因无法取消,则此尝试将失败。如果成功且在调用取消时此任务尚未启动,则任务不会运行。如果任务已经开始,则mayInterruptIfRunning参数确定是否应该中断执行该任务的线程以尝试停止该任务。此方法返回后,对isDone的后续调用将始终返回true。如果此方法返回true,则对isCancelled的后续调用将始终返回true。 booleanisCancelled() 如果此任务在正常完成之前被取消,则返回true booleanisDone() 如果此任务完成,则返回true。完成可能是由于正常终止、异常或取消在这些情况下,此方法都将返回true。 Vget()throwsInterruptedException,ExecutionException 如果任务没有执行完,那么一直阻塞直到任务完成,直到任务运行成功,或者运行失败,或者运行任务线程被中断 Vget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException get()的超时等待版本,指定当前线程等待时间,如果超时任然没有执行结束,那么抛出TimeoutException2。RunnableFuture 可以运行的未来,即表示一个可以运行的任务(是一个Runnable),也表示异步任务的结果(是一个Future)四丶带着问题阅读源码 小小FutureTask牛在哪儿昵,为什么是douglea写的,我不行?get方法调用的时候,如果任务没有成功,怎么实现调用线程的阻塞get方法出现异常,或者正常结束的时候,怎么唤醒调用get方法的线程在任务运行之前,如果任务被取消那么将无法运行,已经开始运行的任务只能尝试中断运行任务的线程,无法取消,那么douglea怎么判断运行和取消的先后顺序(注意运行和取消可以是多个线程调用)怎么处理这里的线程不安全问题五丶FutureTask的属性和内部类1。状态 为多个线程对FutureTask运行状态的可见性,FutureTask具备属性privatevolatileintstate,使用volatile修饰,可取以下值(Future定义了对应的常量): 常量 解释 NEW 任务处于新建状态 COMPLETING 任务正在完成,意味着异步计算结束,但是结果没有写回到outcome属性上 NORMAL 任务正常结束,说明程序员定义的业务逻辑,没有抛出异常 EXCEPTIONAL 任务运行失败,说明程序员定义的业务逻辑抛出异常 CANCELLED 任务被取消,说明调用方法调用了cancel方法,在任务运行之前取消了任务 INTERRUPTING 任务正在被中断,是一个瞬态,使用cancel(true),调用方正在中断运行任务的线程 INTERRUPTED 任务已经被中断1。1什么是COMPLETING,正在完成是什么鬼 FutureTask使用outcome属性记录任务运行结果,或者运行失败的抛出的异常,在我们定义的业务逻辑(ThreadPoolExecutorsumbit传入的Callable)成功运行结束,到运行的结果写回到outcome,并不是一个瞬间动作,在运行结果赋值到outcome的时候,会先将FutureTask状态修改为COMPLETING 这样做的目的是,任务处于COMPLETING,这是一个线程调用get获取任务的时候,不会让调用线程阻塞而是让这个线程yield放弃cpu稍等片刻,任务结果马上写回了。1。2什么是INTERRUPTING,正在中断是什么鬼 考虑一个情况,线程A正在执行任务,线程BCD都调用了get,线程E调用了FutureTaskcancel(true),这时候需要中断线程A,并且我们在这个FutureTask的业务逻辑中定义了,如果被中断,那么任务结束,抛出异常等逻辑 这时候FutureTaskrun会抛出一个异常(什么异常取决于FutureTask的业务逻辑抛出什么),那么线程BCD调用get造成的阻塞如何处理,需要去唤醒BCD,在修改状态到中断其实是存在短暂的时间的,在这短暂的时间内线程A会yield,放弃CPU,等待线程E中断方法调用结束,然后线程E会唤醒BCD。2。任务,运行任务的线程,任务执行结果 任务被包装为Callable对象,任务结果使用outcome属性记录,运行任务的线程使用runner属性记录。 需要注意的是这里的outcome,没有使用volatile,后面的注释写到nonvolatile,protectedbystatereadswrites(不使用volatile修饰,可见性由state属性的写入和读取保证),为什么不需要volatile关键字修饰,我们看源码的时候解释3。WaitNode等待任务运行结束的线程 我们上面说到,调用get方法的时候,如果任务没有结束,那么调用线程,将阻塞到任务结束。这意味着任务结束的时候,调用线程将被唤醒,那么哪些线程需要唤醒?FutureTask使用WaitNode类型的属性记录 通过WaitNode属性记录调用线程,并且使用next属性串联起其他等待的线程 看完这些属性,我们来拜读douglea大师的代码六丶源码解读1。构造方法 可以看到如果传入Runnable,那么将被使用RunnableAdapter包装成Callable,典型的适配器模式,通过RunnableAdapter适配Runnable为Callable 需要注意callable没有使用volatile修饰,douglea不担心重排序的问题么 如果执行FutureTask的构造方法的时候,发生重排序,this。callable的赋值重排序到外部获取到构造方法生成的FutureTask的后面,并且立马有另外一个线程调用了FutureTask的任务执行方法,这时候this。callable还来不及赋值,调用执行方法抛出空指针异常。那么为什么不用volatile修饰callable还能保证其可见性昵,能让源码写上ensurevisibilityofcallable这行注释昵? 在《JUC源码学习笔记4原子类,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法》的学习笔记中,我们说过volatile变量写具备如下内存屏障 这里的storestore屏障防止了this。callable的赋值重排序到this。stateNEW之后,且后续的store屏障会保证当前线程(构造FutureTask的线程)工作内存会立马写回到主内存,并让其他线程关于此FutureTask的缓存无效,从而保证了callable的线程可见性。2。run 我们从run方法看起,run方法中的许多逻辑,牵扯到其他的方法,可能需要总览全局才能彻底理解publicvoidrun(){如果不是初始,说明有其他线程启动了,或者说其他线程取消了任务那么不需要运行如果是new但是casrunner失败了,说明同时有多个线程执行此cas,当前线程没有抢过,那么不能执行此任务,if(state!NEW!UNSAFE。compareAndSwapObject(this,runnerOffset,null,Thread。currentThread()))try{CallableV再次校验下是否为初始状态如果不是说明在当前线从第一个if到此存在其他线程取消任务任务启动之前可以取消任务的运行if(c!nullstateNEW){V记录当前任务是否成功执行,如果Callable代码写错了,或者说Callable响应中断,执行的途中被中断那么为try{业务逻辑执行resultc。call();成功执行}catch(Throwableex){这里可能是Callable本身代码逻辑错误异常也可能是响应中断抛出异常记录异常setException(ex);}if(ran)设置任务正常执行结果set(result);}}finally{处理中断if(sINTERRUPTING)handlePossibleCancellationInterrupt(s);}}if(state!NEW!UNSAFE。compareAndSwapObject(this,runnerOffset,null,Thread。currentThread()))任务不是new,或者cas设置当前线程为runner失败,那么直接返回false如果任务当前状态不是new,说明有其他线程运行了,或者说其他线程取消了任务如果是new但是casrunner失败了,说明同时有多个线程执行此cas,当前线程没有抢过,那么不能执行此任务,这里使用CAS确保任务不会被其他线程再次执行if(c!nullstateNEW)为什么上面state!NEW都false了还要再次判断这里stateNEW因为state!NEW和UNSAFE。compareAndSwapObject(this,runnerOffset,null,Thread。currentThread()))并非是一个原子性操作,这里是为了确保前线程从第一个if到第二个if,这一段代码执行时间内,没有其他线程取消任务。如果存在其他线程取消了任务,那么stateNEW就不成立任务执行前可以取消任务2。1记录任务执行结果setException和set方法如果运行出现异常protectedvoidsetException(Throwablet){if(UNSAFE。compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){UNSAFE。putOrderedInt(this,stateOffset,EXCEPTIONAL);finalstatefinishCompletion();}}正常运行结束protectedvoidset(Vv){if(UNSAFE。compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){UNSAFE。putOrderedInt(this,stateOffset,NORMAL);finalstatefinishCompletion();}} 这两个方法都差不多,都是上来一个CAS将state从new转变为COMPLETING,然后用outcome记录异常或者记录成功返回值,然后使用UNSAFE。putOrderedInt改变state,如果是出现异常,那么设置状态为EXCEPTIONAL,如果正常结束设置为NORMAL。为什么使用UNSAFE。putOrderedInt为什么outcome没有使用volatile修饰,douglea都不担心可见性的问题么UNSAFE。putOrderedInt这个方法我在《JUC源码学习笔记4原子类,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法》中关于AtomicIntegerlazySet中说过,Storeload屏障可以让后续的load指令对其他处理器可见,但是需要将其他处理器的缓存设置成无效让它们重新从主内存读取,putOrderedInt提供一个storestore屏障,然后写数据,storestore是保证putOrderedInt之前的普通写入和putOrderedInt的写入不会重排序,但是不保证下面的volatile读写不被重排序,省去了storeload内存屏障,提高了性能,但是后续的读可能存在可见性的问题。putOrderedInt的storestore屏障保证了outcome回立即刷新回主存也就是说protectedvoidset(Vv){if(UNSAFE。compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){outcome是刷新回主存的,且不会重排序到putOrderedInt后面这也是outcome没有使用volatile修饰的原因之一,有后续调用putOrderedInt方法保证其对其他线程的可见性state字段使用putOrderedInt写入其他线程存在可见性的问题UNSAFE。putOrderedInt(this,stateOffset,NORMAL);finalstatefinishCompletion();}}那么state的可见性问题douglea如何解决?接着看下去2。2finishCompletion完成对等待线程的唤醒privatevoidfinishCompletion(){等待的节点for(WaitN(qwaiters)!){将当前futureTask的waiters属性cas为nullif(UNSAFE。compareAndSwapObject(this,waitersOffset,q,null)){唤醒所有get方法阻塞的线程for(;;){Threadtq。if(t!null){q。唤醒LockSupport。unpark(t);}WaitNodenextq。直到一个为null的节点,意味着遍历结束if(nextnull)q。}结束}}钩子方法留给我们自己扩展done();将任务置为toreducefootprint} 这里拿到waiters然后进行自旋遍历所有等待的节点线程,然后唤醒它们,有意思的点在UNSAFE。compareAndSwapObject(this,waitersOffset,q,null)为何这里要使用CAS更新waiters为null昵? 因为这里存在线程A执行完FutureTask调用finishCompletion的同时线程B调用get进行等待,调用get方法进行排队(排队时也是CAS设置自己为waiters)这两个CAS必定有一个成功,有一个失败如果A失败,说明B在A唤醒之前进行排队,挂起自己,那么A在自旋唤醒的时候会唤醒B如果B失败,那么说明B在A唤醒之后进行排队,那么这时候不需要排队了,因为任务已经完成了,B只需要进行自旋获取返回结果即可 这一点我们在get方法的源码分析的时候,会深有体会 其次在取消任务的时候也会调用finishCompletion唤醒等待的线程,所有finishCompletion的调用存在线程安全问题,需要使用cas保证线程安全2。3处理因为cancel造成的中断handlePossibleCancellationInterrupt 在run方法的finally块中存在运行完设置runner为空重新获取状态如果是INTERRUPTING或者INTERRUPTEDif(sINTERRUPTING)handlePossibleCancellationInterrupthandlePossibleCancellationInterrupt(s);privatevoidhandlePossibleCancellationInterrupt(ints){if(sINTERRUPTING)如果是打断中那么等待直到结束打断while(stateINTERRUPTING)Thread。yield(); cancel方法可以选择传入true表示,如果任务还在运行那么调用运行任务线程的interrupt方法进行中断,如果是调用cancel的线程还没有完成中断那么当前运行的线程会让步,为什么这么做,我们上面说到过,A线程运行任务,B线程cancel任务,B中断线程A其实是需要时间的,B会先修改任务状态为INTERRUPTING,然后中断线程A,然后修改状态为INTERRUPTED并唤醒等待的线程,从INTERRUPTINGINTERRUPTED这段时间,线程A只需要让出cpu等待即可3。get获取任务执行结果get()无限等待任务执行完publicVget()throwsInterruptedException,ExecutionException{任务为NEW和COMPLETING那么调用那么会调用awaitDoneif(sCOMPLETING)sawaitDone(false,0L);此方法如果发现FuturetTask调用异常那么抛出异常returnreport(s);}get(longtimeout,TimeUnitunit)超时等待任务完成publicVget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{if(unitnull)thrownewNullPointerException();如果状态小于等于COMPLETING(NEW和COMPLETING)那么会调用awaitDone如果awaitDone结束的时候返回的状态还是NEWorCOMPLETING抛出超时异常if(sCOMPLETING(sawaitDone(true,unit。toNanos(timeout)))COMPLETING)thrownewTimeoutException();此方法如果发现FuturetTask调用异常那么抛出异常returnreport(s);} 二者最终都调用了awaitDone(是否超时等待,等待时长)3。1如果状态小于等于COMPLETING 这意味着状态为new,任务都运行完业务逻辑,或者状态为COMPLETING,业务逻辑运行完了但是outcome正在赋值为执行结果,对outcome赋值后,会修改状态为NORMAL(任务正常完成),或者EXCEPTIONAL(任务执行抛出异常),所有get方法只有状态小于等于COMPLETING,才会调用awaitDone挂起线程进行等待3。2awaitDone等待直到任务完成或者超时 调用此方法的前提是,任务逻辑没有执行完,或者逻辑执行完但是结果还没有赋值给outcome,那么这两种情况douglea如何处理privateintawaitDone(booleantimed,longnanos)throwsInterruptedException{等待结束时间,如果非超时等待,那么为0finallongdeadlinetimed?System。nanoTime()nanos:0L;当前线程如果进入等待任务完成队列,此变量记录等待节点WaitN是否入队(等待任务完成队列)Waiters使用next组成的队列for(;;){如果等待的过程中被中断,那么把自己从等待waiters中删除并且抛出中断异常if(Thread。interrupted()){removeWaiter(q);thrownewInterruptedException();}读取state,volatile保证可见性如果当前大于COMPLETING说明任务执行完成outcome已经赋值了,或者取消了,或者由于取消而被中断直接返回当前状态,不需要再等了if(sCOMPLETING){节点线程置为null,后续执行任务线程唤醒等待线程的时候不会唤醒到此线程if(q!null)q。}如果任务正在完成,进行线程让步后续FutureTask执行的线程写回outcome改变状态为NORMAL或者EXCEPTIONAL是很快的,也许修改状态为NORMAL或者EXCEPTIONAL会导致线程A多yield几下(使用的是UNSAFE。putOrderedInt存在线程可见性问题)elseif(sCOMPLETING)cannottimeoutyetThread。yield();当前线程的节点elseif(qnull)qnewWaitNode();如果没有入队(等待任务完成的队列)那么入队(等待任务完成的队列)elseif(!queued)queuedUNSAFE。compareAndSwapObject(this,waitersOffset,q。nextwaiters,q);如果是超时等待elseif(timed){nanosdeadlineSystem。nanoTime();等待超时把自己从等待任务完成队列中移除if(nanos0L){removeWaiter(q);}等待指定时间LockSupport。parkNanos(this,nanos);}else无限期等待LockSupport。park(this);}} 此方法分支很多,我们慢慢看如果线程执行awaitDone之前就被中断,那么会直接抛出中断异常如果线程执行awaitDone并且成功使用LockSupport。parkNanos(this,nanos)或者LockSupport。park(this)挂起,这时候被中断会从这两个方法中返回,继续自旋,Thread。interrupted()为true,会重置中断标识并且抛出中断异常1和2其实都是FutureTaskget对于中断的响应,get方法如果被中断是会抛出中断异常并且重置中断标识的如果自旋的时候发现任务状态大于COMPLETING说明当前任务执行完成了,或者说任务被取消,或者由于取消已经中断了,那么直接返回即可,从这里返回有三种情况第一次自旋发现任务完成了,超时等待指定时间结束发现任务完成了,任务完成的时候被任务执行线程唤醒,继续自旋发现任务完成了如果自旋的时候发现任务状态等于COMPLETING那么调用yield让出cpu,而不是挂起自己,因为后续FutureTask执行的线程写回outcome改变状态为NORMAL或者EXCEPTIONAL是很快的,也许修改状态为NORMAL或者EXCEPTIONAL会导致线程A多yield几下(使用的是UNSAFE。putOrderedInt存在线程可见性问题)(这里就是我们说的为什么COMPLETING不担心可见性问题)注意如果超时等待指定时间结束,继续自旋,如果进入此分支,那么让出cpu,再次获得时间片后,继续执行,下一次自旋,而不会进入到下面的超时异常分支,也就是说COMPLETING意味着任务执行完了,但是在做一些善后工作(写入任务返回值,唤醒等待线程)不会由于此状态导致超时qnull意味着当前线程没有被包装成WaitNode,当前线程也没有被中断,任务没有完成也不是Completing状态,这时候调用构造方法然后继续自旋staticfinalclassWaitNode{volatileTvolatileWaitNWaitNode(){threadThread。currentThread();}}!queued这是在5的基础上,当前q已经有了节点,但是还没有进入等待任务完成队列,下面通过CAS让当前线程入队elseif(!queued)queuedUNSAFE。compareAndSwapObject(this,waitersOffset,q。nextwaiters,q);这里首先q。nextwaiters让当前节点的next指向waiters,然后CAS设置waiters为当前节点,也就是说最后入队的节点使用waiters记录,使用next串联每一个等待的线程节点,q。nextwaiters不需要考虑线程安全,和AQS中的入队类似,这是改变当前节点的next引用指向,但是修改waiters需要考虑线程安全问题,如果这里CAS失败了(意味着存在其他线程调用get入队等待),那么queued为false继续自旋尝试CAS自己为waiters挂起当前线程超时等待elseif(timed){需要等待的时间nanosdeadlineSystem。nanoTime();已经超时if(nanos0L){把自己从waiters等待任务完成队列中移除removeWaiter(q);}挂起指定时间LockSupport。parkNanos(this,nanos);}等待直到被中断或者唤醒elseLockSupport。park(this);这里超时部分多一个removeWaiter,将自己从等待任务完成队列中移除,这个方法的执行需要考虑线程安全问题,同样使用自旋CAS保证线程安全,这里不做过多分析。4report如果任务执行失败抛出异常,如果成功返回执行结果privateVreport(ints)throwsExecutionException{O如果任务正常结束if(sNORMAL)强转return(V)x;如果任务取消了或者由于取消被中断了,抛出取消异常if(sCANCELLED)thrownewCancellationException();反之抛出ExecutionException包装原始的异常thrownewExecutionException((Throwable)x);} 只有任务正常执行的时候,才会返回结果,如果被取消那么抛出取消异常。5取消任务 取消有一个比较有趣的点,如果取消在任务开始之前,那么说明取消成功,后续任务完成调用set或者setException应该是什么都不做。如果取消在任务执行之后,那么取消的这个动作应该失败,下面我们看下douglea如果处理这个细节。mayInterruptIfRunning表示需要中断任务执行线程publicbooleancancel(booleanmayInterruptIfRunning){任务不是初始,或者CAS修改状态从new到INTERRUPTING或者CANCELLED失败直接返回falseif(!(stateNEWUNSAFE。compareAndSwapInt(this,stateOffset,NEW,mayInterruptIfRunning?INTERRUPTING:CANCELLED)))try{如果需要中断if(mayInterruptIfRunning){try{T执行中断if(t!null)t。interrupt();}finally{finalstate修改状态为INTERRUPTEDUNSAFE。putOrderedInt(this,stateOffset,INTERRUPTED);}}}finally{唤醒所有等待任务执行的线程finishCompletion();}}第一个ifif(!(stateNEWUNSAFE。compareAndSwapInt(this,stateOffset,NEW,mayInterruptIfRunning?INTERRUPTING:CANCELLED)))如果stateNEW不成立,说明任务在执行此判断之前已经结束了(Completing,或者已经到了NORMAL,或者EXCEPTIONAL)说明取消在任务结束之前,那么直接返回false。或者说当前线程A对任务的取消在其他线程B取消任务之后,这时候就A线程取消方法返回false如果UNSAFE。compareAndSwapInt(this,stateOffset,NEW,mayInterruptIfRunning?INTERRUPTING:CANCELLED))不成立意味着stateNEW判读的时候成立,但是执行这句CAS的时候之前有老六线程抢先一步,或者说存在并发当前线程没有抢过,那么也直接返回false,这里保证了cancel的执行是串行的,不存在线程安全问题。注意这里如果需要中断任务执行线程那么CAS修改状态到INTERRUPTING,反之直接修改到CANCELLED尝试中断任务执行线程if(mayInterruptIfRunning){try{Tif(t!null)t。interrupt();}finally{finalstateUNSAFE。putOrderedInt(this,stateOffset,INTERRUPTED);}}调用interrupt具体如何响应中断,得看程序员定义的业务逻辑是啥样的。调用putOrderedInt修改状态为INTERRUPTED,表示已经完成了中断唤醒等待任务结束的线程直接调用finishCompletion,这个方法前面分析过。这里假如线程A取消了任务,那么线程B任务执行完后调用set或者setException会如何昵什么都不做protectedvoidset(Vv){此时state是INTERRUPTING或者INTERRUPTEDif为falseif(UNSAFE。compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){UNSAFE。putOrderedInt(this,stateOffset,NORMAL);finalstatefinishCompletion();}}run方法对中断的处理publicvoidrun(){if(state!NEW!UNSAFE。compareAndSwapObject(this,runnerOffset,null,Thread。currentThread()))try{省略任务的执行}finally{进入这个if必须是INTERRUPTING,或者INTERRUPTEDif(sINTERRUPTING)handlePossibleCancellationInterrupt(s);}}privatevoidhandlePossibleCancellationInterrupt(ints){if(sINTERRUPTING)while(stateINTERRUPTING)Thread。yield();} 这里为INTERRUPTING,可能是取消任务的线程还没来得及执行UNSAFE。putOrderedInt(this,stateOffset,INTERRUPTED),也可能是可见性导致运行任务的线程没有读取到最新的state,handlePossibleCancellationInterrupt会让运行任务的线程等待七丶问题解答get方法调用的时候,如果任务没有成功,怎么实现调用线程的阻塞等待的线程修改Waiternext指向CAS自己为waiters,线程安全的入队,并进行park挂起实现阻塞get方法出现异常,或者正常结束的时候,怎么唤醒调用get方法的线程在finishCompletion中进行唤醒,如果任务没有被取消,运行完,那么运行任务的线程进行唤醒,如果任务被取消那么cancel任务的线程进行唤醒在任务运行之前,如果任务被取消那么将无法运行,已经开始运行的任务只能尝试中断运行任务的线程,无法取消,那么douglea怎么判断运行和取消的先后顺序(注意运行和取消可以是多个线程调用)怎么处理这里的线程不安全问题使用CAS,运行前保证为new,取消的时候也要保证为new,且运行完成修改new使用cas,取消也是使用cas,保证线程安全