java 高并发进阶 Phaser 相位器源码详解

Phaser
java 高并发进阶 Phaser 相位器源码详解文章插图
简介可重用的同步屏障 , 其功能类似于CyclicBarrier和CountDownLatch , 但支持更灵活的用法 。
这个工具类我们暂时就翻译为:移相器/相位器
使用入门我们来看一个简单的例子 。
假设我们有一个比赛 , 有多位玩家参加 。
当所有玩家完成第一次比赛 , 我们认为上半场游戏结束;全部参加完第二次比赛 , 认为下半场游戏结束 。
这个要如何实现呢?
自定义 Phaser 类public class MyPhaser extends Phaser {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {switch (phase) {case 0 :System.out.println("上半场完成");return false;case 1:System.out.println("下半场完成");return false;default:return true;}}}自定义 Runnable 类private static class GameRunnable implements Runnable {private final Phaser phaser;private GameRunnable(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {//参加上半场比赛System.out.println("玩家-"+Thread.currentThread().getName()+":参加上半场比赛");//执行这个方法的话会等所有的选手都完成了之后再继续下面的方法phaser.arriveAndAwaitAdvance();// 下半场//参加上半场比赛System.out.println("玩家-"+Thread.currentThread().getName()+":参加下半场比赛");//执行这个方法的话会等所有的选手都完成了之后再继续下面的方法phaser.arriveAndAwaitAdvance();}}测试验证public static void main(String[] args) {int nums = 3;Phaser phaser = new MyPhaser();//注册一次表示 phaser 维护的线程个数phaser.register();for(int i = 0; i < nums; i++) {phaser.register();Thread thread = new Thread(new GameRunnable(phaser));thread.start();}//后续阶段主线程就不参加了phaser.arriveAndDeregister();}对应日志如下:
玩家-Thread-0:参加上半场比赛玩家-Thread-2:参加上半场比赛玩家-Thread-1:参加上半场比赛上半场完成玩家-Thread-1:参加下半场比赛玩家-Thread-2:参加下半场比赛玩家-Thread-0:参加下半场比赛下半场完成非常符合我们的预期 。
那么这个到底是怎么实现的呢?
这个基本上已经是 juc 的最后一节了 。
java 高并发进阶 Phaser 相位器源码详解文章插图
源码解析类定义此类实现X10“时钟”的扩展 。
感谢Vijay Saraswat的想法 , 以及Vivek Sarkar的扩展以扩展功能 。
/** * @since 1.7 * @author Doug Lea */public class Phaser {}这个类是在 jdk1.7 引入的 。
状态状态是一个很重要的属性 , 我们这里重点看一下 。
private volatile long state;这是一个通过 volatile 修饰的变量 。
主要状态表示形式 , 具有四个位域:
unarrived-尚未达到要求的参与方数量(位0-15)parties-等待的派对数量(16-31位)phase-屏障的产生(位32-62)terminated-设置是否终止屏障(位63 /符号)除了没有注册方的 phaser 以外 , 否则具有零方和一个未到达方的非法状态(在下面编码为EMPTY)除外 。
为了有效地保持原子性 , 这些值打包成一个(原子)长整型变量 。
良好的性能取决于保持状态解码和编码简单 , 并保持竞争窗口简短 。
所有状态更新都是通过CAS执行的 , 除了子 phaser(即具有非空父级的子phaser)的初始注册 。
在这种情况下(相对罕见) , 我们在首次向其父级注册时使用内置同步进行锁定 。
子phaser的相位被允许滞后于其祖先的相位 , 直到其被实际访问为止-参见方法reconcileState 。
其他内部变量主要是一些位运算变量 , 还有一些特殊的值 。
private static final intMAX_PARTIES= 0xffff;private static final intMAX_PHASE= Integer.MAX_VALUE;private static final intPARTIES_SHIFT= 16;private static final intPHASE_SHIFT= 32;private static final intUNARRIVED_MASK= 0xffff;// to mask intsprivate static final long PARTIES_MASK= 0xffff0000L; // to mask longsprivate static final long COUNTS_MASK= 0xffffffffL;private static final long TERMINATION_BIT = 1L << 63;// some special valuesprivate static final intONE_ARRIVAL= 1;private static final intONE_PARTY= 1 << PARTIES_SHIFT;private static final intONE_DEREGISTER= ONE_ARRIVAL|ONE_PARTY;private static final intEMPTY= 1;构造器public Phaser() {this(null, 0);}public Phaser(int parties) {this(null, parties);}public Phaser(Phaser parent) {this(parent, 0);}上面 3 个调用的都是下面的方法:
public Phaser(Phaser parent, int parties) {if (parties >>> PARTIES_SHIFT != 0)throw new IllegalArgumentException("Illegal number of parties");int phase = 0;this.parent = parent;if (parent != null) {final Phaser root = parent.root;this.root = root;this.evenQ = root.evenQ;this.oddQ = root.oddQ;if (parties != 0)phase = parent.doRegister(1);}else {this.root = this;this.evenQ = new AtomicReference