`

java线程池--ThreadPoolExecutor

阅读更多

前言

 

前面几篇文章分别总结了AQS,以及基于AQS实现的几个APICountDownLatchSemaphoreReentrantLock。本篇接着讲解另一个基于AQS的实现:java线程池ThreadPoolExecutor,另外还有一个基于AQS实现的API 读写锁ReentrantReadWriteLock放到下次讲解。

 

java线程池是在日常开发中使用频率较高的一门技术,我们平时的使用一般是使用Executors4类静态方法创建:

1newFixedThreadPool(int nThreads):创建固定线程数的线程池,使用无界队列LinkedBlockingQueue,对应的还有一个带工厂参数的静态方法ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

 

2newCachedThreadPool():创建缓存线程池,线程数量没有限制,使用SynchronousQueue队列(没有容量,每个 put 必须等待一个 take),对应的还有一个带工厂参数的静态方法newCachedThreadPool(ThreadFactory threadFactory)

 

3newScheduledThreadPool(int corePoolSize):创建可以延迟执行或者以一定频率执行的线程池,可以取代Timer。使用DelayedWorkQueue,线程数量也没有限制(只限制了核心线程数)。对应的还有一个带工厂参数的静态方法newCachedThreadPool(ThreadFactory threadFactory)

 

4newSingleThreadExecutor():创建但线程的线程池,使用无界队列LinkedBlockingQueue对应的还有一个带工厂参数的静态方法newSingleThreadExecutor(ThreadFactory threadFactory)。另外还可以通过newSingleThreadScheduledExecutor()newSingleThreadScheduledExecutor(ThreadFactory threadFactory)创建单线程版本的延迟执行或者以一定频率执行的线程池,第三,四种方式的结合体。

 

不管采用上述哪种方法,这些方法最终会创建ThreadPoolExecutor实例。另外springThreadPoolTaskExecutor本质上也是使用的ThreadPoolExecutor,只是把配置参数抽取出来 方便在xml中配置而已。同样的我们也可以根据ThreadPoolExecutor的构造方法,根据业务需要创建自定义的线程池,这就必须了解ThreadPoolExecutor的基本构成。

 

如果要想熟悉java线程池的基本原理,就必须熟悉ThreadPoolExecutor的实现原理。

 

ThreadPoolExecutor实现原理

 

构造方法

首先来看ThreadPoolExecutor的构造方法,ThreadPoolExecutor4个重载的构造方法,但最终本质上都是调用的同一个,我们只用分析这一个构造方法就行:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

 

该构造方法,一共有7个参数,分别一个一个来看:

1corePoolSize:核心线程数,即线程池需要维持的最少线程数。当线程池刚创建时,线程数为0(也可以指定初始化,调用prestartAllCoreThreads方法),向线程池添加任务时,会直接创建线程进行处理,线程个数达到核心线程数。

2maximumPoolSize:线程池运行创建的最大线程数,需要大于等于corePoolSize

3keepAliveTime:当线程数大于corePoolSize,并且有部分线程已经空闲,空闲超过keepAliveTime时长,就会消耗该线程。

4unitkeepAliveTime的时间单位。

5workQueue:任务队列(BlockingQueue类型的阻塞队列),当线程数等于corePoolSize时,再新增任务这时不会新增线程,而是先放到任务队列进行缓冲。当任务队列满时,就会创建新的线程,直达线程数到达允许的最大值maximumPoolSize,如果还有新任务进来,就进入“决绝策略”环节,详见第7个参数handler

6threadFactory:创建线程时使用的工厂实例,要求必须实现ThreadFactory接口,一般用于自定义线程名称、指定优先级等。默认实现是DefaultThreadFactory,可以仿照这个类实现自己的自定义工厂类。

7handler:拒绝策略,当线程数到达maximumPoolSize,并且workQueue任务队列已满的情况下,如果还有新任务加入,就会执行“拒绝策略”。要求必须实现RejectedExecutionHandler接口, ThreadPoolExecutor提供了四个默认实现可以选择:

AbortPolicy:直接抛出RejectedExecutionException异常给提交任务线程;

CallerRunsPolicy:交给提交任务线程执行,即直接执行任务的run方法;

DiscardOldestPolicy:线程池放弃队列中最老的任务,并把这个新任务加入队列;

DiscardPolicy:直接放弃任务,并且不返回异常。

有个默认的构造方法使用的就是AbortPolicy--抛出异常策略。另外 使用Executors创建的线程池都是使用的这个默认策略。

当然,也可以自己实现RejectedExecutionHandler接口,定义自己的拒绝策略

 

到这里构造方法讲解完毕,熟悉了构造方法,相信也就可以实现任意的自定义线程池了。

 

execute提交任务方法

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //当前线程数小于核心线程数,直接调用addWorker创建核心线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果线程已经关闭,放弃任务,并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果无法加入队列(队列已满),并且也没办法创建新线程(线程到达最大值),执行决绝策略
        //如果无法加入队列(队列已满),但还可以创建新线程,就创建普通线程(非核心线程)
        else if (!addWorker(command, false))
            reject(command);
}

上面已经对构造方法的参数进行过讲解,结合给出的注释 再来看这个提交任务方法就比较好理解了。这里不再累述,主要需要提出讲的是添加线程方法addWorker(Runnable firstTask, boolean core),第二参数判断是否创建核心线程(非核心线程有可能会被回收),这个方法的主要作用就是实例化一个WorkerThreadPoolExecutor的内部类),并启动Worker中的线程。下面重点讲下Worker类的实现,因为它就是对AQS的实现。

 

WorkerAQS的实现

Worker继承AQS,主要实现了一个排它锁,有人会问为啥不直接用ReentrantLockWorker使用这个锁从队列中获取任务时加锁,每个线程同时只执行一个任务,不需要重入锁(如果是重入锁,就有可能同一个线程同时执行多个任务),这部分内容在runWorker方法中会讲到。首先来看下Worker的实现:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;
 
    //Worker种的线程
    final Thread thread;
    //线程的初始任务
    Runnable firstTask;
    //该Worker已完成的任务数
    volatile long completedTasks;
   
    //构造方法
    Worker(Runnable firstTask) {
        // 为AQS的state赋值,在执行runWorker之前都是阻塞状态
        setState(-1);
        //初始任务
        this.firstTask = firstTask;
        //调用工厂实例,创建线程
        this.thread = getThreadFactory().newThread(this);
    }
 
    //启动Worker,runWorker中有个死循环 不停的从任务队列中获取任务执行
    public void run() {
        runWorker(this);
    }
   
    //0表示锁可用,1表示已被占用
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
 
    //排它获取锁实现,是非公平实现,主要为了保证性能
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
 
    //释放锁实现
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
 
    //阻塞获取锁
    public void lock()        { acquire(1); }
    //非阻塞尝试获取锁
    public boolean tryLock()  { return tryAcquire(1); }
    //释放锁
    public void unlock()      { release(1); }
    //锁是否被占用
    public boolean isLocked() { return isHeldExclusively(); }
 
    //中断线程方法(注意不是中断获取锁)
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
 

 

重要的地方都加了注释,结合注释应该可以很好的理解Worker的实现。需要说明的Worker还实现了Runnable接口,在addWorker方法中会启动Worker并执行其run方法,也就会接着执行runWorker方法,该方法会循环从任务队列中获取任务并执行:

 
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 释放锁,记得初始化的时候AQS的state被设置为-1吗?在这里释放
        w.unlock();
        boolean completedAbruptly = true;
        try {
            //不停的获取任务执行
            while (task != null || (task = getTask()) != null) {
                w.lock();//加锁
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //这一段都是判断是否需要中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //执行任务前,需要执行的操作,这个可以留给用户自定义
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//执行任务操作
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //任务执行完成,需要执行的操作,这个可以留给用户自定义
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //对worker已完成的任务数加1
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //Worker结束时 需要执行的相关清理工作
            processWorkerExit(w, completedAbruptly);
        }
}
 

 

可以看到runWorker方法是一个死循环,只有当Worker线程被外部中断时才会结束。在获取任务执行的过程中需要加锁,这个锁就是Worker中使用AQS实现的锁。或许会有人困惑为什么需要加锁,明明这里只有Worker一个线程在执行(每个Worker实例都有一个私有的锁),这是因为在外部有可能会在其他线程中调用ThreadPoolExecutor的其他方法中断Worker,为了保证任务完整的被执行,必须在获得锁的情况下才能中断Worker,这些方法有两个:interruptIdleWorkers(会shutdown,以及回收空闲Worker线程时等方法调用)、interruptWorkers(会被shutdownNow方法调用)

 

beforeExecuteafterExecute方法

这两个方法是在具体执行某一个任务之前和之后调用,这个两个方法是空方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

 

并且这个这两个方法都是protected,也就是说专门留给ThreadPoolExecutor的子类实现的。当我们需要在执行每个任务的前后打印一些日志,或者精确的计算每个任务的执行时间,就可以继承ThreadPoolExecutor实现这个两个方法,创建自己的线程池实现。

 

shutdown平滑关闭线程池方法

先来看该方法的实现代码,再说明为什么是平滑关闭线程:

   

 public void shutdown() {
        //主锁,所有Worker共用,注意与Worker中的锁区分开
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//检查权限
            //设置线程池状态,之后添加的任务直接执行“拒绝策略”
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();//首先中断空闲的线程
            //关闭线程池后,触发的操作,默认是空,预留给子类实现
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        //这个方法的主要作用就是等待Worker把任务队列中的任务执行完成,
        //然后当Worker空闲时,调用interruptIdleWorkers()方法中断线程。
        tryTerminate();
}

 

结合给出的注释应该就很好理解什么是平滑关闭了,首先把线程池状态置为关闭,后续再往队列中添加任务会直接决绝;然后调用interruptIdleWorkers()方法清理所有已经空闲的Worker;最后通过tryTerminate()方法等待Worker把任务队列中的任务执行完成,待Worker状态变为空闲时,在调用interruptIdleWorkers()方法中断Worker线程。其中onShutdown()方法默认是空的,可以在自定义的ThreadPoolExecutor中去实现,比如在线程池关闭时加写日志。另外再重点看下interruptIdleWorkers()方法:

private void interruptIdleWorkers(boolean onlyOne) {
        //主锁,所有Worker公用
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                //尝试获取Worker自己的锁,如果能获取到 说明该Worker已经空闲
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        //中断Worker线程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

 

方法的实现逻辑很简单,只是需要注意这里用到两个锁,一个是线程池的主锁使用,所用Worker都公用这一个锁;另外一个是Worker自己的锁是通过AQS自己实现的,主要用于判断线程是否空闲,如果空闲就执行中断操作。

 

可以看到shutdown方法的整个实现过程会保证所有已经放入任务队列中的任务最终都被执行,是平滑的关闭线程池方法。

 

shutdownNow暴力关闭方法

相对于shutdown平滑关闭方法,线程池还提供暴力关闭方法shutdownNow:

    

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            //返回任务队列中还没有执行的任务列表
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        //如果任务已经在执行中,会保证任务执行完成后,才中断线程
        tryTerminate();
        return tasks;
    }

 

shutdownNowshutdown方法唯一不同的地方就是通过调用drainQueue()方法,返回给调用方任务队列中还没有执行的任务类别,调用方可以把这些任务保存起来,方便后续处理。同时也会保证已经在执行过程中的任务继续执行完成后,才中断线程。所以shutdownNow虽然暴力,但还是能保证任务状态的一致性(即不存在任务只被执行了一半的状态),同时返回还没有执行的任务列表,为后续恢复执行创造了条件。

 

由于通过interrupt方法中断线程只是打一个标记,线程真正被中断还需要一定的时间。也就是说执行完shutdownNowshutdown方法后线程池不一定马上就中断了,这时可以通过isTerminated()方法来判断线程池是不是已经彻底关闭。

 

至此ThreadPoolExecutor的核心实现讲解完毕。

 

总结

 

我们平时基本都是使用Executors框架来创建线程池,通过本文的总结,相信大家都可以通过ThreadPoolExecutor创建出自己的线程池,必要时还可以继承ThreadPoolExecutor对它进行扩展。

 

 

 

 

0
0
分享到:
评论

相关推荐

    java线程池ThreadPoolExecutor类使用详解.docx

    在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了...

    12、线程池ThreadPoolExecutor实战及其原理分析(下)

    线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor...

    Java线程池文档

    Reference: 《创建Java线程池》[1],《Java线程:新特征-线程池》[2], 《Java线程池学习》[3],《线程池ThreadPoolExecutor使用简介》[4],《Java5中的线程池实例讲解》[5],《ThreadPoolExecutor使用和思考》[6] ...

    java线程池使用后到底要关闭吗

    主要给大家介绍了关于java线程池使用后到底要不要关闭的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

    线程池原理-ThreadPoolExecutor源码解析

    线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor

    徒手实现线程池-1

    目的是是简化并发编程 ExecutorService(执行器)Java两种基础线程池普通:ThreadPoolExecutor定时ScheduledThread

    11-线程池ThreadPoolExecutor底层原理源码分析(上)-周瑜.pdf

    11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...

    12-线程池ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf

    11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...

    java 线程池例子ThreadPoolExecutor

    一个关于java 线程池的例子,也适合android

    Java线程池与ThreadPoolExecutor.pdf

    ——学习参考资料:仅用于个人学习使用! 本代码仅作学习交流,切勿用于商业用途,否则后果自负。若涉及侵权,请联系,会尽快处理! 未进行详尽测试,请自行调试!

    死磕ThreadPoolExecutor线程池.pdf

    死磕ThreadPoolExecutor线程池.pdf!!死磕ThreadPoolExecutor线程池.pdf死磕ThreadPoolExecutor线程池.pdf死磕ThreadPoolExecutor线程池.pdf

    Java ThreadPoolExecutor 线程池的使用介绍

    提供工厂方法来创建不同类型的线程池,这篇文章主要介绍了Java ThreadPoolExecutor 线程池的使用介绍,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来...

    java线程池源码-cThreadPool:JAVA线程池源码分析与重写

    java线程池源码 cThreadPool 项目描述:对java.util.concurrent包下线程池相关源码进行重新实现,深入研究和学习线程池超时机制、饱和策略、生命周期等知识 ThreadPoolExecutor类下部分方法和内部类介绍: 1、Worker...

    Java多线程Executors批量执行数据实现限流

    java线程池Executors实现数据批量操作。 批量异步Executors处理数据,实现限流操作,QPS限流。 线程池调用第三方接口限流实现逻辑。 案例适合: 1.批量处理大数据。 2.数据批量导出。 3任务数据异步执行。 4.多线程...

    线程池ThreadPoolExecutor原理源码分析.md

    Java,线程池,ThreadPoolExecutor

    Java线程池使用说明

    Java线程池使用说明: 一 简介 二:线程池 三:ThreadPoolExecutor详解

    Java线程池ThreadPoolExecutor原理及使用实例

    主要介绍了Java线程池ThreadPoolExecutor原理及使用实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

    深入理解Java线程池:ThreadPoolExecutor _ Idea Buffer1

    1. RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务 2. SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保

    线程池:java_ThreadPoolExecutor.mht

    (转)线程池:java_util_ThreadPoolExecutor 比较详细的介绍了ThreadPoolExecutor用法与属性

    ThreadPoolExecutor线程池

    ThreadPoolExecutor线程池

Global site tag (gtag.js) - Google Analytics