java 高并发进阶 Phaser 相位器源码详解( 二 )

();this.oddQ = new AtomicReference();}this.state = (parties == 0) ? (long)EMPTY :((long)phase << PHASE_SHIFT) |((long)parties << PARTIES_SHIFT) |((long)parties);}这里的 root 或者是 parent 实际上也是一个 phaser 变量:
/** * The parent of this phaser, or null if none */private final Phaser parent;/** * The root of phaser tree. Equals this if not in a tree. */private final Phaser root;对应的 evenQ/oddQ 是一个 atomic 的引用:
/** * Treiber堆栈头用于等待线程 。* 为了消除释放某些线程而添加其他线程时的争用 , 我们使用其中两个 , 在偶数和奇数阶段交替使用 。* 子相位器与root共享队列以加快发布速度 。*/private final AtomicReference evenQ;private final AtomicReference oddQ;register 注册我们主要看一下案例中用到的几个方法 , 首先看一下 register 方法 。
public int register() {return doRegister(1);}实际上调用的是下面的方法:
/** * Implementation of register, bulkRegister * * @param registrations 要添加到双方和未到达字段的数量 。必须大于零 。* @author 老马啸西风 */private int doRegister(int registrations) {// adjustment to state// 位移+或运算long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;final Phaser parent = this.parent;int phase;for (;;) {// 父类状态为 null , 直接取 state , 或者取 reconcileState(见下方)long s = (parent == null) ? state : reconcileState();int counts = (int)s;//int parties = counts >>> PARTIES_SHIFT;int unarrived = countsif (registrations > MAX_PARTIES - parties)// 返回异常信息 , 见下方throw new IllegalStateException(badRegister(s));phase = (int)(s >>> PHASE_SHIFT);if (phase < 0)break;if (counts != EMPTY) {// not 1st registrationif (parent == null || reconcileState() == s) {if (unarrived == 0)// wait out advance// 这个是内部等待的方法 , 见下方详解 。root.internalAwaitAdvance(phase, null);// 通过 CAS 设置else if (UNSAFE.compareAndSwapLong(this, stateOffset,s, s + adjust))break;}}else if (parent == null) {// 1st root registrationlong next = ((long)phase << PHASE_SHIFT) | adjust;if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))break;}else {// 使用悲观锁加锁synchronized (this) {// 1st sub registrationif (state == s) {// recheck under lockphase = parent.doRegister(1);if (phase < 0)break;// finish registration whenever parent registration// succeeded, even when racing with termination,// since these are part of the same "transaction".while (!UNSAFE.compareAndSwapLong(this, stateOffset, s,((long)phase << PHASE_SHIFT) | adjust)) {s = state;phase = (int)(root.state >>> PHASE_SHIFT);// assert (int)s == EMPTY;}break;}}}}return phase;}reconcileState 解析/** * 如有必要 , 解决从根开始的滞后相位传播 。* 协调通常在root已提前但子相位尚未执行时发生 , 在这种情况下 , 它们必须通过将未到达方设置为前进(或如果方为零 , 则重置为未注册的EMPTY状态)来完成自己的 * 前 。* * @return reconciled state * @author 老马啸西风 */private long reconcileState() {// 获取 root 节点final Phaser root = this.root;long s = state;// 默认的 root 就是 this , 不等于说明有真正的 root 节点 。if (root != this) {int phase, p;// CAS to root phase with current parties, tripping unarrived// 秀的头皮发麻的 CAS 操作 。while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=(int)(s >>> PHASE_SHIFT)}return s;}badRegister返回相关注册失败的信息 。
/** * Returns message string for bounds exceptions on registration. */private String badRegister(long s) {return "Attempt to register more than " +MAX_PARTIES + " parties for " + stateToString(s);}对应的状态信息为:
/** * Implementation of toString and string-based error messages */private String stateToString(long s) {return super.toString() +"[phase = " + phaseOf(s) +" parties = " + partiesOf(s) +" arrived = " + arrivedOf(s) + "]";}internalAwaitAdvance 内部的等待方法【java 高并发进阶 Phaser 相位器源码详解】这个方法只能被 root 节点调用 , 用于阻塞线程 , 等待阶段完成使用 。
/** * 除非中止 , 否则可能会阻塞并等待阶段前进 。* 仅在根相位器上调用 。* * @param phase current phase * @param node if non-null, the wait node to track interrupt and timeout; * if null, denotes noninterruptible wait * @return current phase * @author 老马啸西风 */private int internalAwaitAdvance(int phase, QNode node) {// assert root == this;releaseWaiters(phase-1);// ensure old queue cleanboolean queued = false;// true when node is enqueuedint lastUnarrived = 0;// to increase spins upon changeint spins = SPINS_PER_ARRIVAL;long s;int p;while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {if (node == null) {// spinning in noninterruptible modeint unarrived = (int)sif (unarrived != lastUnarrivedboolean interrupted = Thread.interrupted();// 被中断 , 且已经自旋结束 。if (interrupted || --spins