首页 >> 大全

线程池源码初理解--个人笔记

2023-12-21 大全 24 作者:考证青年

线程池

线程池提交的两种方式:(提交或),(提交,提交需要实现)

线程池的状态:

ctl前三位是状态信息,后29位是当前线程池线程数

(111) ()

1.如果当前线程数,少于,则

2.如果当前线程数大于或者等于,就加入阻塞队列

3.加入成功(成功说明当前是),并且再次检查当前状态为的话,就检查工作线程,工作线程为空则执行。

4.加入成功,但是当前状态不为的话,就执行,并且执行拒绝策略

5.加入阻塞队列失败,就执行非核心线程的,失败则执行拒绝策略

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {//判断当前线程与核心线程数//小于的话worker 则addworkerif (addWorker(command, true))return;c = ctl.get();}//大于或者等于则增加进入阻塞队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再次检查是否为running(可能给其他线程修改),非running就remove,成功的话执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);//加入成功,并且是running状态,检查工作线程是否为空else if (workerCountOf(recheck) == 0)//空的话则执行addworkeraddWorker(null, false);}//加入阻塞队列失败,则为非核心线程addworkerelse if (!addWorker(command, false))//失败的话执行拒绝策略reject(command);}

()

图9 申请线程执行流程图

1.判断是否可以添加线程

a.处于状态可以添加

b.处于状态,但是为null并且阻塞队列不为空

2.添加线程,利用cas添加

3.真正的添加线程逻辑,看注释

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//判断是否可以添加线程//1.running状态可以添加线程//2.处于shutdown状态,但是firstTask为null且阻塞队列不为空if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//添加线程数量for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//利用cas添加,失败的话要么就是有竞争,要么就是状态给修改了。//竞争的话就再来一遍自旋,状态给修改就去外面再判断一下if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;//用线程工厂创建的threadif (t != null) {//加锁 线程池全局锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//running或者shutdown&firstTask等于null//workers为线程池 一个全局变量workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//如果线程池增加成功,则运行线程if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)//处理添加进入线程池失败的逻辑,将一开始自旋里面的cas加一操作减一,并且将worker清理出线程池addWorkerFailed(w);}return workerStarted;
}

调用完后会执行,的run方法

()

图11 执行任务流程

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//task为当前worker里面的firstTask或者从阻塞队列里面拿的taskwhile (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt//检查当前线程池状态是否>=stop,是的话则执行中断//或者一开始不大于等于stop,但是后面线程池给外部线程给shutdown或者shutdownNow了,中断标志为true了//执行||后面的逻辑,再次将次线程设置为中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();//如果到这里都不符合中断逻辑,则执行task的run方法try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {//执行完run,就解锁并且重新拿新的tasktask = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//1.completedAbruptly为false,说明全部task执行完毕,执行一种退出逻辑//2.completedAbruptly为默认值true,说明有异常,执行另一种退出逻辑processWorkerExit(w, completedAbruptly);}
}

()

图6 获取任务流程图

返回null的情况:1.允许回收核心线程或者线程数量大于核心线程数量,有机会返回null( timed = ut || wc > ;)

线程池源码__线程池知乎

机会就是当前大于一个或者当前只有一个并且阻塞队列里面为空,返回null

2.判断线程池状态,线程池大于状态返回null

或者线程池为状态,并且阻塞队列为空返回null

其他情况则返回

timed = ut || wc > ; 为true则执行poll(允许超时),为false则执行take(阻塞)

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

()

这个方法完成的就是线程退出逻辑,有两种情况

**1.**如果当前线程为异常退出状态,则将数量减一,并且重新一次

**2.**如果当前线程为正常退出状态,则记录当前完成的tasks数量并且从里面退出(需要加锁完成),并且如果此时线程池状态为或者状态,则判断当前线程池线程()数量是否小于最小值min,小于的话则,否则就

min取决于是否允许回收核心线程,允许则为1,不允许则为核心线程数

private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}

()

final void tryTerminate() {for (;;) {int c = ctl.get();//running,tidying,termination状态,直接return或者 shutdown状态但是阻塞队列不会空,直接returnif (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//stop或者阻塞队列为空的shutdown执行到这里 判断线程数量是否为0,不为0则中断当前线程if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}//如果为0,则说明线程池此时只剩执行到这的最后一个线程了,此线程负责将状态设为termination,并通知调用了waitTermanation的线程。final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}
}

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了