前言 大家好,我是小郭,之前分享了CountDownLatch的使用,我们知道用来控制并发流程的同步工具,主要的作用是为了等待多个线程同时完成任务后,在进行主线程任务。 万万没想到,在生产环境中竟然翻车了,因为没有考虑到一些场景,导致了CountDownLatch出现了问题,接下来来分享一下由于CountDownLatch导致的问题。需求背景 先简单介绍下业务场景,针对用户批量下载的文件进行修改上传 为了提高执行的速度,所以在采用线程池去执行下载修改上传的操作,并在全部执行完之后统一提交保存文件地址到数据库,于是加入了CountDownLatch来进行控制。具体实现 根据服务本身情况,自定义一个线程池publicstaticExecutorServicetestExtcutor(){returnnewThreadPoolExecutor(2,2,0L,TimeUnit。SECONDS,newLinkedBlockingQueue(1));}复制代码 模拟执行publicstaticvoidmain(String〔〕args){下载文件总数ListIntegerresultListnewArrayList(100);IntStream。range(0,100)。forEach(resultList::add);下载文件分段ListListIntegersplitCollUtil。split(resultList,10);ExecutorServiceexecutorServiceBaseThreadPoolExector。testExtcutor();CountDownLatchcountDownLatchnewCountDownLatch(100);for(ListIntegerlist:split){executorService。execute((){list。forEach(i{try{模拟业务操作Thread。sleep(500);System。out。println(任务进入);}catch(InterruptedExceptione){e。printStackTrace();System。out。println(e。getMessage());}finally{System。out。println(countDownLatch。getCount());countDownLatch。countDown();}});});}try{countDownLatch。await();System。out。println(countDownLatch。await());}catch(InterruptedExceptione){e。printStackTrace();}}复制代码 一开始我个人感觉没有什么问题,反正finally都能够做减一的操作,到最后调用await方法,进行主线程任务Exceptioninthreadmainjava。util。concurrent。RejectedExecutionException:Taskjava。util。concurrent。FutureTask300ffa5drejectedfromjava。util。concurrent。ThreadPoolExecutor1f17ae12〔Running,poolsize2,activethreads2,queuedtasks1,completedtasks0〕atjava。util。concurrent。ThreadPoolExecutorAbortPolicy。rejectedExecution(ThreadPoolExecutor。java:2063)atjava。util。concurrent。ThreadPoolExecutor。reject(ThreadPoolExecutor。java:830)atjava。util。concurrent。ThreadPoolExecutor。execute(ThreadPoolExecutor。java:1379)atjava。util。concurrent。AbstractExecutorService。submit(AbstractExecutorService。java:112)atThread。executor。executorTestBlock。main(executorTestBlock。java:28)任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown任务进入countDownLatch。countDown复制代码 由于任务数量较多,阻塞队列中已经塞满了,所以默认的拒绝策略,当队列满时,处理策略报错异常, 要注意这个异常是线程池,自己抛出的,不是我们循环里面打印出来的, 这也造成了,线上这个线程池被阻塞了,他永远也调用不到await方法, 利用jstack,我们就能够看到有问题pool1thread212prio5osprio31tid0x00007ff6198b7000nid0xa903waitingoncondition〔0x0000700001c64000〕java。lang。Thread。State:WAITING(parking)atsun。misc。Unsafe。park(NativeMethod)parkingtowaitfor0x000000076b2283f8(ajava。util。concurrent。locks。AbstractQueuedSynchronizerConditionObject)atjava。util。concurrent。locks。LockSupport。park(LockSupport。java:175)atjava。util。concurrent。locks。AbstractQueuedSynchronizerConditionObject。await(AbstractQueuedSynchronizer。java:2039)atjava。util。concurrent。LinkedBlockingQueue。take(LinkedBlockingQueue。java:442)atjava。util。concurrent。ThreadPoolExecutor。getTask(ThreadPoolExecutor。java:1074)atjava。util。concurrent。ThreadPoolExecutor。runWorker(ThreadPoolExecutor。java:1134)atjava。util。concurrent。ThreadPoolExecutorWorker。run(ThreadPoolExecutor。java:624)atjava。lang。Thread。run(Thread。java:748)pool1thread111prio5osprio31tid0x00007ff6198b6800nid0x5903waitingoncondition〔0x0000700001b61000〕java。lang。Thread。State:WAITING(parking)atsun。misc。Unsafe。park(NativeMethod)parkingtowaitfor0x000000076b2283f8(ajava。util。concurrent。locks。AbstractQueuedSynchronizerConditionObject)atjava。util。concurrent。locks。LockSupport。park(LockSupport。java:175)atjava。util。concurrent。locks。AbstractQueuedSynchronizerConditionObject。await(AbstractQueuedSynchronizer。java:2039)atjava。util。concurrent。LinkedBlockingQueue。take(LinkedBlockingQueue。java:442)atjava。util。concurrent。ThreadPoolExecutor。getTask(ThreadPoolExecutor。java:1074)atjava。util。concurrent。ThreadPoolExecutor。runWorker(ThreadPoolExecutor。java:1134)atjava。util。concurrent。ThreadPoolExecutorWorker。run(ThreadPoolExecutor。java:624)atjava。lang。Thread。run(Thread。java:748)解决方案调大阻塞队列,但是问题来了,到底多少阻塞队列才是大呢,如果太大了会不由又造成内存溢出等其他的问题在第一个的基础上,我们修改了拒绝策略,当触发拒绝策略的时候,用调用者所在的线程来执行任务publicstaticThreadPoolExecutorqueueExecutor(BlockingQueueworkQueue){returnnewThreadPoolExecutor(size,size,0L,TimeUnit。SECONDS,workQueue,newThreadPoolExecutor。CallerRunsPolicy());}复制代码你可能又会想说,会不会任务数量太多,导致调用者所在的线程执行不过来,任务提交的性能急剧下降那我们就应该自定义拒绝策略,将这下排队的消息记录下来,采用补偿机制的方式去执行同时也要注意上面的那个异常是线程池抛出来的,我们自己也需要将线程池进行trycatch,记录问题数据,并且在finally中执行countDownLatch。countDown来避免,线程池的使用总结 目前根据业务部门的反馈,业务实际中任务数不很特别多的情况,所以暂时先采用了第二种方式去解决这个线上问题 在这里我们也可以看到,如果没有正确的关闭countDownLatch,可能会导致一直等待,这也是我们需要注意的。 工具虽然好,但是依然要注意他带来的问题,没有正确的去处理好,引发的一系列连锁反应。