9. Source Code Analysis: How Does SOFARPC Implement Fault Removal? (8)

ServiceHorizontalRegulationStrategy#isReachMaxDegradeIpCount

public boolean isReachMaxDegradeIpCount(MeasureResultDetail measureResultDetail) {
    InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
    ConcurrentHashSet<String> ips = getDegradeProviders(statDimension.getDimensionKey());

    String ip = statDimension.getIp();
    if (ips.contains(ip)) {
        return false;
    } else {
        // maximum number of IPs that may be regulated for one service (default value)
        int degradeMaxIpCount = FaultToleranceConfigManager.getDegradeMaxIpCount(statDimension.getAppName());
        ipsLock.lock();
        try {
            if (ips.size() < degradeMaxIpCount) {
                ips.add(ip);
                return false;
            } else {
                return true;
            }
        } finally {
            ipsLock.unlock();
        }
    }
}

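This method caps how many nodes under a single service may be regulated. For example, suppose a service has only 3 nodes and 2 of them have failed and been handled through regulation. The third node must not be regulated as well; at that point someone has to step in manually and find out why the failures are happening.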
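
The cap can be tried out in isolation with a minimal sketch. The class name DegradeCapSketch, its constructor, and the cap of 2 used in main are hypothetical and are not part of SOFARPC; the sketch only mirrors the "check membership, then add under a lock while below the cap" pattern of the method above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the degrade list of ONE service; not SOFARPC code.
class DegradeCapSketch {
    private final Set<String> degradedIps = ConcurrentHashMap.newKeySet();
    private final ReentrantLock lock = new ReentrantLock();
    private final int maxDegradeIpCount;

    DegradeCapSketch(int maxDegradeIpCount) {
        this.maxDegradeIpCount = maxDegradeIpCount;
    }

    // Returns true when the cap is reached and this IP must NOT be degraded.
    boolean isReachMaxDegradeIpCount(String ip) {
        if (degradedIps.contains(ip)) {
            return false; // already on the list, so it may be degraded again
        }
        lock.lock();
        try {
            if (degradedIps.size() < maxDegradeIpCount) {
                degradedIps.add(ip); // reserve a slot for this IP
                return false;
            }
            return true; // list is full: leave this node alone
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        DegradeCapSketch service = new DegradeCapSketch(2); // assumed cap of 2
        System.out.println(service.isReachMaxDegradeIpCount("10.0.0.1")); // false
        System.out.println(service.isReachMaxDegradeIpCount("10.0.0.2")); // false
        System.out.println(service.isReachMaxDegradeIpCount("10.0.0.3")); // true
        System.out.println(service.isReachMaxDegradeIpCount("10.0.0.1")); // false
    }
}
```

With a cap of 2, the third distinct IP is refused, while an IP that is already on the list still passes the check and can be degraded further.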

WeightDegradeStrategy#degrade is then called to lower the node's weight.
WeightDegradeStrategy#degrade

public void degrade(MeasureResultDetail measureResultDetail) {
    // calls the LogPrintDegradeStrategy method, which only prints a log
    super.degrade(measureResultDetail);

    if (measureResultDetail.isLogOnly()) {
        return;
    }

    InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
    String appName = statDimension.getAppName();

    ProviderInfo providerInfo = statDimension.getProviderInfo();
    // if provider is removed or provider is warming up, return immediately
    if (providerInfo == null || providerInfo.getStatus() == ProviderStatus.WARMING_UP) {
        return;
    }
    // current weight of the provider
    int currentWeight = ProviderInfoWeightManager.getWeight(providerInfo);
    // degrade rate
    double weightDegradeRate = FaultToleranceConfigManager.getWeightDegradeRate(appName);
    // minimum weight, 1 by default
    int degradeLeastWeight = FaultToleranceConfigManager.getDegradeLeastWeight(appName);
    // degrade rate * current weight
    int degradeWeight = CalculateUtils.multiply(currentWeight, weightDegradeRate);
    // must not fall below the minimum
    degradeWeight = degradeWeight < degradeLeastWeight ? degradeLeastWeight : degradeWeight;

    // degrade weight of this provider info
    boolean success = ProviderInfoWeightManager.degradeWeight(providerInfo, degradeWeight);
    if (success && LOGGER.isInfoEnabled(appName)) {
        LOGGER.infoWithApp(appName, "the weight was degraded. serviceUniqueName:["
            + statDimension.getService() + "],ip:["
            + statDimension.getIp() + "],origin weight:["
            + currentWeight + "],degraded weight:["
            + degradeWeight + "].");
    }
}

//ProviderInfoWeightManager
public static boolean degradeWeight(ProviderInfo providerInfo, int weight) {
    providerInfo.setStatus(ProviderStatus.DEGRADED);
    providerInfo.setWeight(weight);
    return true;
}

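This method reads the provider's current weight, multiplies it by the degrade rate, and clamps the result so that it never falls below the minimum weight.
Finally, it calls ProviderInfoWeightManager to set the node's status to DEGRADED and assign the new weight.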
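
The arithmetic can be checked with a small worked example. DegradeWeightSketch and degradedWeight are hypothetical names, the rate of 0.5 is only an illustrative value and not necessarily the configured default, and the sketch assumes the product is truncated to an int, which may differ from how CalculateUtils.multiply rounds.

```java
// Hypothetical helper mirroring the arithmetic of WeightDegradeStrategy#degrade.
class DegradeWeightSketch {
    // Assumption: the product is truncated toward zero, as a plain int cast does.
    static int degradedWeight(int currentWeight, double degradeRate, int leastWeight) {
        int degraded = (int) (currentWeight * degradeRate);
        // never go below the configured minimum weight
        return Math.max(degraded, leastWeight);
    }

    public static void main(String[] args) {
        System.out.println(degradedWeight(100, 0.5, 1)); // 50
        System.out.println(degradedWeight(50, 0.5, 1));  // 25
        System.out.println(degradedWeight(1, 0.5, 1));   // 1 (0 is clamped to the minimum)
    }
}
```

Because of the lower bound, repeated degradation lowers the weight toward the minimum but never to zero, so a degraded node can still receive a small share of traffic and produce the statistics needed to detect its recovery.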

If the node is healthy

ServiceHorizontalRegulationStrategy#isExistInTheDegradeList is called to check whether the current node has been degraded.
ServiceHorizontalRegulationStrategy#isExistInTheDegradeList

public boolean isExistInTheDegradeList(MeasureResultDetail measureResultDetail) {
    InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
    ConcurrentHashSet<String> ips = getDegradeProviders(statDimension.getDimensionKey());

    return ips != null && ips.contains(statDimension.getIp());
}

When isReachMaxDegradeIpCount is called, the IP being degraded is added to the ips set, so this method only needs to look the IP up in that set.

If the node has been degraded, WeightRecoverStrategy#recover is called to restore it.
WeightRecoverStrategy#recover

public void recover(MeasureResultDetail measureResultDetail) {
    InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
    ProviderInfo providerInfo = statDimension.getProviderInfo();
    // if provider is removed or provider is warming up
    if (providerInfo == null || providerInfo.getStatus() == ProviderStatus.WARMING_UP) {
        return;
    }
    Integer currentWeight = ProviderInfoWeightManager.getWeight(providerInfo);
    if (currentWeight == -1) {
        return;
    }

    String appName = statDimension.getAppName();
    // 2 by default
    double weightRecoverRate = FaultToleranceConfigManager.getWeightRecoverRate(appName);
    // so each round at most doubles the weight instead of jumping straight back to originWeight
    int recoverWeight = CalculateUtils.multiply(currentWeight, weightRecoverRate);
    int originWeight = statDimension.getOriginWeight();

    // recover weight of this provider info
    if (recoverWeight >= originWeight) {
        measureResultDetail.setRecoveredOriginWeight(true);
        // set the provider status to AVAILABLE and restore the weight
        ProviderInfoWeightManager.recoverOriginWeight(providerInfo, originWeight);
        if (LOGGER.isInfoEnabled(appName)) {
            LOGGER.infoWithApp(appName, "the weight was recovered to origin value. serviceUniqueName:["
                + statDimension.getService() + "],ip:["
                + statDimension.getIp() + "],origin weight:["
                + currentWeight + "],recover weight:["
                + originWeight + "].");
        }
    } else {
        measureResultDetail.setRecoveredOriginWeight(false);
        boolean success = ProviderInfoWeightManager.recoverWeight(providerInfo, recoverWeight);
        if (success && LOGGER.isInfoEnabled(appName)) {
            LOGGER.infoWithApp(appName, "the weight was recovered. serviceUniqueName:["
                + statDimension.getService() + "],ip:["
                + statDimension.getIp() + "],origin weight:["
                + currentWeight + "],recover weight:["
                + recoverWeight + "].");
        }
    }
}

This method is straightforward: the comments in the code above explain each step.
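
To make the step-by-step recovery concrete, the following sketch replays successive recovery rounds. RecoverWeightSketch, nextWeight, and recoverySteps are hypothetical names; the starting weight of 5 and the origin weight of 100 are illustrative values; and the product is assumed to be truncated to an int, which may differ from CalculateUtils.multiply. Only the recover rate of 2 comes from the comment in the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of repeated WeightRecoverStrategy#recover rounds.
class RecoverWeightSketch {
    // One round: multiply by the recover rate, but never exceed the origin weight.
    static int nextWeight(int currentWeight, double recoverRate, int originWeight) {
        int recovered = (int) (currentWeight * recoverRate); // assumed truncation
        return Math.min(recovered, originWeight);
    }

    // Weights after each round until the origin weight is reached.
    static List<Integer> recoverySteps(int degradedWeight, double recoverRate, int originWeight) {
        List<Integer> steps = new ArrayList<>();
        int weight = degradedWeight;
        while (weight < originWeight) {
            int next = nextWeight(weight, recoverRate, originWeight);
            if (next <= weight) {
                // guard against rates or weights that can never grow
                throw new IllegalStateException("weight cannot grow from " + weight);
            }
            weight = next;
            steps.add(weight);
        }
        return steps;
    }

    public static void main(String[] args) {
        // degraded weight 5, recover rate 2, origin weight 100
        System.out.println(recoverySteps(5, 2.0, 100)); // [10, 20, 40, 80, 100]
    }
}
```

In this example a node degraded to weight 5 needs five healthy rounds to return to its origin weight of 100, which shows why recovery is gradual rather than immediate.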

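Summary

Overall, FaultToleranceModule consists of two parts:

FaultToleranceSubscriber subscribes to events; it is responsible for subscribing to the synchronous and asynchronous result events.

Statistics are collected from those invocation events, and a set of built-in strategies uses them to degrade and recover services.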
