h

hitechr

V1

2022/07/26阅读:9主题:极客黑

【一知半解】线程池

线程池

什么是线程池?

线程的创建、使用和销毁都需要额外的开销,有时创建和销毁的耗时都要比使用的时间要长,为了提高线程的使用效率,在使用线程前创建一批线程缓存起来,当使用的时候在缓存的线程中拿出一个空闲的线程使用即可。这种基于缓存的池化思想管理的线程,就是线程池。

线程池中总会保存几个活跃的线程,当使用的时候在池中取出处于空闲状态的线程使用,使用完毕后不再销毁,而是放回到池中,供下个任务使用。

线程池的好使

  1. 避免了创建和销毁线程时系统切换的代价
  2. 避免了线程数量无量膨胀导致的过分调度的问题
  3. 方便对线程的统一管理

Executor

继承关系图

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

参数

  • corePoolSize 核心线程数
  • maximumPoolSize 最大线程数
  • keeyAliveTime 空闲线程存活时间
  • workQueue 等待队列列表
  • threadFactory 线程工厂,默认为Executors.defaultThreadFactory()
  • RejectedExecutionHandler 拒绝策略,默认AbortPolicy忽略

字段

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private final BlockingQueue<Runnable> workQueue;
private final HashSet<Worker> workers = new HashSet<Worker>();
private final ReentrantLock mainLock = new ReentrantLock();
  • ctl 控制线程数量和状态的统一标识
  • workQueue 线程队列
  • workers 工作线程的集合
  • mainLock 锁,用于在操作线程集合时的并发控制

ctl

线程个数 和 线程池的状态统一到一个字段中,这个字段就是ctl,高3位表示池的状态,低29位表示线程的个数。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池的5种状态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

线程池有两个关闭的方法:shutdown()shutdownNow(),在调用这两个方法时线程池会切换到不同的状态,当队列为空、线程池为空时进入到TIDYING状态,当调用钩子方法terminated()后进行到TERMINATED状态,然后进行关闭。

在调用 shutdown()或者shutdownNow()之后,线程池并不会立即关闭 。

线程池状态

​ 线程池的状态由-1、0、1、2、3依次变大,不能逆向迁移。

任务的提交过程

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();//获取当前ctl的值
     //workerCountof 表示从ctl中取出当前线程的个数
        if (workerCountOf(c) < corePoolSize) {//如果小于corePoolSize
            if (addWorker(command, true))//添加到线程集合中
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//添加到线程队列中
            int recheck = ctl.get();
            //如果线程池不是运行中,则把command从队列中移除
            if (! isRunning(recheck) && remove(command))
                reject(command);//并执行拒绝策略
            else if (workerCountOf(recheck) == 0)//如果运行的线程已经为0,则添加一个空的任务,主要是为了让Work从任务队列中获取并执行
                addWorker(nullfalse);
        }
        else if (!addWorker(command, false))//创建新的工作线程,标记为false
            reject(command);
    }
  1. 当往线程池中添加一个任务时,首先判断池中线程的个数是否小于corePoolSize,如果小于则直接创建一个线程并执行;如果大于corePoolSize则执行第2步
  2. 判断线程队列是否未满,如果是则加入到线程队列中;如果已满,则执行步骤3
  3. 判断是否小于最大线程数,如果小于则新建线程并执行;如果大于,则执行步骤4
  4. 根据拒绝的策略,执行不同的拒绝策略
//如果第二个参数为true,则使用corePoolSize作为上限,否则使用maxPoolSize作为上限。
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();//线程池的标识
            int rs = runStateOf(c);//run status 线程池的状态
            //如果线程池已经关闭,则直接返回
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);//获取池中线程的个数
                //如果是core=true,则上限为corePoolSize,否则就是maxPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //通过cas操作给ctl进行增加一个线程
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();
                //如果状态与之前的状态不一样,有别的线程已经更改的池的状态
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);//创建一个新的工作线程
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    int c = ctl.get();
                    int rs = runStateOf(c);//线程状态
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 由于线程已经在运行中,无法启动,抛异常
                            throw new IllegalThreadStateException();
                        workers.add(w);//添加到线程集合中
                        int s = workers.size();
                        //维护一个变量保存线程集合中的最大值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {//添加成功
                    t.start();//启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

任务的执行过程

由创建的Work去执行任务,或从任务队列中获取并执行。当Work中封装的fisrtWork不为空时,则执行firstWork对应的任务,如果为空,则从任务队列中获取。

任务的开启

if (workerAdded) {
    t.start();
    workerStarted = true;
}

任务的循环执行

public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();//获取当前的线程
    Runnable task = w.firstTask;//Work封装的待执行的任务
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //如果task不为空,或者从任务队列中能取到值
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();//执行任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

为什么执行thread的start方法就能触发任务的获取呢?

Worker实现了Runnable接口,在创建Worker时,又把当前对象封装到Thread里面了

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
    }
}

线程什么时候回收的?

首先看下runWorker方法里调用的processWorkderExit,根据名称可知是处理线程退出的逻辑

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) 
            decrementWorkerCount();//工作中的线程减1

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);//从工作集合中移除
        } finally {
            mainLock.unlock();
        }

        tryTerminate();//尝试关闭
        ....
    }

根据源码可以看出来,在runworks中只有task为null时就会退出循环

while (task != null || (task = getTask()) != null) {
    ....
}

在看下getTask的方法

private Runnable getTask() {
        boolean timedOut = false//默认不会超时
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程的状态
   //如果线程池已经关闭 或者 队列中已经没有元素了,则返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//线程数减1
                return null;
            }
            boolean timed;      // Are workers subject to culling?
            for (;;) {
                int wc = workerCountOf(c);//获取线程数
                //如果允许超时或线程个数大于核心线程数
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))//线程数减1 
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  • 首先判断线程池的状态是否已经关闭,如果已经关闭,则返回null,执行runWorks方法线程会从工作集合中移除

  • 如果允许核心线程数超时 或 线程数大于核心线程数,则从队列中获取任务时调用等待超时的方法workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);否则就调用阻塞获取的方法workQueue.take()

  • 如果当前超时没有获取,则自旋的方式再次去获取

线程池的拒绝策略

  • AbortPolicy 线程池抛异常 ,线程池默认的拒绝策略
  • CallerRunsPolicy 直接在自己的线程里执行,线程池不处理
  • DiscardPolicy 线程池直接丢掉任务
  • DiscardOldestPolicy 删除队列中最早的任务,将当前任务入队列

Executors工具类

newFixedThreadPool

创建固定个数的线程池,核心线程数与最大线程数一样

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor

单线程的线程池 ,核心线程数与最大线程数都是1

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(11,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool

每接收一个请求,就创建一个线程来执行 ,核心线程数为0,最大线程数为Integer.Max_value

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newSingleThreadScheduledExecutor

单线程,具有周期调度功能的线程池

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

分类:

后端

标签:

Java

作者介绍

h
hitechr
V1