源码分析:Phaser 之更灵活的同步屏障 (3)

方法说明:向当前phaser添加一个新的unarrived(未到达)的party,如果onAdvance正在运行,那么这个方法会等待它运行结束再返回结果。如果当前phaser有父节点,并且当前phaser上没有已注册的party,那么就会交给父节点注册。

代码分析:

public int register() { return doRegister(1); } private int doRegister(int registrations) { // 调整的状态,等待的parties数和unarrived(未到达)parties数同时增加 long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (;;) { //自旋 long s = (parent == null) ? state : reconcileState(); // 取state值 // 转换成int,state的低32位,也就是parties和unarrived的值 int counts = (int)s; // 取等待的parties数 int parties = counts >>> PARTIES_SHIFT; // UNARRIVED_MASK,低16位,二进制:1111 1111 1111 1111 // 也就是取低16中存的未到达数parties数 int unarrived = counts & UNARRIVED_MASK; // 1 > 65535 - parties if (registrations > MAX_PARTIES - parties) // 检查容量 throw new IllegalStateException(badRegister(s)); phase = (int)(s >>> PHASE_SHIFT); // 无符号右移32位,取出当前的阶段phase if (phase < 0) break; // 退出自旋,返回phase ,也就是负数 // 不是第一个参与者 if (counts != EMPTY) { // not 1st registration if (parent == null || reconcileState() == s) { if (unarrived == 0)// unarrived等于0说明当前阶段正在执行onAdvance()方法,等待advance方法退出 root.internalAwaitAdvance(phase, null); // 阻塞并等待阶段前进 else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) // 使用CAS的方式修改state值,增加adjust,成功的话退出自旋,返回phase break; } } else if (parent == null) {// 没有设置父节点 // 计算state的值 long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) // CAS 修改成功则退出自旋 break; } else { //以上两种情况都不是,有多层级的时候 synchronized (this) { // 1st sub registration if (state == s) { // recheck under lock phase = parent.doRegister(1); // 交给父节点完成注册 if (phase < 0) break; //退出自旋,返回phase ,也就是负数 // 走到这儿,说明父节点注册成功了(phase大于0),while自旋,直到CAS修改成功 while (!UNSAFE.compareAndSwapLong(this, stateOffset, s,((long)phase << PHASE_SHIFT) | adjust)) { s = state; phase = (int)(root.state >>> PHASE_SHIFT); // assert (int)s == EMPTY; } break; } } } } return phase; }

reconcileState()方法

子Phaser的phase在没有被真正使用之前,允许滞后于它的root节点。非首次注册时,如果Phaser有父节点,则调用reconcileState()方法解决root节点的phase延迟传递问题.

当root节点的phase已经advance到下一代,但是子节点phaser还没有,这种情况下它们必须通过更新未到达parties数 完成它们自己的advance操作(如果parties为0,重置为EMPTY状态)。

private long reconcileState() { final Phaser root = this.root; long s = state; if (root != this) { int phase, p; // CAS to root phase with current parties, tripping unarrived while ((phase = (int)(root.state >>> PHASE_SHIFT)) != (int)(s >>> PHASE_SHIFT) && !UNSAFE.compareAndSwapLong(this, stateOffset, s, s = (((long)phase << PHASE_SHIFT) | ((phase < 0) ? (s & COUNTS_MASK) : (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY : ((s & PARTIES_MASK) | p)))))) s = state; } return s; }

internalAwaitAdvance()方法:

除非终止,否则可能会阻塞或等待phase前进到下一代

private int internalAwaitAdvance(int phase, QNode node) { // assert root == this; // 确保旧队列是干净的 releaseWaiters(phase-1); // ensure old queue clean // 入队成功变为true boolean queued = false; // true when node is enqueued int lastUnarrived = 0; // to increase spins upon change int spins = SPINS_PER_ARRIVAL; //自旋的次数,(NCPU < 2) ? 1 : 1 << 8;1或者256次 long s; int p; while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { // 无符号右移32位,得到当前阶段,检查是否有变化 if (node == null) { // spinning in noninterruptible mode int unarrived = (int)s & UNARRIVED_MASK; // 与掩码计算,得到低16位代表的未到达数 // 未到达数有变化且小于CPU核数 if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) spins += SPINS_PER_ARRIVAL; // 增加自旋次数 boolean interrupted = Thread.interrupted(); // 线程中断 if (interrupted || --spins < 0) { // need node to record intr // 线程被中断了或者自旋次数小于0,需要节点记录索引 node = new QNode(this, phase, false, false, 0L); node.wasInterrupted = interrupted; } }else if (node.isReleasable()) // done or aborted break; // 完成或者终止,退出自旋 else if (!queued) { // 推入队列 // (phase & 1 == 0 )通过位运算快速判断是奇偶数 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node.next = head.get(); // 再次判断 if ((q == null || q.phase == phase) && (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq queued = head.compareAndSet(q, node); // CAS修改入队 } else { try { ForkJoinPool.managedBlock(node); // 阻塞node,等待被唤醒 } catch (InterruptedException ie) { node.wasInterrupted = true; } } } // 线程已经被唤醒,并且phase已经有变化了才会退出上面的自旋,或者完成终止,退出自旋 if (node != null) { if (node.thread != null) node.thread = null; // 避免 unpark() if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) return abortWait(phase); // possibly clean up on abort } // 唤醒当前phaser阶段的线程 releaseWaiters(phase); return p; } /** 从队列中删除线程,唤醒当前phaser阶段的线程 */ private void releaseWaiters(int phase) { QNode q; // 队列的第一个元素 Thread t; // its thread // 再次根据当前phaser选择对应的队列 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; while ((q = head.get()) != null && q.phase != (int)(root.state >>> PHASE_SHIFT)) { if (head.compareAndSet(q, q.next) && (t = q.thread) != null) { // 删掉q节点,唤醒q节点中的线程 q.thread = null; LockSupport.unpark(t); // 唤醒线程 } } }

register()方法总结:

register方法为phaser添加一个新的party,如果onAdvance正在运行,那么这个方法会等待它运行结束再返回结果。

register和bulkRegister都由doRegister实现,bulkRegister是批量注册添加

使用了自旋 + CAS 技术来保证更新成功

如果前阶段正在执行onAdvance()方法,则需要阻塞等待(根据phase入相应队列)其执行完后再进行注册

当前phaser如果有父节点,需要交由父节点来完成注册

arrive()方法

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssyfy.html