`

java栅栏--CyclicBarrier

阅读更多

CyclicBarrier介绍

 

java栅栏--CyclicBarrier,直接翻译为循环屏障。其作用与前面讲的闭锁CountDownLatch有些类似,可以构造一个可循环利用的“屏障”,在一组线程执行任务到达指定位置之前 阻塞已经到达的线程,等所有线程都到达指定位置后,同时依次唤醒所有线程。

 

CountDownLatch也可以阻塞一组线程,但在使用上与CyclicBarrier有所区别。CountDownLatch可以实现阻塞一组线程等待另一组线程执行完成,这里可以有两组线程;而CyclicBarrier只是阻塞一组线程,另外它还可以重复使用,并且还允许所有线程到达指定位置后 先执行一段方法,再依次唤醒这组线程继续继续,这些都是CountDownLatch所不具备的。

 

下面还是以讲解CountDownLatch时,使用的多人准备游戏为例,简单暂时下CyclicBarrier的用法:

public class CyclicBarrierTest {
    public static void main(String[] args) {
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
 
        CyclicBarrier playersCounter = new CyclicBarrier(10, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有玩家准备就绪,开始游戏!");
            }
        });
        System.out.println("等待玩家加入游戏");
 
        for (int i=0;i<20;i++){
            executorService.execute(new Player(playersCounter,i+""));
        }
 
        playersCounter.isBroken();
 
    }
}
 
 
//多线程操作线程不安全容器 ThreadSafe.datas
class Player implements Runnable{
    private CyclicBarrier playersCounter;
    private String name;
 
    public Player(CyclicBarrier playersCounter,String name) {
        this.playersCounter = playersCounter;
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println("玩家:"+name+"加入游戏");
        try {
            Thread.sleep(1000);
            playersCounter.await();
            System.out.println("玩家:"+name+"开始选英雄");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注意与CountDownLatch版本的实现有三点区别:

1CountDownLatch是在主线程调用await阻塞(实际上可以在一组线程上await),而CyclicBarrier是在任务线程里await阻塞。

2CountDownLatch只能使用一次,也就是新建一个CountDownLatch对象对应10个玩家准备游戏,另外10个玩家准备时 又得创建一个新的CountDownLatch对象;而CyclicBarrier可以重复使用,这里可以看到20个玩家,分别被分成了两组开始游戏。

3、每10个玩家准备就绪后,在CyclicBarrier实现中可以先执行一段Runnablerun方法,再依次唤醒线程,这种处理方式在阶段性为题中非常有用。

 

在简单了解CyclicBarrier的使用方法之后,继续开始深入理解CyclicBarrier的实现原理。

 

CyclicBarrier实现原理

 

前面讲过CountDownLatch是基于AQS实现的;而CyclicBarrier是基于ReentrantLock重入锁实现的,当然ReentrantLock也是基于AQS实现的,非要说CyclicBarrier也是基于AQS实现的也不为过。

 

重要成员变量

CyclicBarrier定义了下列几个成员变量,其核心方法都会操作这几个成员变量。这里先列出来,结合具体方法使用到时 方便查看和理解。

 

    //可以理解为初始化时 需要阻塞的任务个数
    private final int parties;
    //剩余需要等待的任务个数,初始值为parties,直到为0时依次唤醒所有被阻塞的任务线程。
    private int count;
 
    //每次对“栅栏”的主要成员变量进行变更操作,都应该加锁
    private final ReentrantLock lock = new ReentrantLock();
    //用于阻塞和唤醒任务线程
   private final Condition trip = lock.newCondition();
 
    //在所有线程被唤醒前,需要执行的一个Runable对应的run方法
    private final Runnable barrierCommand;
    //用于表示“栅栏”当前的状态
    private Generation generation = new Generation();
 

 

其中只有countgeneration不是final的,也就是说其他几个成员变量初始化后是不允许修改的。

 

构造方法

CyclicBarrier有两个重载的构造方法,一个是不带Runnable参数,另一个带有Runnable参数。本质上都会调用带Runnable参数的构造方法进行实例化,这里只贴出带Runnable参数的构造方法实现:

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties; //为了实现复用,进行备份
        this.count = parties;//初始化,待阻塞的任务总数
        this.barrierCommand = barrierAction;//初始化
    }

 

构造方法很好理解,主要工作就是初始化三个重要的成员变量。

 

await方法

await方法是CyclicBarrier的核心方法,本质上调用的是dowait,一组线程的阻塞和唤醒工作都在这个方法中实现:

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
 
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock(); //加锁
        try {
            final Generation g = generation;
 
            if (g.broken)
                throw new BrokenBarrierException();
            //有一个线程线程被中断,整个CyclicBarrier将不可用
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
 
            int index = --count; //待等待的任务数减1
            if (index == 0) {  // 如果待等待的任务数减至0,依次唤醒所有线程
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();//唤醒前先执行Runnable对象的run方法
                    ranAction = true;
                    nextGeneration();//重置整个CyclicBarrier,方便下次重用
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
 
            //如果待等待的任务数大于0,进行线程阻塞,直到count为0时被唤醒
            for (;;) {
                try {
                    if (!timed)
                        trip.await();//阻塞当前线程
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//延时阻塞当前线程
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
 
                if (g.broken)//异常唤醒
                    throw new BrokenBarrierException();
 
                if (g != generation)//正常被唤醒,generation会被新建
                    return index;
 
                if (timed && nanos <= 0L) {//延迟阻塞时间到后唤醒
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
}
 

 

理解这个方法主要是理解两部分代码块:在count还没有到0之前,都会执行for循环着块代码,对线程进行阻塞;直到count变为0,执行if (index == 0)所在的代码块依次唤醒所有阻塞的线程,并对CyclicBarrier对象进行重置,方便继续使用。结合给出的注释,对await方法的实现应该就不难理解了。

 

另外await方法还有一个延时版本,主要是为了防止没有足够多线程调用await方法,count就不会减为0,线程就会被永久阻塞;如果使用延时版本的await方法就可以避免这个问题,时间到后,所有线程阻塞的线程都会被依次唤醒,并收到TimeoutException异常,避免被长时间阻塞。

 

isBroken方法

该方法主要用于判断当前CyclicBarrier对象的状态,如果状态为false,说明现在CyclicBarrier不可用,如果此时调用await方法 会直接收到一个BrokenBarrierException异常,见dowait方法。

 

reset()方法

该方法主要用于重置CyclicBarrier对象:

public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   //先把状态置为不可用,并唤醒被阻塞的线程执行完成任务
            nextGeneration(); //重置CyclicBarrier对象
        } finally {
            lock.unlock();
        }
    }
 

 

getNumberWaiting()方法

该方法用于获取当前已经被阻塞的任务数:

public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count; //计算已被阻塞的任务数
        } finally {
            lock.unlock();
        }
    }
 

 

到这里CyclicBarrier的实现原理分析结束。

 

CyclicBarrier的不足

 

CyclicBarrier相对于CountDownLatch来说,在分段执行任务上进行了改进,可以重复使用。并且在每个分段完成时,可以执行一个Runnable对应的run方法 做一些业务处理,比如:统计汇总。使用CyclicBarrier实现分阶段汇总:



 

假设把任务分为三个阶段,每个阶段结束的汇总方法相同,使用CyclicBarrier可以很简单的实现,这个“汇总方法”其实就是CyclicBarrier构造方法的第二参数,Runnable对应的run方法。 但如果每个阶段的汇总方法如果不一样时,CyclicBarrier就显得束手无措,因为它只能定义一个Runnable参数。如果遇到这种场景可以使用java1.7中新增的Phaser

 

Phaser的名字就可以看出,它主要就是用于解决分阶段问题,比起CyclicBarrier来,Phaser可以在每个阶段结束后执行不同的操作,从分阶段的角度看PhaserCyclicBarrier的增强版。从CountDownLatchCyclicBarrier 再到Phaser是一个依次增强的过程,但又不能相互完全取到。关于Phaser这里就不再深入讨论,后面有时间再单独总结。

 

 

  • 大小: 10.2 KB
0
0
分享到:
评论

相关推荐

    Java并发编程之栅栏(CyclicBarrier)实例介绍

    主要介绍了Java并发编程之栅栏(CyclicBarrier)实例介绍,栅栏类似闭锁,但是它们是有区别的,需要的朋友可以参考下

    java并发编程-AQS和JUC实战

    CyclicBarrier循环栅栏; 重⼊锁可以完全替代synchronized关键字。在JDK5.0的早期版本中,重⼊锁的性能远远好于 synchronized,但从JDK6.0开始,JDK在synchronized上做了⼤量的优化,使得两者的性能差距并 不⼤。重...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    13. 栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    13. 栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool 进行...

    java并发工具包详解

    13. 栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool ...

    java并发包资源

    13. 栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool ...

    java8源码-baijia123:常用工具类及测试类

    CellularAutomata-&gt;通过CyclicBarrier(栅栏)协调细胞自动衍生系统中的计算 Memoizer1-&gt;使用HashMap和同步机制来初始化缓存 Memoizer2-&gt;用ConcurrentHashMap替换HashMap Memoizer3-&gt;基于FutureTask的Memoizing封装器 ...

    详解java多线程的同步控制

    目录线程安全 Thread Safety重入锁 ReentrantLock读写锁 ReadWriteLock倒计数器 CountDownLatch循环栅栏 CyclicBarrier信号量 Semaphore 线程安全 Thread Safety JMM JMM(Java Memory Model)是一种基于计算机内存...

    Java并发编程(学习笔记).xmind

    CyclicBarrier 可以让一定数量的参与线程反复地在栅栏位置汇集 应用场景在并行迭代算法中非常有用 Exchanger 这是一种两方栅栏,各方在栅栏位置上交换数据。 应用场景:当...

    java核心知识点整理.pdf

    25 JAVA8 与元数据.................................................................................................................................25 2.4. 垃圾回收与算法 .................................

    java面试常见基础(深层次,高级研发)

    23. 栅栏的原理和实现。 51 23.1. 1. CyclicBarrier简介 51 23.2. 2. CyclicBarrier数据结构 52 23.3. 3. CyclicBarrier源码分析(基于JDK1.7.0_40) 52 23.3.1. 3.1 构造函数 52 23.3.2. 3.2 等待函数 53 23.4. 4. ...

    JAVA核心知识点整理(有效)

    25 JAVA8 与元数据.................................................................................................................................25 2.4. 垃圾回收与算法 .................................

Global site tag (gtag.js) - Google Analytics