博客
关于我
CyclicBarrier源码探究 (JDK 1.8)
阅读量:408 次
发布时间:2019-03-06

本文共 9878 字,大约阅读时间需要 32 分钟。

CyclicBarrier也叫回环栅栏,能够实现让一组线程运行到栅栏处并阻塞,等到所有线程都到达栅栏时再一起执行的功能。“回环”意味着CyclicBarrier可以多次重复使用,相比于CountDownLatch只能使用一次,CyclicBarrier可以节省许多资源,并且还可以在构造器中传入任务,当栅栏条件满足时执行这个任务。CyclicBarrier是使用了ReentrantLock,主要方法在执行时都会加锁,因此并发性能不是很高。

1.相关字段

//重入锁,CyclicBarrier内部通过重入锁实现线程安全    private final ReentrantLock lock = new ReentrantLock();    //线程阻塞时的等待条件    private final Condition trip = lock.newCondition();    //需要等待的线程数    private final int parties;    //栅栏打开之后首先执行的任务    private final Runnable barrierCommand;    //记录当前的分代标记    private Generation generation = new Generation();    //当前还需要等待多少个线程运行到栅栏位置    private int count;

需要注意的是generation字段,用于标记栅栏当前处在哪一代。当满足一定的条件时(例如调用了reset方法,或者栅栏打开等),栅栏状态会切换到下一代,实际就是new一个新的Generation对象,这是CyclicBarrier的内部类,代码非常简单,如下:

private static class Generation {        boolean broken = false;   //标记栅栏是否被破坏    }

实际使用的过程中,会利用generation字段判断当前是否在同一个分代,而使用broker字段判断栅栏是否被破坏。

2.构造函数

CyclicBarrier有两个重载的构造函数,构造函数只是对上述的相关字段进行初始化,如下:

public CyclicBarrier(int parties) {        this(parties, null);    }    public CyclicBarrier(int parties, Runnable barrierAction) {        if (parties <= 0) throw new IllegalArgumentException();        this.parties = parties;        this.count = parties;        this.barrierCommand = barrierAction;    }

3.核心方法

  • await
    await是开发时最常用到的方法了,同CountDownLatch一样,CyclicBarrier也提供了两个await方法,一个不带参数,一个带有超时参数,其内部只是简单调用了一下dowait方法:
public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }    }    public int await(long timeout, TimeUnit unit)        throws InterruptedException,               BrokenBarrierException,               TimeoutException {        return dowait(true, unit.toNanos(timeout));    }

接下来看看至关重要的dowait方法:

private int dowait(boolean timed, long nanos)        throws InterruptedException, BrokenBarrierException,               TimeoutException {        final ReentrantLock lock = this.lock;        //加重入锁        lock.lock();        try {            //首先获取年龄代信息            final Generation g = generation;            //如果栅栏状态被破坏,抛出异常,例如先启动的线程调用了breakBarrier方法,后启动的线程就能够看到g.broker=true            if (g.broken)                throw new BrokenBarrierException();            //检测线程的中断状态,如果线程设置了中断状态,则通过breakBarrier设置栅栏为已破坏状态,并唤醒其他线程            //如果这里能够检测到中断状态,那只可能是在await方法外部设置的            if (Thread.interrupted()) {                breakBarrier();                throw new InterruptedException();            }            //每调用一次await,就将需要等待的线程数减1            int index = --count;            //index=0表示这是最后一个到达的线程,由该线程执行下面的逻辑            if (index == 0) {  // tripped                boolean ranAction = false;                try {                    final Runnable command = barrierCommand;                    //如果在构造器中传入了第二个任务参数,就在放开栅栏前先执行这个任务                    if (command != null)                        command.run();                    ranAction = true;                    //正常结束,需要唤醒阻塞的线程,并换代                    nextGeneration();                    return 0;                } finally {                    //try代码块如果正常执行,ranAction就一定等于true,而try代码块唯一可能发生异常的地方就是command.run(),                    //因此这里为了保证在任务执行失败时,将栅栏标记为已破坏,唤醒阻塞线程                    if (!ranAction)                        breakBarrier();                }            }            // loop until tripped, broken, interrupted, or timed out            //栅栏没被破坏,线程没有被中断,且不是最后一个到达栅栏的线程,就会执行下面的自旋,排队等待            for (;;) {                try {                    //没有设置超时标记,就加入等待队列                    //注意,只有在最后finally语句中释放了锁,那么其他的线程是如何走到这里的呢(具体分析见文末的问题解析部分。)                    if (!timed)                        trip.await();                    //设置了超时标记,但目前还没有超时,则继续等待                    else if (nanos > 0L)                        nanos = trip.awaitNanos(nanos);                } catch (InterruptedException ie) {                    //如果线程等待的过程中被中断,会执行到这里                    //g == generation表示当前还在同一个年龄分代中,即当前栅栏还没有放开,并且外部也没有调用reset()方法                    //!g.broker表示当前栅栏状态没有被破坏,在这种情况下需要破坏当前的栅栏状态                    //当最后一个线程执行完换代逻辑后(或调用了reset()方法),线程还没被唤醒的过程中发生了中断,此时g!=generation                    if (g == generation && ! g.broken) {                        breakBarrier();                        throw ie;                    } else {                        //上面的条件不满足,说明:1)g!=generation,说明线程执行到这里时已经换代了,                        //要么是最后一个线程正常打开栅栏之后,当前线程被中断,要么是外部调用了reset()方法,随后当前线程被中断                        //2)没有换代,但是栅栏被破坏了,这种情况会在下文代码的条件语句捕获到                        //无论哪种情况,都只是简单地设置一下当前线程的中断状态                        Thread.currentThread().interrupt();                    }                }                //栅栏被破坏,抛出异常                //注意,在breakBarrier方法中会唤醒所有等待条件的线程,这些线程会执行到这里,判断栅栏已经被破坏,都会抛出异常                if (g.broken)                    throw new BrokenBarrierException();                //注意:代码中breakBarrier方法和nextGeneration方法都会唤醒阻塞的线程,但是breakBarrier在上一个判断就被拦截了,                //因此走到这里的有三种情况:                //a)最后一个线程正常执行,栅栏打开导致其他线程被唤醒,此时最后一个线程执行了nextGeneration方法,导致换代,之前的一批线程全部返回                //b)栅栏被重置(调用了reset方法),此时g!=negeration,全都直接返回                //c)线程等待超时了,不属于当前代的返回就可以了,属于当前代的则要在下个条件语句中设置generation.broken = true                if (g != generation)                    return index;                //如果线程等待超时,标记栅栏为破坏状态并抛出异常,如果还没超时,则自旋后又重新阻塞                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            //别忘了解锁            lock.unlock();        }    }

dowait的方法逻辑是:每一个调用await方法的线程都会将计数count1,最后一个线程将count减为0时,顺带还要执行barrierCommand指定的任务,并将generation切换到下一代,当然,最重要的还是要唤醒之前在栅栏处阻塞的线程。由于trip对应的Condition对象没有任何地方会修改,因此trip.signalAll()会唤醒所有在该条件上等待的线程,如果线程在等待的过程中,其他线程将generation更新到下一代,就会出现被唤醒的线程中有部分还属于之前那一代的情况。

接下来将会对dowait用到的一些方法进行简单介绍。

  • breakBarrier
    dowait方法有四个地方调用了breakBarrier,从名字可以看出,该方法会将generation.broken设置为true,除此之外,还会还原count的值,并且唤醒所有被阻塞的线程:
private void breakBarrier() {        generation.broken = true;        count = parties;        //唤醒所有的阻塞线程        trip.signalAll();    }

纵观CyclicBarrier源码,generation.broken统一在breakBarrier方法中被设置为true,而一旦将generation.broken设置为true之后,代码中检查到这个状态之后都会抛出异常,栅栏就没办法再使用了(可以手动调用reset进行重置),而源码中会在以下几种情况调用breakBarrier方法:

① 当前线程被中断
② 通过构造器传入的任务执行失败
③ 条件等待时被中断
④ 线程等待超时
⑤ 显式调用reset方法

  • nextGeneration
private void nextGeneration() {        // 唤醒所有的阻塞线程        trip.signalAll();        // 开启下一代        count = parties;        generation = new Generation();    }
  • reset
    reset方法主要是结束这一代,并切换到下一代
public void reset() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            breakBarrier();   // break the current generation            nextGeneration(); // start a new generation        } finally {            lock.unlock();        }    }

介绍到这里,整个CyclicBarrier已经差不多介绍完了,但是内部的流程远远没有这么简单,因为很大一部分逻辑封装在AbstractQueuedSynchronizer中,这个类定义了阻塞的线程如何加入等待队列,又如何被唤醒,因此如果想要深入了解线程等待的逻辑,还需要仔细研究AbstractQueuedSynchronizer才行。本文不会对这部分内容进行介绍,后面有时间的话将会专门对其进行介绍。

4.问题解析

  • 1.dowait()方法中独占锁的释放问题
    通过本文的分析可以知道,CyclicBarrier会让先到达栅栏的线程阻塞起来,等待最后一个到达的线程唤醒。在dowait()方法中用到了ReentrantLock这个独占锁,也就是说必须等待持有锁的线程释放了锁之后,其他线程才能够再次获取锁从而向下执行。然而,一个显然的问题是,锁是在dowait()方法最后的finally语句块中才释放的,第一个持有锁的线程执行到trip.await()的时候就阻塞了,那么第一个线程之后的其他线程是如何执行到trip.await()这里的呢?通过写测试代码调试可以发现,在trip.await()执行之后,锁就会被其他线程占有,相当于原来占有锁的线程释放了锁,因此秘密就在trip.await()方法中,来看看其源码(这部分代码在AQS类中):
public final void await() throws InterruptedException {        if (Thread.interrupted())            throw new InterruptedException();        //加入条件等待队列        Node node = addConditionWaiter();        //就是在这里释放了锁,需要注意的是,这里保存了当前的state值        int savedState = fullyRelease(node);        int interruptMode = 0;        //前面的addConditionWaiter()方法之后,node的状态就是CONDITION了,isOnSyncQueue(node)会返回false,将当前线程挂起        while (!isOnSyncQueue(node)) {            LockSupport.park(this);            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                break;        }        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)            interruptMode = REINTERRUPT;        if (node.nextWaiter != null) // clean up if cancelled            unlinkCancelledWaiters();        if (interruptMode != 0)            reportInterruptAfterWait(interruptMode);    }    final int fullyRelease(Node node) {        boolean failed = true;        try {            //先获取当前的state的值,并作为结果返回            int savedState = getState();            if (release(savedState)) {                failed = false;                return savedState;            } else {                throw new IllegalMonitorStateException();            }        } finally {            if (failed)                node.waitStatus = Node.CANCELLED;        }    }    public final boolean release(int arg) {        if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;        }        return false;    }    //这段代码在ReentrantLock中,其他的几个方法都在AQS内    protected final boolean tryRelease(int releases) {        //注意参数releases=1,因此下面的c=0        int c = getState() - releases;        //异常处理,对于CyclicBarrier来讲,这个就是当前持有锁的线程        if (Thread.currentThread() != getExclusiveOwnerThread())            throw new IllegalMonitorStateException();        boolean free = false;        if (c == 0) {            free = true;            //在这里清空了exclusiveOwnerThread字段,表明当前可重入锁没有线程占有            setExclusiveOwnerThread(null);        }        //将state字段设置为0,那么其他的线程在执行到lock.lock()时就能够获取到锁了        setState(c);        return free;    }

从这段代码可以看到,整个释放锁的逻辑是await -> fullyRelease -> release -> tryRelease,即最后是在tryRelease方法中设置state=0,并设置exclusiveOwnerThread=null,并在release方法中通过unparkSuccessor()方法唤醒头结点之后排队等待的节点,该节点会从lock.lock()这里继续向下执行,并且每个醒来的线程都会将count1,如果当前醒来的线程不是最后一个线程,那么当前线程继续阻塞。这里并没有对ReentrantLock进行介绍,大家有兴趣的话可以参考进行了解。

5.更新日志

  • 3.19日更新了问题解析的问题1。

转载地址:http://slkkz.baihongyu.com/

你可能感兴趣的文章
MySQL 树形结构 根据指定节点 获取其下属的所有子节点(包含路径上的枝干节点和叶子节点)...
查看>>
mysql 死锁 Deadlock found when trying to get lock; try restarting transaction
查看>>
mysql 死锁(先delete 后insert)日志分析
查看>>
MySQL 死锁了,怎么办?
查看>>
MySQL 深度分页性能急剧下降,该如何优化?
查看>>
MySQL 深度分页性能急剧下降,该如何优化?
查看>>
MySQL 添加列,修改列,删除列
查看>>
mysql 添加索引
查看>>
MySQL 添加索引,删除索引及其用法
查看>>
mysql 状态检查,备份,修复
查看>>
MySQL 用 limit 为什么会影响性能?
查看>>
MySQL 用 limit 为什么会影响性能?有什么优化方案?
查看>>
MySQL 用户权限管理:授权、撤销、密码更新和用户删除(图文解析)
查看>>
mysql 用户管理和权限设置
查看>>
MySQL 的 varchar 水真的太深了!
查看>>
mysql 的GROUP_CONCAT函数的使用(group_by 如何显示分组之前的数据)
查看>>
MySQL 的instr函数
查看>>
MySQL 的mysql_secure_installation安全脚本执行过程介绍
查看>>
MySQL 的Rename Table语句
查看>>
MySQL 的全局锁、表锁和行锁
查看>>