很多业务场景需要使用异步去完成,比如:发送短信通知。要完成异步操作一般有两种: 1、消息队列MQ 2、线程池处理。 我们来看看Spring框架中如何去使用线程池来完成异步操作,以及分析背后的原理。一。Spring异步线程池的接口类:TaskExecutor 在Spring4中,Spring中引入了一个新的注解Async,这个注解让我们在使用Spring完成异步操作变得非常方便。 Spring异步线程池的接口类,其实质是java。util。concurrent。Executor Spring已经实现的异常线程池:1。SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。2。SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方3。ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类4。SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类5。ThreadPoolTaskExecutor:最常使用,推荐。其实质是对java。util。concurrent。ThreadPoolExecutor的包装, 二、简单使用说明 Spring中用Async注解标记的方法,称为异步方法。在springboot应用中使用Async很简单: 1、调用异步方法类上或者启动类加上注解EnableAsync 2、在需要被异步调用的方法外加上Async 3、所使用的Async注解方法的类对象应该是Spring容器管理的bean对象; 启动类加上注解EnableAsync:importorg。springframework。boot。SpringAimportorg。springframework。boot。autoconfigure。SpringBootAimportorg。springframework。scheduling。annotation。EnableASpringBootApplicationEnableAsyncpublicclassCollectorApplication{publicstaticvoidmain(String〔〕args)throwsException{SpringApplication。run(CollectorApplication。class,args);}} 在需要被异步调用的方法外加上Async,同时类AsyncService加上注解Service或者Component,使其对象成为Spring容器管理的bean对象;importorg。springframework。beans。factory。annotation。Aimportorg。springframework。scheduling。annotation。Aimportorg。springframework。stereotype。Cimportorg。springframework。transaction。annotation。TServiceTransactionalpublicclassAsyncService{AsyncpublicvoidasyncMethod(Strings){System。out。println(receive:s);}publicvoidtest(){System。out。println(test);asyncMethod();同一个类里面调用异步方法}Asyncpublicvoidtest2(){AsyncServiceasyncServicecontext。getBean(AsyncService。class);asyncService。asyncMethod();异步}异布调用返回FutureAsyncpublicFutureStringasyncInvokeReturnFuture(inti){System。out。println(asyncInvokeReturnFuture,parementeri);FutureStry{Thread。sleep(10001);futurenewAsyncResultString(success:i);}catch(InterruptedExceptione){futurenewAsyncResultString(error);}}}异步方法和普通的方法调用相同asyncService。asyncMethod(123);FutureStringfutureasyncService。asyncInvokeReturnFuture(100);System。out。println(future。get()); 如果将一个类声明为异步类Async,那么这个类对外暴露的方法全部成为异步方法。AsyncServicepublicclassAsyncClass{publicAsyncClass(){System。out。println(initAsyncClass);}volatileintindex0;publicvoidfoo(){System。out。println(asyncclassfoo,index:index);}publicvoidfoo(inti){this。System。out。println(asyncclassfoo,index:i);}publicvoidbar(inti){this。System。out。println(asyncclassbar,index:i);}} 这里需要注意的是: 1、同一个类里面调用异步方法不生效:原因默认类内的方法调用不会被aop拦截,即调用方和被调用方是在同一个类中,是无法产生切面的,该对象没有被Spring容器管理。即Async方法不生效。 解决办法:如果要使同一个类中的方法之间调用也被拦截,需要使用spring容器中的实例对象,而不是使用默认的this,因为通过bean实例的调用才会被spring的aop拦截 本例使用方法:AsyncServiceasyncServicecontext。getBean(AsyncService。class);然后使用这个引用调用本地的方法即可达到被拦截的目的 备注:这种方法只能拦截protected,default,public方法,private方法无法拦截。这个是springaop的一个机制。 2、如果不自定义异步方法的线程池默认使用SimpleAsyncTaskExecutor。SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。并发大的时候会产生严重的性能问题。 3、异步方法返回类型只能有两种:void和java。util。concurrent。Future。 1)当返回类型为void的时候,方法调用过程产生的异常不会抛到调用者层面, 可以通过注AsyncUncaughtExceptionHandler来捕获此类异常 2)当返回类型为Future的时候,方法调用过程产生的异常会抛到调用者层面三、定义通用线程池1、定义线程池 在SpringBoot主类中定义一个线程池,publicExecutortaskExecutor()方法用于自定义自己的线程池,线程池前缀taskExecutor。如果不定义,则使用系统默认的线程池。SpringBootApplicationpublicclassApplication{publicstaticvoidmain(String〔〕args){SpringApplication。run(Application。class,args);}EnableAsyncConfigurationclassTaskPoolConfig{BeanpublicExecutortaskExecutor1(){ThreadPoolTaskExecutorpoolnewThreadPoolTaskExecutor();pool。setCorePoolSize(5);线程池活跃的线程数pool。setMaxPoolSize(10);线程池最大活跃的线程数pool。setWaitForTasksToCompleteOnShutdown(true);pool。setThreadNamePrefix(defaultExecutor);}Bean(taskExecutor)publicExecutortaskExecutor2(){ThreadPoolTaskExecutorexecutornewThreadPoolTaskExecutor();executor。setCorePoolSize(10);executor。setMaxPoolSize(20);executor。setQueueCapacity(200);executor。setKeepAliveSeconds(60);executor。setThreadNamePrefix(taskExecutor);executor。setRejectedExecutionHandler(newThreadPoolExecutor。CallerRunsPolicy());executor。setWaitForTasksToCompleteOnShutdown(true);executor。setAwaitTerminationSeconds(60);}}} 上面我们通过ThreadPoolTaskExecutor创建了一个线程池,同时设置了如下参数: 核心线程数10:线程池创建时初始化的线程数 最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程 缓冲队列200:用来缓冲执行任务的队列 允许线程的空闲时间60秒:超过了核心线程数之外的线程,在空闲时间到达之后会被销毁 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池 线程池对拒绝任务的处理策略:此处采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住 也可以单独类来配置线程池:importorg。springframework。beans。factory。annotation。Vimportorg。springframework。context。annotation。Bimportorg。springframework。context。annotation。Cimportorg。springframework。scheduling。annotation。EnableAimportorg。springframework。scheduling。concurrent。ThreadPoolTaskEimportjava。util。concurrent。Eimportjava。util。concurrent。ThreadPoolECreatedbyhuangguisuon2020610。ConfigurationEnableAsyncpublicclassMyThreadPoolConfig{privatestaticfinalintCOREPOOLSIZE10;privatestaticfinalintMAXPOOLSIZE20;privatestaticfinalintQUEUECAPACITY200;publicstaticfinalStringBEANEXECUTOR事件和情感接口线程池执行器配置return事件和情感接口线程池执行器beanBean(BEANEXECUTOR)publicExecutorexecutor(){ThreadPoolTaskExecutorexecutornewThreadPoolTaskExecutor();executor。setCorePoolSize(COREPOOLSIZE);executor。setMaxPoolSize(MAXPOOLSIZE);设置队列容量executor。setQueueCapacity(QUEUECAPACITY);设置线程活跃时间(秒)executor。setKeepAliveSeconds(60);executor。setThreadNamePrefix(SEPoolTask);设置拒绝策略executor。setRejectedExecutionHandler(newThreadPoolExecutor。CallerRunsPolicy());executor。initialize();}}2、异步方法使用线程池 只需要在Async注解中指定线程池名即可ComponentpublicclassTask{默认使用线程池AsyncpublicvoiddoTaskOne()throwsException{System。out。println(开始做任务);longstartSystem。currentTimeMillis();Thread。sleep(random。nextInt(10000));longendSystem。currentTimeMillis();System。out。println(完成任务耗时:(endstart)毫秒);}根据BeanName指定特定线程池Async(taskExecutor)publicvoiddoTaskOne()throwsException{System。out。println(开始做任务);longstartSystem。currentTimeMillis();Thread。sleep(random。nextInt(10000));longendSystem。currentTimeMillis();System。out。println(完成任务耗时:(endstart)毫秒);}}3、通过xml配置定义线程池 Bean文件配置:springasync。xml 1。线程的前缀为xmlExecutor 2。启动异步线程池配置!等价于EnableAsync,executor指定线程池task:annotationdrivenexecutorxmlExecutor!id指定线程池产生线程名称的前缀task:executoridxmlExecutorpoolsize525queuecapacity100keepalive120rejectionpolicyCALLERRUNS 启动类导入xml文件:SpringBootApplicationImportResource(classpath:asyncspringasync。xml)publicclassAsyncApplicationWithXML{privatestaticfinalLoggerlogLoggerFactory。getLogger(AsyncApplicationWithXML。class);publicstaticvoidmain(String〔〕args){log。info(StartAsyncApplication。。);SpringApplication。run(AsyncApplicationWithXML。class,args);}} 线程池参数说明 1。‘id’:线程名称的前缀 2。‘poolsize’:线程池的大小。支持范围minmax和固定值(此时线程池core和maxsizes相同) 3。‘queuecapacity’:排队队列长度 4。‘rejectionpolicy’:对方拒绝的任务处理策略 5。‘keepalive’:线程保护时间(单位秒)四、异常处理 上面也提到:在调用方法时,可能出现方法中抛出异常的情况。在异步中主要有有两种异常处理方法:1。对于方法返回值是Futrue的异步方法: a)、一种是在调用future的get时捕获异常; b)、在异常方法中直接捕获异常2。对于返回值是void的异步方法:通过AsyncUncaughtExceptionHandler处理异常ComponentpublicclassAsyncException{带参数的异步调用异步方法可以传入参数对于返回值是void,异常会被AsyncUncaughtExceptionHandler处理掉paramsAsyncpublicvoidasyncInvokeWithException(Strings){log。info(asyncInvokeWithParameter,parementer{},s);thrownewIllegalArgumentException(s);}异常调用返回Future对于返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理或者在调用方在调用Futrue。get时捕获异常进行处理paramireturnAsyncpublicFutureStringasyncInvokeReturnFuture(inti){System。out。println(asyncInvokeReturnFuture,parementer{},i);FutureStry{Thread。sleep(10001);futurenewAsyncResultString(success:i);thrownewIllegalArgumentException(a);}catch(InterruptedExceptione){futurenewAsyncResultString(error);}catch(IllegalArgumentExceptione){futurenewAsyncResultString(errorIllegalArgumentException);}}} 实现AsyncConfigurer接口对异常线程池更加细粒度的控制 a)创建线程自己的线程池 b)对void方法抛出的异常处理的类AsyncUncaughtExceptionHandlerServicepublicclassMyAsyncConfigurerimplementsAsyncConfigurer{OverridepublicExecutorgetAsyncExecutor(){ThreadPoolTaskExecutorthreadPoolnewThreadPoolTaskExecutor();threadPool。setCorePoolSize(1);threadPool。setMaxPoolSize(1);threadPool。setWaitForTasksToCompleteOnShutdown(true);threadPool。setAwaitTerminationSeconds(6015);threadPool。setThreadNamePrefix(MyAsync);threadPool。initialize();returnthreadP}OverridepublicAsyncUncaughtExceptionHandlergetAsyncUncaughtExceptionHandler(){returnnewMyAsyncExceptionHandler();}自定义异常处理类classMyAsyncExceptionHandlerimplementsAsyncUncaughtExceptionHandler{OverridepublicvoidhandleUncaughtException(Throwablethrowable,Methodmethod,Object。。。obj){System。out。println(Exceptionmessagethrowable。getMessage());System。out。println(Methodnamemethod。getName());for(Objectparam:obj){System。out。println(Parametervalueparam);}}}}五、问题 上面也提到:如果不自定义异步方法的线程池默认使用SimpleAsyncTaskExecutor。SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。并发大的时候会产生严重的性能问题。 一般的错误OOM:OutOfMemoryError:unabletocreatenewnativethread,创建线程数量太多,占用内存过大。 解决办法:一般最好使用自定义线程池,做一些特殊策略,比如自定义拒绝策略,如果队列满了,则拒绝处理该任务。 原文链接:https:blog。csdn。nethguisuarticledetails106671893