Java线程池ThreadPoolExecutor实现原理
ThreadPoolExecutor的实现原理首先来思考一个问题对于一个线程池他的主要任务是缓存核心线程当一个新任务通过execute()方法提交时我们需要使用线程去执行它这里就会产生一个问题Thread 对象一旦执行完 run() 方法就会销毁。实现复用机制的核心就在于维持一个Thread对象run状态简单来说就是在run方法当中定一个while循环再循环里面不断尝试获取任务执行。在ThreadPoolExecutor当中定义了一个内部类Worker改类继承于AQS和Runnable并重写了run方法在run方法内部调用runWorker方法该方法就是实现维持线程run状态的核心机制同时保证了阻塞队列空闲时对线程的阻塞功能。privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{}finalvoidrunWorker(Workerw){ThreadwtThread.currentThread();Runnabletaskw.firstTask;w.firstTasknull;w.unlock();// allow interruptsbooleancompletedAbruptlytrue;try{while(task!null||(taskgetTask())!null){w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted()runStateAtLeast(ctl.get(),STOP)))!wt.isInterrupted())wt.interrupt();try{beforeExecute(wt,task);try{task.run();afterExecute(task,null);}catch(Throwableex){afterExecute(task,ex);throwex;}}finally{tasknull;w.completedTasks;w.unlock();}}completedAbruptlyfalse;}finally{processWorkerExit(w,completedAbruptly);}}在runWorker方法当中会传入一个Worker w对象也就是当前执行线程重点在while的判断条件当中Runnabletaskw.firstTask;while(task!null||(taskgetTask())!null)在这里的task指的是在创建核心线程的时候调用execute(task)方法时会传入一个task任务此时如果核心线程未满会创建核心线程同时将这个task任务传入从而避免先加入队列再取出的重复操作。状态与数量管理getTask逻辑与ctl变量getTask()是从阻塞队列取出任务以及在队列空闲时阻塞线程的核心操作逻辑。privatefinalAtomicIntegerctlnewAtomicInteger(ctlOf(RUNNING,0));privateRunnablegetTask(){// 当前线程在上一轮当中执行是否超时booleantimedOutfalse;// Did the last poll() time out?for(;;){intcctl.get();// Check if queue empty only if necessary.// 判断是否要终止线程if(runStateAtLeast(c,SHUTDOWN)(runStateAtLeast(c,STOP)||workQueue.isEmpty())){decrementWorkerCount();returnnull;}// 从合并的 ctl 变量中剥离出当前的线程数量intwcworkerCountOf(c);// Are workers subject to culling?// 确定当前这个线程是否为临时线程/核心线程允许回收booleantimedallowCoreThreadTimeOut||wccorePoolSize;// 对于临时线程且超过存活时间进行清理if((wcmaximumPoolSize||(timedtimedOut))(wc1||workQueue.isEmpty())){if(compareAndDecrementWorkerCount(c))returnnull;continue;}try{// 在从阻塞队列获取Runnable的过程当中有两种模式// timed true 使用poll如果队列为空等待keepAliveTime时间如果没获取到返回null// timed false 使用take如果队列为空阻塞当前线程Runnablertimed?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();if(r!null)returnr;timedOuttrue;}catch(InterruptedExceptionretry){timedOutfalse;}}}在这里有一个非常有意思的设计我们来想一下作为一个线程池来说实际上是需要维护线程池等状态的比如说当前线程池正在运行当中线程池关闭了等状态含义任务处理规则RUNNING正常营业接收新任务并处理阻塞队列中的任务。SHUTDOWN准备打烊不再接收新任务但会继续把阻塞队列里已有的任务执行完。STOP强制停业不接新任务也不再处理队列任务并尝试中断正在执行的任务。TIDYING清理中所有任务已结束线程数为 0正在调用terminated()钩子方法。TERMINATED彻底倒闭terminated()执行完毕线程池彻底终结。除此之外还需要提供一个计数器用来维护当前线程数量也就是说在同一次操作当中我们如果同时修改了上述两者的状态就必须要保证它们的原子性。intcctl.get();对于这种原子性的实现方法最直接的就是直接加重量级锁synchronized 或 ReentrantLock但是这种方式必然会带来额外的性能开销因此线程池引入了一个AtomicInteger类型的变量ctl使用它的前三位来表示线程池状态后29位来标识当前的线程数量这样就能够将两次修改的操作合并为一次通过AtomicInteger的CAS操作来保证原子性。线程获取与回收核心线程与临时线程的处理之后就是对于线程获取任务以及队列为空时核心线程以及临时线程的处理方案了核心体现在下面的代码当中。整体的getTask方法是在一个无限循环当中执行前一次循环的结果可以通过变量传递给下一次循环当中这里重点看最后对于获取任务的逻辑。try{// 在从阻塞队列获取Runnable的过程当中有两种模式// timed true 使用poll如果队列为空等待keepAliveTime时间如果没获取到返回null// timed false 使用take如果队列为空阻塞当前线程Runnablertimed?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();if(r!null)returnr;timedOuttrue;}catch(InterruptedExceptionretry){timedOutfalse;}可以看到线程池的处理逻辑是通过timed一个boolean类型的变量判断当前线程是否为核心线程如果是核心线程则执行take方法该方法是从队列当中取任务如果队列为空则无限期阻塞。第二种是对于临时线程的处理方案会执行poll方法在poll方法当中也会判断队列是否为空如果为空的话等待keepAliveTime时间超时之后返回null。最终在结果收集的阶段如果拿到的任务不为空则直接返回出去如果为空则说明只有一种情况当前线程为临时线程且已超时修改timedOut为true在下一轮循环当中结束该线程。阻塞队列底层机制基于Condition的等待与唤醒对于阻塞队列的实现这里以ArrayBlockingQueue为例对于ArrayBlockingQueue实现的阻塞队列的阻塞逻辑是通过一下三个变量。// 互斥锁finalReentrantLocklock;// 阻塞消费者privatefinalConditionnotEmpty;// 阻塞生产者privatefinalConditionnotFull;阻塞队列本质上是一个队列结构队列有容量大小当队列存满时此时还有生产者尝试添加任务就会通过notFull阻塞生产者等待队列有剩余空间时再唤醒。privateEdequeue(){// ... 取出数据的逻辑 ...count--;// 库存减 1notFull.signal();// 核心动作对着生产者休息室喊“有空位了”returnx;}对于消费者来说当队列为空时消费者尝试获取队列任务此时就会对消费者进行阻塞take的实现逻辑take在尝试获取任务之前会先拿到ReentrantLock锁加锁之后判断当前队列是否为空不为空直接返回为空则通过Condition阻塞当前线程直到队列当中有新任务时主动唤醒线程处理。publicEtake()throwsInterruptedException{finalReentrantLocklockthis.lock;// 加锁lock.lockInterruptibly();try{// 当队列为空时调用Condition阻塞线程while(count0)notEmpty.await();returndequeue();}finally{lock.unlock();}}poll的实现原理publicEpoll(longtimeout,TimeUnitunit)throwsInterruptedException{// 转化为纳秒longnanosunit.toNanos(timeout);finalReentrantLocklockthis.lock;lock.lockInterruptibly();try{// while循环的作用是记录上一轮循环结果如果当前线程在上一轮循环当中被唤醒且队列当中有任务while(count0){if(nanos0L)returnnull;// 使用Condition的定时休眠功能能够让线程在执行休眠时间后被唤醒nanosnotEmpty.awaitNanos(nanos);}returndequeue();}finally{// 获取节点lock.unlock();}}poll在take的基础上当队列为空时会通过nanos notEmpty.awaitNanos(nanos)方法阻塞一定时间临时线程存活时间在这个期间线程如果被唤醒则在下一次循环当中判断队列是否有任务有则取出执行没有则继续阻塞直到超时中断线程。