9.源码分析---SOFARPC是如何实现故障剔除的?

SOFARPC源码解析系列:

1. 源码分析---SOFARPC可扩展的机制SPI

2. 源码分析---SOFARPC客户端服务引用

3. 源码分析---SOFARPC客户端服务调用

4. 源码分析---SOFARPC服务端暴露

5.源码分析---SOFARPC调用服务

6.源码分析---和dubbo相比SOFARPC是如何实现负载均衡的?

7.源码分析---SOFARPC是如何实现连接管理与心跳?

8.源码分析---从设计模式中看SOFARPC中的EventBus?

在第七讲里面7.源码分析---SOFARPC是如何实现连接管理与心跳?,我讲了客户端是怎么维护服务端的长连接的。但是有一种情况是Consumer 和 Provider的长连接还在,注册中心未下发摘除,但服务器端由于某些原因,例如长时间的 Full GC, 硬件故障(后文中为避免重复,统一描述为机器假死)等场景,处于假死状态。

这个时候 Consumer 应该不调用或少调用该 Provider,可以通过权重的方式来进行控制。目前 SOFARPC 5.3.0 以上的版本支持 RPC 单机故障剔除能力。SOFARPC 通过服务权重控制方式来减少异常服务的调用,将更多流量打到正常服务机器上,提高服务可用性。

接下来我们来讲讲具体的服务权重降级是怎么实现的。在看这篇文章之前我希望读者能读完如下几篇文章:

8.源码分析---从设计模式中看SOFARPC中的EventBus?,因为SOFARPC的服务权重降级是通过EventBus来调用的。

3. 源码分析---SOFARPC客户端服务调用,这篇文章里面写了是如何调用服务端的,客户端会在调用服务端的时候触发总线,给订阅者发送一个消息。

6.源码分析---和dubbo相比SOFARPC是如何实现负载均衡的?,这篇文章里面写的是SOFARPC的负载均衡是怎么实现的,以及如何通过权重控制并发量。

如果你了解了上面的知识,那么可以开始接下来的内容了。

实例

我们首先给出一个服务端和客户端的实例,方便大家去调试。

官方的文档在这里:自动故障剔除

service public static void main(String[] args) { ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") // 设置一个协议,默认bolt .setPort(12200) // 设置一个端口,默认12200 .setDaemon(false); // 非守护线程 ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定接口 .setRef(new HelloServiceImpl()) // 指定实现 .setServer(serverConfig); // 指定服务端 providerConfig.export(); // 发布服务 } client public static void main(String[] args) { FaultToleranceConfig faultToleranceConfig = new FaultToleranceConfig(); faultToleranceConfig.setRegulationEffective(true); faultToleranceConfig.setDegradeEffective(true); faultToleranceConfig.setTimeWindow(10); faultToleranceConfig.setWeightDegradeRate(0.5); FaultToleranceConfigManager.putAppConfig("appName", faultToleranceConfig); ApplicationConfig applicationConfig = new ApplicationConfig(); applicationConfig.setAppName("appName"); ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定接口 .setProtocol("bolt") // 指定协议 .setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址 .setConnectTimeout(2000 * 1000) .setApplication(applicationConfig); HelloService helloService = consumerConfig.refer(); while (true) { try { LOGGER.info(helloService.sayHello("world")); } catch (Exception e) { e.printStackTrace(); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } 自动故障剔除模块的注册

我们在在客户端的例子里面通过FaultToleranceConfigManager注册了FaultToleranceConfig配置。

FaultToleranceConfig faultToleranceConfig = new FaultToleranceConfig(); faultToleranceConfig.setRegulationEffective(true); faultToleranceConfig.setDegradeEffective(true); faultToleranceConfig.setTimeWindow(10); faultToleranceConfig.setWeightDegradeRate(0.5); FaultToleranceConfigManager.putAppConfig("appName", faultToleranceConfig);

我们先进入到FaultToleranceConfigManager里面看看putAppConfig做了什么。

FaultToleranceConfigManager#putAppConfig /** * All fault-tolerance config of apps */ private static final ConcurrentMap<String, FaultToleranceConfig> APP_CONFIGS = new ConcurrentHashMap<String, FaultToleranceConfig>(); public static void putAppConfig(String appName, FaultToleranceConfig value) { if (appName == null) { if (LOGGER.isWarnEnabled()) { LOGGER.warn("App name is null when put fault-tolerance config"); } return; } if (value != null) { APP_CONFIGS.put(appName, value); if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Get a new resource, value[" + value + "]"); } } else { APP_CONFIGS.remove(appName); if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Remove a resource, key[" + appName + "]"); } } calcEnable(); } static void calcEnable() { for (FaultToleranceConfig config : APP_CONFIGS.values()) { if (config.isRegulationEffective()) { aftEnable = true; return; } } aftEnable = false; }

上面的方法写的非常的清楚:

校验appName,为空的话直接返回

然后把我们定义的config放到APP_CONFIGS这个变量里面

调用calcEnable,根据我们配置的config,将aftEnable变量设置为true

到这里就完成了故障剔除的配置设置。

注册故障剔除模块

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppwdd.html