焦点:为什么要有线程池?java如何创建线程池,区别是什么?几个思考的问题?2。为什么要有线程池? 多线程特点:资源占用多默认一个线程的栈1M(Xss可配)(note:堆外,栈溢出)需要上下文切换cpu切换之前的状态存储(pc,寄存器等) 线程池(资源池)都是为了解决一个主要问题:创建线程(资源)和销毁线程(资源)的成本,比维护线程(资源)池的成本高。 延伸可解决的问题:限制程序对资源无限申请可管理,可监控 对比:协程(内存8k),线程成本更高,线程池就成了常用手段之一3。java如何创建线程池? 创建线程池的方法:newThreadPoolExecutor()Executors。newXXXX() Executors。newXXXX()是对newThreadPoolExecutor()提供的封装3。1ThreadPoolExecutor java中的线程池ThreadPoolExecutorpublicThreadPoolExecutor(intcorePoolSize,核心线程数intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueRunnableworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){if(corePoolSize0maximumPoolSize0maximumPoolSizecorePoolSizekeepAliveTime0L){if(workQueue!nullthreadFactory!nullhandler!null){this。corePoolSizecorePoolSthis。maximumPoolSizemaximumPoolSthis。workQueueworkQthis。keepAliveTimeunit。toNanos(keepAliveTime);this。threadFactorythreadFthis。}else{thrownewNullPointerException();}}else{thrownewIllegalArgumentException();}}publicvoidexecute(Runnablecommand){if(commandnull){thrownewNullPointerException();}else{intcthis。ctl。get();if(workerCountOf(c)this。corePoolSize){当前worker数量corePoolSize,将添加一个worker线程来工作if(this。addWorker(command,true)){}cthis。ctl。get();}if(isRunning(c)this。workQueue。offer(command)){当前worker数量corePoolSize,将多余的任务放到worker队列中intrecheckthis。ctl。get();if(!isRunning(recheck)this。remove(command)){this。reject(command);}elseif(workerCountOf(recheck)0){this。addWorker((Runnable)null,false);}}elseif(!this。addWorker(command,false)){如果使用worker队列放不下,且也无法增加worker线程的数量(达到maximumPoolSize)了,就执行拒绝this。reject(command);}}}添加worker线程(coretrue:占用corePoolSize线程的名额;corefalse:占用maximumPoolSizecorePoolSize线程的名额)privatebooleanaddWorker(RunnablefirstTask,booleancore){} BlockingQueue有什么?ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。LinkedBlockingQueue:一个基于链表结构的无界阻塞队列(capacity2147483647),此队列按FIFO(先进先出)排序元素,吞吐量通常要高于ArrayBlockingQueue。SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。PriorityBlockingQueue:一个具有优先级的无界阻塞队列(capacity2147483647),内部使用PriorityQueue。 RejectedExecutionHandler有什么?ThreadPoolExecutor。AbortPolicy如果元素添加到线程池中失败,则直接抛运行时异常RejectedExecutionException(默认)ThreadPoolExecutor。DiscardPolicy如果元素添加线程池失败,则放弃,不抛异常。ThreadPoolExecutor。CallerRunsPolicy如果元素添加线程池失败,则主线程自己来运行任务。ThreadPoolExecutor。DiscardOldestPolicy如果元素添加线程池失败,会将队列中最早的元素删除之后,再尝试添加,一直重复成功为止。3。2Executors。newXXXX() 这些都啥区别?java。util。concurrent。ExecutorsnewFixedThreadPool(int)coresize和maxsize一样大,无界队列publicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueue());}publicstaticExecutorServicenewFixedThreadPool(intnThreads,ThreadFactorythreadFactory){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueue(),threadFactory);}java。util。concurrent。ExecutorsnewCachedThreadPool()无队列,来一个任务,开一个线程,直到oom(内部逻辑,2147483647之后就手动oom异常)publicstaticExecutorServicenewCachedThreadPool(){returnnewThreadPoolExecutor(0,2147483647,60L,TimeUnit。SECONDS,newSynchronousQueue());}publicstaticExecutorServicenewCachedThreadPool(ThreadFactorythreadFactory){returnnewThreadPoolExecutor(0,2147483647,60L,TimeUnit。SECONDS,newSynchronousQueue(),threadFactory);}java。util。concurrent。ExecutorsnewSingleThreadExecutor()core和max都是1,无界队列publicstaticExecutorServicenewSingleThreadExecutor(){returnnewExecutors。FinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueue()));}publicstaticExecutorServicenewSingleThreadExecutor(ThreadFactorythreadFactory){returnnewExecutors。FinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueue(),threadFactory));}java。util。concurrent。ExecutorsnewScheduledThreadPool(int)定时执行的线程池publicstaticScheduledExecutorServicenewSingleThreadScheduledExecutor(){returnnewExecutors。DelegatedScheduledExecutorService(newScheduledThreadPoolExecutor(1));}publicstaticScheduledExecutorServicenewSingleThreadScheduledExecutor(ThreadFactorythreadFactory){returnnewExecutors。DelegatedScheduledExecutorService(newScheduledThreadPoolExecutor(1,threadFactory));}publicstaticScheduledExecutorServicenewScheduledThreadPool(intcorePoolSize){returnnewScheduledThreadPoolExecutor(corePoolSize);}publicstaticScheduledExecutorServicenewScheduledThreadPool(intcorePoolSize,ThreadFactorythreadFactory){returnnewScheduledThreadPoolExecutor(corePoolSize,threadFactory);}java。util。concurrent。ExecutorsnewWorkStealingPool(int)支持工作窃取的线程池publicstaticExecutorServicenewWorkStealingPool(intparallelism){returnnewForkJoinPool(parallelism,ForkJoinPool。defaultForkJoinWorkerThreadFactory,(UncaughtExceptionHandler)null,true);}publicstaticExecutorServicenewWorkStealingPool(){returnnewForkJoinPool(Runtime。getRuntime()。availableProcessors(),ForkJoinPool。defaultForkJoinWorkerThreadFactory,(UncaughtExceptionHandler)null,true);}4。几个思考的问题4。1核心线程怎么保活 使用阻塞队列(这就是为什么就算空队列,也一定要传入workQueue这个参数)。getTask是每个worker线程,获取自己要执行的任务。 4。2怎么实现keepAliveTime的 同上blockQueue的poll4。3ScheduledExecutorService怎么实现任务定时执行 构造函数:coresize传入,maxsize2147483647,workQueueScheduledThreadPoolExecutor。DelayedWorkQueue()publicScheduledThreadPoolExecutor(intcorePoolSize){super(corePoolSize,2147483647,10L,TimeUnit。MILLISECONDS,newScheduledThreadPoolExecutor。DelayedWorkQueue());}publicScheduledThreadPoolExecutor(intcorePoolSize,ThreadFactorythreadFactory){super(corePoolSize,2147483647,10L,TimeUnit。MILLISECONDS,newScheduledThreadPoolExecutor。DelayedWorkQueue(),threadFactory);}publicScheduledThreadPoolExecutor(intcorePoolSize,RejectedExecutionHandlerhandler){super(corePoolSize,2147483647,10L,TimeUnit。MILLISECONDS,newScheduledThreadPoolExecutor。DelayedWorkQueue(),handler);}publicScheduledThreadPoolExecutor(intcorePoolSize,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){super(corePoolSize,2147483647,10L,TimeUnit。MILLISECONDS,newScheduledThreadPoolExecutor。DelayedWorkQueue(),threadFactory,handler);} ScheduledThreadPoolExecutor。DelayedWorkQueue:相当于DelayQueue和PriorityQueue的结合体。(权重是下次执行的时间),无界队列 delayQueue是什么?是一个BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。classMyDelayedTaskimplementsDelayed{privateSprivatelongstartSystem。currentTimeMillis();publicMyDelayedTask(Stringname,longtime){this。this。}需要实现的接口,获得延迟时间,0的时候,就表示可以用了OverridepubliclonggetDelay(TimeUnitunit){returnunit。convert((starttime)System。currentTimeMillis(),TimeUnit。MILLISECONDS);}用于延迟队列内部比较排序当前时间的延迟时间比较对象的延迟时间OverridepublicintcompareTo(Delayedo){MyDelayedTasko1(MyDelayedTask)o;return(int)(this。getDelay(TimeUnit。MILLISECONDS)o。getDelay(TimeUnit。MILLISECONDS));}OverridepublicStringtoString(){returnMyDelayedTask{namename,timetime};}} 常用方法: 注意:execute并不会实现延迟效果(因为execute是线程池通用方法,仍然依赖先core,后queue,再max,最后reject) 问题:schedule,scheduleAtFixedRate,scheduleWithFixedDelay是如何实现延迟的?publicScheduledF?schedule(Runnablecommand,longdelay,TimeUnitunit){if(command!nullunit!null){RunnableScheduledFutureVoidtthis。decorateTask((Runnable)command,newScheduledThreadPoolExecutor。ScheduledFutureTask(command,(Object)null,this。triggerTime(delay,unit),sequencer。getAndIncrement()));根据任务(command)构建一个延迟任务对象this。delayedExecute(t);延迟执行这个任务}else{thrownewNullPointerException();}}privatevoiddelayedExecute(RunnableScheduledF?task){if(this。isShutdown()){this。reject(task);}else{super。getQueue()。add(task);直接将任务加到workQueue里面,所以ScheduledThreadPoolExecutor是先queue,再core,再max,在rejectif(!this。canRunInCurrentRunState(task)this。remove(task)){task。cancel(false);}else{this。ensurePrestart();}}} scheduleAtFixedRate和scheduleWithFixedDelay流程一样,但是延迟任务对象构造出来的不一样4。4ForJoinPool4。4。1什么是forkjoin什么是mapreduce forkjoin和mapreduce都是分治的思想。 举例子:forkjoin:123456((123)(456))(((12)(3))((45)(6)))递归拆分拆分之后块与块之间仍然有联系单机mapreduce:123456sum((12),(56),(34))拆分一次,随机拆分,拆分成目标大小的块即可reduce也是随机的,块与块之间不存在联系分布式 forkjoin模式: 4。4。2什么是workstaling 核心思想:workstealing工作窃取 充分利用线程进行并行计算,减少线程间的竞争(在某些情况下还是会存在竞争,比如双端队列里只有一个任务时)。4。4。3JavaForkJoinPool 在ForkJoinPool中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数mode决定),然后以FIFO的顺序随机窃取其他队列中的任务。 具体思路如下:每个线程都有自己的一个WorkQueue,该工作队列是一个双端队列。队列支持三个功能push、pop、pollpushpop只能被队列的所有者线程调用,而poll可以被其他线程调用。划分的子任务调用fork时,都会被push到自己的队列中。默认情况下,工作线程从自己的双端队列获出任务并执行。当自己的队列为空时,线程随机从另一个线程的队列末尾调用poll方法窃取任务。 4。4。4java怎么使用ForkJoinPool ForkJoin框架主要包含三个模块:任务对象:ForkJoinTask(包括RecursiveTask、RecursiveAction和CountedCompleter)执行ForkJoin任务的线程:ForkJoinWorkerThread线程池:ForkJoinPool ForkJoinPool只接收ForkJoinTask任务(在实际使用中,也可以接收RunnableCallable任务,但在真正运行时,也会把这些任务封装成ForkJoinTask类型的任务),RecursiveTask是ForkJoinTask的子类,是一个可以递归执行的ForkJoinTask,RecursiveAction是一个无返回值的RecursiveTask,CountedCompleter在任务完成执行后会触发执行一个自定义的钩子函数。importjava。util。concurrent。RecursiveTpublicclassCountTaskextendsRecursiveTaskLong{publicCountTask(intstart,intend,intlimit){this。this。this。}OverrideprotectedLongcompute(){longsum0;if(endstartlimit){for(i){}}else{intmid(startend)2;CountTaskcountTask1newCountTask(start,mid,limit);CountTaskcountTask2newCountTask(mid1,end,limit);countTask1。fork();countTask2。fork();Longresult1countTask1。join();Longresult2countTask2。join();sumresult1result2;}}}publicclassMain{publicstaticvoidmain(String〔〕args){longstartSystem。currentTimeMillis();ForkJoinPoolforkJoinPoolnewForkJoinPool();CountTaskcountTasknewCountTask(1,1000000,5);LonginvokeforkJoinPool。invoke(countTask);longendSystem。currentTimeMillis();System。out。println(invoke,(endstart));}}500000500000,126 提交任务的区别:invoke()会等待任务计算完毕并返回计算结果;execute()是直接向池提交一个任务来异步执行,无返回结果;submit()也是异步执行,但是会返回提交的任务,在适当的时候可通过task。get()获取执行结果。4。4。5ForkJoinPool的核心参数核心线程数:默认线程数Runtime。getRuntime()。availableProcessors()最小线程数1默认Runtime。getRuntime()。availableProcessors(),cpu线程数最大32767keepAlive60000mspublicForkJoinPool(){this(Math。min(32767,Runtime。getRuntime()。availableProcessors()),defaultForkJoinWorkerThreadFactory,(UncaughtExceptionHandler)null,false,0,32767,1,(Predicate)null,60000L,TimeUnit。MILLISECONDS);} 队列大小: