4.Sentinel源码分析— Sentinel是如何做到降级的? (3)

调用data的currentWindow方法会调用到LeapArray的currentWindow方法中去:
LeapArray#currentWindow

public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } //通过当前时间判断属于哪个窗口 int idx = calculateTimeIdx(timeMillis); //计算出窗口开始时间 // Calculate current bucket start time. long windowStart = calculateWindowStart(timeMillis); while (true) { //获取数组里的老数据 WindowWrap<T> old = array.get(idx); if (old == null) { WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 如果对应时间窗口的开始时间与计算得到的开始时间一样 // 那么代表当前即是我们要找的窗口对象,直接返回 } else if (windowStart == old.windowStart()) { return old; } else if (windowStart > old.windowStart()) { //如果当前的开始时间小于原开始时间,那么就更新到新的开始时间 if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart < old.windowStart()) { //一般来说不会走到这里 // Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }

这里我简单介绍一下这个方法,这个方法的详细讲解已经在第一篇源码分析里做了。

这个方法里面会根据当前的时间戳来计算出array数组里面的index,然后去array数组中找相应的数据,如果节点已经存在,那么用CAS更新一个新的节点;如果节点是新的,那么直接返回;如果节点失效了,设置当前节点,清除所有失效节点。

这里我直接引用1.Sentinel源码分析—FlowRuleManager加载规则做了什么?中的例子:

1. 如果array数据里面的bucket数据如下所示: NULL B4 |_______|_______| 800 1000 1200 ^ time=888 正好当前时间所对应的槽位里面的数据是空的,那么就用CAS更新 2. 如果array里面已经有数据了,并且槽位里面的窗口开始时间和当前的开始时间相等,那么直接返回 B3 B4 ||_______|_______||___ 800 1000 1200 timestamp ^ time=888 3. 例如当前时间是1676,所对应窗口里面的数据的窗口开始时间小于当前的窗口开始时间,那么加上锁,然后设置槽位的窗口开始时间为当前窗口开始时间,并把槽位里面的数据重置 (old) B0 |_______||_______| ... 1200 1400 ^ time=1676

再回到ArrayMetric的success方法中,往下走调用data.values()方法:
LeapArray#success

public List<T> values(long timeMillis) { if (timeMillis < 0) { return new ArrayList<T>(); } int size = array.length(); List<T> result = new ArrayList<T>(size); for (int i = 0; i < size; i++) { WindowWrap<T> windowWrap = array.get(i); if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) { continue; } result.add(windowWrap.value()); } return result; }

这个方法就是用来获取所有有效的MetricBucket,并返回。
然后通过调用MetricBucket的success方法获取被成功调用的次数。

我们接着来看ArrayMetric的rt方法:

public long rt() { data.currentWindow(); long rt = 0; //获取当前时间窗口的统计数据 List<MetricBucket> list = data.values(); //统计当前时间窗口的平均相应时间之和 for (MetricBucket window : list) { rt += window.rt(); } return rt; }

这个方法和上面的success方法差不多,获取所有的MetricBucket的rt数据求和返回。
然后就可以通过rt方法返回的时间总和除以成功调用的总和求得平均数。

我们再回到DegradeRule的passCheck方法中的响应时间降级策略中:

if (grade == RuleConstant.DEGRADE_GRADE_RT) { //获取节点的平均响应时间 double rt = clusterNode.avgRt(); if (rt < this.count) { passCount.set(0); return true; } //rtSlowRequestAmount默认是5 // Sentinel will degrade the service only if count exceeds. if (passCount.incrementAndGet() < rtSlowRequestAmount) { return true; } // 根据异常比例降级 } //省略 return false;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zydpsp.html