登录
    Technology changes quickly but people's minds change slowly.

关于线程池的理解-来自阿里的《码出高效》

技术宅 破玉 1244次浏览 0个评论

ThreadPoolExecutor 的实现

  主要是有四个构造函数,参数最多的是七个:

// 1
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

// 2
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
// 3
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

// 4
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

  1. 构造器各个参数的含义:
  corePoolSize : 核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  maximumPoolSize :线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  keepAliveTime :表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

  unit: 参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

  workQueue : 一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
threadFactory : 线程工厂,主要用来创建线程;

handler : 表示当拒绝处理任务时的策略,有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 

execute 方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    //  返回包含线程数,以及线程状态的Integer类型数值 
    int c = ctl.get();
    // 如果工作者线程数小于核心线程数,则创建线程任务并执行
    if (workerCountOf(c) < corePoolSize) {
        // addWorker 很重要,见源码解析
        if (addWorker(command, true))
            return;
        // 如果创建失败,防止外部已经在线程池增加新的任务,重新获取一下
        c = ctl.get();
    }
    // 如果线程池处于运行状态,才执行后半句 将任务加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程池不是运行状态,将刚加入队列的任务移除
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果之前的线程已经消费完,就尝试新建一个线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 核心池和队列都已满,尝试创建新线程
    else if (!addWorker(command, false))
        // 创建失败,唤醒拒绝策略 
        reject(command);
}

发生拒绝的理 有两个

1. 线程池状态为非running 状态
2. 等待队列己满。

worker 的源码解析:

/**
 根据当前线程池的状态,检查是否可以添加新的任务线程,如果可以则创建并启动任务
 如果一切正常则返回true,返回false的可能性如下:
 1. 线程池没有处于running  状态
 2. 线程工厂创建新的任务线程失败
 first task  外部启动线程构建的第一个线程,它是线程的母体
 core 新增工作线程时的判断指标,解释如下:
   true: 表示新增工作线程时的,需要判断当前 RUNNING 状态的线程是否少于 corepoolSize
   false: 表示新增工作线程时的,需要判断当前 RUNNING 状态的线程是否少于 maximumPoolSize
**/
private boolean addWorker(Runnable firstTask, boolean core) {
    // 不需要自定义任务的语法标签,响应上下文的 continue retry 快速退出多层嵌套循环
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 如果超过最大线程允许的线程数则不能添加新的线程
            // 最大线程数不能超过 2^29 否则影响左边三位的线程池状态值
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 将当前活动线程数加 1 (第3处)
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 线程池状态和工作线程数是可变化的,需要经常提取这个最新的值
            c = ctl.get();  // Re-read ctl
            // 如果已经关闭,再从retry 标签进入
            if (runStateOf(c) != rs)
                continue retry;
            // 如果线程还是处于RUNNing 状态,那就说明仅仅是在 第3处 失败
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 开始创建工作线程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 利用Woker 构造方法创建线程,并封装成工作线程Woker 对象
        w = new Worker(firstTask);
        // 注意这是woker中的属性对象 thread
        final Thread t = w.thread;
        if (t != null) {
            // 在进行ThreadPoolExecutor 的敏感操作时
            // 都需要持有主锁、避免在添加和启动线程时被干扰
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
				//当线程状态为Running或shutdown
                // 且 firstTask 初始线程为空时
                if (rs  largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 终于看到start方法
                // 注意,并非线程池的execute的command参数指向的线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 线程启动失败,把刚才添加的工作线程计数再减回去
            addWorkerFailed(w);
    }
    return workerStarted;
}

总结

  总结一下,使用线程池要注意以下几点:

  1. 合理设置各类参数,应根据实际业务场景来设置合理的工作线程数
  2. 线程资源必须通过线程池提供,不允许在应用中自行显示创建线程
  3. 创建线程或者线程池时指定有意义的线程名称,方便出错时回溯

线程池不想允许使用Executors,而是通过ThreadPoolExecutor的方式创建,这样的处理方式能更加明确线程池的运行规则,规避资源耗尽的风险。


华裳绕指柔, 版权所有丨如未注明 , 均为原创|转载请注明关于线程池的理解-来自阿里的《码出高效》
喜欢 (1)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址