java 高并发进阶 Phaser 相位器源码详解( 三 )

< 0) { // need node to record intrnode = new QNode(this, phase, false, false, 0L);node.wasInterrupted = interrupted;}}else if (node.isReleasable()) // done or abortedbreak;else if (!queued) {// push onto queueAtomicReference head = (phaseQNode q = node.next = head.get();if ((q == null || q.phase == phase)}else {try {// 线程阻塞ForkJoinPool.managedBlock(node);} catch (InterruptedException ie) {node.wasInterrupted = true;}}}if (node != null) {if (node.thread != null)node.thread = null;// avoid need for unpark()if (node.wasInterruptedif (p == phase// possibly clean up on abort}releaseWaiters(phase);return p;}

  • releaseWaiters 释放等待者
/*** 从队列中删除线程并发出信号通知阶段 。 * @author 老马啸西风*/private void releaseWaiters(int phase) {// 队列中的第一个元素QNode q;// 对应的线程信息Thread t;// 根据奇偶 , 选择不同的队列 。AtomicReference head = (phasewhile ((q = head.get()) != null// 唤醒 t 对应的线程LockSupport.unpark(t);}}}
  • abortWait 中断等待
/** * releaseWaiters的一种变体 , 它另外尝试删除由于超时或中断而不再等待提前的任何节点 。* 当前 , 仅当节点位于队列头时才将其删除 , 这足以减少大多数使用情况下的内存占用 。* * @return current phase on exit * @author 老马啸西风 */private int abortWait(int phase) {// 选择奇偶队列AtomicReference head = (phasefor (;;) {Thread t;// 头节点QNode q = head.get();int p = (int)(root.state >>> PHASE_SHIFT);if (q == null || ((t = q.thread) != null// CAS 设置节点 q 为 q.next(删除头节点)if (head.compareAndSet(q, q.next)LockSupport.unpark(t);}}}arriveAndAwaitAdvance 到达并且等待实现/** * 到达此移相器并等待其他人 。* 等效于awaitAdvance 。* 如果您需要等待中断或超时 , 则可以使用 awaitAdvance 方法的其他形式之一以类似的方式进行安排 。* 如果相反 , 您需要在到达时注销 , 请使用 awaitAdvance(arriveAndDeregister()) 。* * 未注册方调用此方法是错误的用法 。* 但是 , 仅在此相位器上进行一些后续操作时 , 此错误才可能导致IllegalStateException 。* * @return the arrival phase number, or the (negative) * {@linkplain #getPhase() current phase} if terminated * @throws IllegalStateException if not terminated and the number * of unarrived parties would become negative * @author 老马啸西风 */public int arriveAndAwaitAdvance() {// Specialization of doArrive+awaitAdvance eliminating some reads/pathsfinal Phaser root = this.root;for (;;) {// 获取状态long s = (root == this) ? state : reconcileState();int phase = (int)(s >>> PHASE_SHIFT);if (phase < 0)return phase;// 计算未到达的数量int counts = (int)s;int unarrived = (counts == EMPTY) ? 0 : (countsif (unarrived <= 0)throw new IllegalStateException(badArrive(s));// 通过 CAS 更新到达者的数量if (UNSAFE.compareAndSwapLong(this, stateOffset, s,s -= ONE_ARRIVAL)) {// 如果还存在未到达的 , 参见上面的等待方法 。if (unarrived > 1)return root.internalAwaitAdvance(phase, null);if (root != this)// 这个方法上面有解析 , 不过此处调用的时 parentreturn parent.arriveAndAwaitAdvance();long n = s// base of next stateint nextUnarrived = (int)n >>> PARTIES_SHIFT;// 注意:这里调用了对应的 onAdvance 方法 , 就是我们前面自定义实现的方法 。if (onAdvance(phase, nextUnarrived))n |= TERMINATION_BIT;else if (nextUnarrived == 0)n |= EMPTY;elsen |= nextUnarrived;int nextPhase = (phase + 1)n |= (long)nextPhase << PHASE_SHIFT;//CAS 更新失败 , 直接中断if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))return (int)(state >>> PHASE_SHIFT); // terminatedreleaseWaiters(phase);return nextPhase;}}}onAdvance 重载时的核心方法一般我们都会对这个方法进行重载 。
/** * 一种在即将到来的相位超前执行操作并控制终止的可重写方法 。* 在推进此移相器的一方到达时(当所有其他等待方都处于休眠状态时)调用此方法 。* 如果此方法返回{@code true} , 则此移相器将提前设置为最终终止状态 , 随后对{@link #isTerminated}的调用将返回true 。*调用此方法引发的任何(未经检查的)异常或错误都会传播到尝试推进此相位器的一方 , 在这种情况下 , 不会发生提前 。* * 此方法的参数提供了当前过渡中普遍使用的移相器状态 。* 从{@code onAdvance}内在此相位器上调用到达 , 注册和等待方法的效果是不确定的 , 因此不应依赖 。* * 如果此相位器是分层相位器集合的成员 , 则每次前进时仅为其根相位器调用{@code onAdvance} 。* * 为了支持最常见的用例 , 当由于一方调用{@code到达AndDeregister}而导致的注册方数量变为零时 , 此方法的默认实现返回{@code true} 。* 您可以通过重写此方法以始终返回{@code false}来禁用此行为 , 从而使以后的注册继续进行: * *{@code * Phaser phaser = new Phaser() { *protected boolean onAdvance(int phase, int parties) { return false; } * }}