JDK一般通过ThreadPoolExecutor方式创建线程池而不用Executors,原因是其构造方法参数限制较多,运行规则十分清晰,可以根据业务需求创建合适的线程池,而Executors返回的线程池对象的弊端,网上搜到比如FixedThreadPool和SingleThreadPool,允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM;CachedThreadPool和ScheduledThreadPool,允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM
整个ThreadPoolExecutor最关键的就是构造方法的几个参数的理解
* @param corePoolSize 核心线程池中最大线程数
* @param maximumPoolSize 线程池中的最大线程数
* @param keepAliveTime 空闲线程的存活时间
* @param unit 存活时间的单位
* @param workQueue 任务队列,保存已经提交但是还没有执行的任务
* @param threadFactory 线程工厂
* @param handler 拒绝策略
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
至于execute的执行流程,网上到处都能搜到,也十分详细,一开始以为queue的size有max约束,导致理解有点误解
我这里按自己的理解也画了一份完整的
简单总结下重点
1、当前执行的线程数小于core上限,直接新建工作线程执行任务 2、当前执行的线程数大于等于core上限,workQueue没满,任务入队,如果队列没满,等待core线程结束上一轮,复用core线程执行队列任务 3、当workQueue满了,当前执行的线程数小于max上限,会在max-core的数量区间中继续创建线程执行任务 4、当前执行线程大于等于max上限,任务被拒绝 5、至于workQueue里任务的执行,复用当前线程池中运行但是没有执行任务的线程
下面以一个还算合适的例子说明下这个流程
任务类
public class Task implements Runnable {
private int no;
private int size;
public Task(int no, int size) {
this.no = no;
this.size = size;
}
@Override
public void run() {
try {
System.out.println("执行中, Task: " + no + ", Thread: " + Thread.currentThread().getName() + ", queue.size: " + size);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行,core为2,max为5,队列长度8,重写了拒绝handler方法,直接打印一条记录;任务执行里也直接打印一条记录,然后sleep为了得到相互之间的顺序
理论上结果应该是Task0-1直接创建线程执行,Task2-9直接入队列,Task10-12创建线程执行,Task13-14被拒绝,最后复用线程池中的线程执行Task2-9;这种结果预期的原因是加了sleep,不加sleep可能都不会有拒绝信息,因为Task很快就执行完了
public class Main {
private static ExecutorService pool;
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(8);
pool = new ThreadPoolExecutor(
2,
5,
1000,
TimeUnit.MILLISECONDS,
queue,
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("拒绝中, " + r.toString() + ", queue.size: " + queue.size());
}
});
for(int i = 0; i < 15; i++) {
pool.execute(new Task(i, queue.size()));
}
pool.shutdown();
}
}
执行结果如下
执行中, Task: 0, Thread: pool-1-thread-1, queue.size: 0
执行中, Task: 1, Thread: pool-1-thread-2, queue.size: 0
执行中, Task: 10, Thread: pool-1-thread-3, queue.size: 8
执行中, Task: 11, Thread: pool-1-thread-4, queue.size: 8
执行中, Task: 12, Thread: pool-1-thread-5, queue.size: 8
拒绝中, com.lihuia.code.main.Task@6e8cf4c6, queue.size: 8
拒绝中, com.lihuia.code.main.Task@12edcd21, queue.size: 8
执行中, Task: 2, Thread: pool-1-thread-3, queue.size: 0
执行中, Task: 6, Thread: pool-1-thread-2, queue.size: 4
执行中, Task: 5, Thread: pool-1-thread-5, queue.size: 3
执行中, Task: 4, Thread: pool-1-thread-1, queue.size: 2
执行中, Task: 3, Thread: pool-1-thread-4, queue.size: 1
执行中, Task: 7, Thread: pool-1-thread-3, queue.size: 5
执行中, Task: 9, Thread: pool-1-thread-4, queue.size: 7
执行中, Task: 8, Thread: pool-1-thread-1, queue.size: 6
这个结果其实有很多可以注意的点
1、Task0-1最先起了两个线程,因为queue都是空 2、Task2-9入队列,因为虽然程序里它们是最后执行打印,但是queue.size当参数传进去的,任务开始执行的时候计算,而且从队列长度++可以看出,Task2-9来一个,进队列一个 3、队列塞满了,maxPool为5,因此还可以新建3个工作线程执行任务,因此Task10-12也开始执行,这样maxPool线程池就占满了 4、maxPool满了,因此Task13-14就会被拒绝,打印,可以看到队列一直是满的 5、由于sleep的作用,此时线程池里的线程都已经执行完了上面的Task,最后队列里的Task2-9就直接复用当前运行的线程来完成执行,可以根据pool-1-thread-x这个字段看出来
除了这些内容,还有线程池生命周期管理,阻塞队列,拒绝策略,任务调度管理,worker管理等,网上都能搜到比较详细的内容
Spring里的ThreadPoolTaskExecutor实际上就是ThreadPoolExecutor封装了一层,将它和ThreadPoolExecutor和相关属性封装起来,然后通过暴露这些属性,通过BEAN注入到容器当中,依赖注入管理起来
可以先看下ThreadPoolTaskExecutor的定义
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private final Object poolSizeMonitor = new Object();
private int corePoolSize = 1;
private int maxPoolSize = 2147483647;
private int keepAliveSeconds = 60;
private int queueCapacity = 2147483647;
private boolean allowCoreThreadTimeOut = false;
@Nullable
private TaskDecorator taskDecorator;
@Nullable
private ThreadPoolExecutor threadPoolExecutor;
private final Map<Runnable, Object> decoratedTaskMap;
很明显,字段和ThreadPoolExecutor一模一样,因此很容易进行Bean注入
比如这里通过Java Config的方式,XML也是类似,配置类如下,和上面例子数据一致好了
@Configuration
public class TaskExecutorConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(8);
executor.setKeepAliveSeconds(1);
executor.setThreadNamePrefix("task-thread-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(5);
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("拒绝中: " + r.toString());
}
});
executor.initialize();
return executor;
}
}
这里多注入了两个额外的属性setWaitForTasksToCompleteOnShutdown和setAwaitTerminationSeconds,这两个属性是关闭线程池的时候触发的,setWaitForTasksToCompleteOnShutdown会等待所有线程执行完,setAwaitTerminationSeconds指定具体等待的时间,可是ThreadPoolExecutor在shutdown的时候,并不需要指定这些,原因是ThreadPoolTaskExecutor在shutdown的时候,修改了逻辑,并不是直接调用ThreadPoolExecutor的shutdown,源码如下
路径如下:org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#shutdown
public void shutdown() {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.executor != null) {
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
} else {
Iterator var1 = this.executor.shutdownNow().iterator();
while(var1.hasNext()) {
Runnable remainingTask = (Runnable)var1.next();
this.cancelRemainingTask(remainingTask);
}
}
this.awaitTerminationIfNecessary(this.executor);
}
}
这里的业务逻辑是:
如果waitForTasksToCompleteOnShutdown为true,才会调用ThreadPoolExecutor的shutdown()
如果waitForTasksToCompleteOnShutdown为false,调用的是ThreadPoolExecutor的shutdownNow(),结束为完成的任务
而我现在需要最终执行完阻塞队列里的任务,因此需要添加这两个属性
最后再看下executor.initialize()方法,最终调用的是org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor#initializeExecutor
protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
public void execute(Runnable command) {
Runnable decorated = ThreadPoolTaskExecutor.this.taskDecorator.decorate(command);
if (decorated != command) {
ThreadPoolTaskExecutor.this.decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
} else {
executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
有了前面ThreadPoolExecutor的了解,这里实际上就是将各个属性来初始化ThreadPoolTaskExecutor里的ThreadPoolExecutor,这样就完成了这个线程池的依赖注入
测试类保持和前面例子一样的调用,执行15个任务,Task类和上面一样,只不过没打印queue的size
@ContextConfiguration(classes = {TaskExecutorConfig.class})
public class TaskTest extends AbstractTestNGSpringContextTests {
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Test
public void testTask() {
for (int i = 0; i < 15; i++) {
taskExecutor.execute(new Task(i));
}
taskExecutor.shutdown();
}
}
这里调用的shutdown()方法,并不是JDK里的,而是Spring里的,上面已经说明了,由于加了两个属性,因此这里最终调用的还是JDK里的shutdown()方法
再来看一下执行结果
执行中, Task: 0, Thread: task-thread-1
执行中, Task: 1, Thread: task-thread-2
执行中, Task: 10, Thread: task-thread-3
执行中, Task: 11, Thread: task-thread-4
执行中, Task: 12, Thread: task-thread-5
拒绝中: com.lihuia.code.pool.Task@2d0399f4
拒绝中: com.lihuia.code.pool.Task@14dd7b39
执行中, Task: 2, Thread: task-thread-1
执行中, Task: 3, Thread: task-thread-2
执行中, Task: 4, Thread: task-thread-3
执行中, Task: 5, Thread: task-thread-4
执行中, Task: 6, Thread: task-thread-5
执行中, Task: 7, Thread: task-thread-1
执行中, Task: 8, Thread: task-thread-2
执行中, Task: 9, Thread: task-thread-4
和前面的分析也一致
最后看下Spring里的异步线程池,基于上面ThreadPoolExecutor配置的基础上,只需要开启异步操作,然后注解类或者方法,指定线程池就可以完成异步线程池操作
具体首先还是配置线程池,还是用的spring的ThreadPoolTaskExecutor,同时添加@EnableAsync注解开启异步功能,当然可以放在启动类上
@Slf4j
@EnableAsync
@Configuration
public class TaskExecutorConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(8);
executor.setKeepAliveSeconds(1);
executor.setThreadNamePrefix("task-thread-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(5);
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("拒绝中: " + r.toString());
}
});
executor.initialize();
return executor;
}
}
异步执行的方法就随便写一个了,这里是一个无返回的方法
这里要注意的是,@Async注解异步执行的方法, 指定了线程池,也就是配置的Bean name,因此通过依赖查找就找到了配置的线程池,当然这里bean name也可以不指定,因为后面会显式地指定线程池配置类信息
@Slf4j
public class Hello {
@Async("taskExecutor")
public void sayHello(String s) {
log.info("Hello {}", s);
}
}
最后就是测试类,这里通过@ContextConfiguration显式注入需要用到的bean,然后调用异步方法后,sleep一下避免主线程直接结束
@Slf4j
@ContextConfiguration(classes = {TaskExecutorConfig.class, Hello.class})
public class TaskTest extends AbstractTestNGSpringContextTests {
@Autowired
private Hello hello;
private void sleep() {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void testTask() {
hello.sayHello("lihuia.com");
sleep();
}
}
执行的结果是
[task-thread-1] INFO com.lihuia.code.pool.Hello - Hello lihuia.com
从线程的name可以看出,异步线程是来自配置的ThreadPoolTaskExecutor类中,这里有一点,假如将注解里TaskExecutorConfig.class去掉
[main] INFO com.lihuia.code.pool.Hello - Hello lihuia.com
也就是这会就没有用自定义配置的线程池,要注意的是如果没有自定义,spring会搜索TaskExecutor类型的bean或者名称为taskExecutor的Executor类型的bean,如果也不存在,就会用SimpleAsyncTaskExecutor,但这玩意根本就不用线程池,每次都新起一个新线程,因此不能用,容易OOM
通常调用异步方法,都是无返回值的执行,假如要获取返回值,就只有借助Future接口,它可以对具体任务的执行结果进行处理和查询,大致有三种功能
1、判断任务是否完成 2、中断任务 3、获取任务执行结果
比如要获取异步线程池执行任务的结果
@Slf4j
@Component
public class Task{
private void sleep(int t) {
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Async("taskExecutor")
public Future<String> doTask(String s) {
sleep(5);
return new AsyncResult<>("Hello " + s);
}
}
异步方法同样指定线程池添加@Async注解,方法返回一个泛型String,添加一个sleep 5秒
测试类
@Test
public void testFuture() throws InterruptedException, ExecutionException {
Future<String> future = task.doTask("lihuia.com");
log.info("Waiting for result ...");
String result = future.get();
log.info(result);
}
获取异步方法结果之前加一行日志,目的是为了确认获取返回结果这个过程
21:04:40.800 [main] INFO com.lihuia.pool.TaskTest - Waiting for result ... 21:04:45.812 [main] INFO com.lihuia.pool.TaskTest - Hello lihuia.com
从时间戳可以看到,结果的打印一直阻塞了5S,而这5S正好是doTask里sleep的,可见get()方法会一直阻塞,等到任务执行完毕才会返回
同样可以试试get(long timeout, TimeUnit unit)方法,指定时间来取返回结果
@Test
public void testFuture() throws InterruptedException, ExecutionException, TimeoutException {
Future<String> future = task.doTask("lihuia.com");
log.info("Waiting for result ...");
String result = future.get(4, TimeUnit.SECONDS);
log.info(result);
}
比如4S后来返回结果,可是doTask方法4S还并没有执行完,因此会抛出异常
java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205) at com.lihuia.pool.TaskTest.testFuture(TaskTest.java:47) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124) at org.testng.internal.MethodInvocationHelper$1.runTestMethod(MethodInvocationHelper.java:230) at org.springframework.test.context.testng.AbstractTestNGSpringContextTests.run(AbstractTestNGSpringContextTests.java:181) at org.testng.internal.MethodInvocationHelper.invokeHookable(MethodInvocationHelper.java:242) at org.testng.internal.Invoker.invokeMethod(Invoker.java:579) at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719) at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989) at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125) at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109) at org.testng.TestRunner.privateRun(TestRunner.java:648) at org.testng.TestRunner.run(TestRunner.java:505) at org.testng.SuiteRunner.runTest(SuiteRunner.java:455) at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450) at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415) at org.testng.SuiteRunner.run(SuiteRunner.java:364) at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84) at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208) at org.testng.TestNG.runSuitesLocally(TestNG.java:1137) at org.testng.TestNG.runSuites(TestNG.java:1049) at org.testng.TestNG.run(TestNG.java:1017) at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66) at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
这样就会因为执行超时,而释放回到线程池,而不会一直阻塞占用资源
总结一下,核心还是要掌握ThreadPoolExecutor所有参数的含义,包括每个参数的所有选项的区别,ThreadPoolTaskExecutor会进一步封装,然后通过Spring依赖注入更方便用户进行线程池属性配置,异步线程池在线程池基础上管理线程进行任务执行的异步调用