9.源码分析---SOFARPC是如何实现故障剔除的? (2)

我们在8.源码分析---从设计模式中看SOFARPC中的EventBus?里面讲了,初始化ConsumerConfig的时候会初始化父类的静态代码块,然后会初始化RpcRuntimeContext的静态代码块。

RpcRuntimeContext

static { if (LOGGER.isInfoEnabled()) { LOGGER.info("Welcome! Loading SOFA RPC Framework : {}, PID is:{}", Version.BUILD_VERSION, PID); } put(RpcConstants.CONFIG_KEY_RPC_VERSION, Version.RPC_VERSION); // 初始化一些上下文 initContext(); // 初始化其它模块 ModuleFactory.installModules(); // 增加jvm关闭事件 if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { if (LOGGER.isWarnEnabled()) { LOGGER.warn("SOFA RPC Framework catch JVM shutdown event, Run shutdown hook now."); } destroy(false); } }, "SOFA-RPC-ShutdownHook")); } }

在这个代码块里面会调用ModuleFactory初始化其他模块

ModuleFactory#installModules

public static void installModules() { ExtensionLoader<Module> loader = ExtensionLoaderFactory.getExtensionLoader(Module.class); //moduleLoadList 默认是 * String moduleLoadList = RpcConfigs.getStringValue(RpcOptions.MODULE_LOAD_LIST); for (Map.Entry<String, ExtensionClass<Module>> o : loader.getAllExtensions().entrySet()) { String moduleName = o.getKey(); Module module = o.getValue().getExtInstance(); // judge need load from rpc option if (needLoad(moduleLoadList, moduleName)) { // judge need load from implement if (module.needLoad()) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Install Module: {}", moduleName); } //安装模板 module.install(); INSTALLED_MODULES.put(moduleName, module); } else { if (LOGGER.isInfoEnabled()) { LOGGER.info("The module " + moduleName + " does not need to be loaded."); } } } else { if (LOGGER.isInfoEnabled()) { LOGGER.info("The module " + moduleName + " is not in the module load list."); } } } }

这里会根据SPI初始化四个模块,分别是:
fault-tolerance
sofaTracer-resteasy
lookout
sofaTracer

我们这里只讲解fault-tolerance模块。

然后我们进入到FaultToleranceModule#install方法中

private Regulator regulator = new TimeWindowRegulator(); public void install() { subscriber = new FaultToleranceSubscriber(); //注册ClientSyncReceiveEvent和ClientAsyncReceiveEvent到总线中 EventBus.register(ClientSyncReceiveEvent.class, subscriber); EventBus.register(ClientAsyncReceiveEvent.class, subscriber); String regulatorAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_REGULATOR, "timeWindow"); regulator = ExtensionLoaderFactory.getExtensionLoader(Regulator.class).getExtension(regulatorAlias); //调用TimeWindowRegulator的init方法 regulator.init(); }

这里我们的订阅者是FaultToleranceSubscriber实例,订阅了两个ClientSyncReceiveEvent和ClientAsyncReceiveEvent事件。

然后会调用regulator的实现类TimeWindowRegulator的初始化方法
TimeWindowRegulator#init

/** * 度量策略(创建计算模型, 对计算模型里的数据进行度量,选出正常和异常节点) */ private MeasureStrategy measureStrategy; /** * 计算策略(根据度量结果,判断是否需要执行降级或者恢复) */ private RegulationStrategy regulationStrategy; /** * 降级策略: 例如调整权重 */ private DegradeStrategy degradeStrategy; /** * 恢复策略:例如调整权重 */ private RecoverStrategy recoverStrategy; /** * Listener for invocation stat change. */ private final InvocationStatListener listener = new TimeWindowRegulatorListener(); public void init() { String measureStrategyAlias = RpcConfigs .getOrDefaultValue(RpcOptions.AFT_MEASURE_STRATEGY, "serviceHorizontal"); String regulationStrategyAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_REGULATION_STRATEGY, "serviceHorizontal"); String degradeStrategyAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_DEGRADE_STRATEGY, "weight"); String recoverStrategyAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_RECOVER_STRATEGY, "weight"); //ServiceHorizontalMeasureStrategy measureStrategy = ExtensionLoaderFactory.getExtensionLoader(MeasureStrategy.class).getExtension( measureStrategyAlias); //ServiceHorizontalRegulationStrategy regulationStrategy = ExtensionLoaderFactory.getExtensionLoader(RegulationStrategy.class).getExtension( regulationStrategyAlias); //WeightDegradeStrategy degradeStrategy = ExtensionLoaderFactory.getExtensionLoader(DegradeStrategy.class).getExtension( degradeStrategyAlias); //WeightRecoverStrategy recoverStrategy = ExtensionLoaderFactory.getExtensionLoader(RecoverStrategy.class).getExtension( recoverStrategyAlias); //TimeWindowRegulatorListener InvocationStatFactory.addListener(listener); }

这里面主要是根据SPI初始化了度量策略,计算策略,降级策略,恢复策略,这些东西有什么用,我们下面讲。

触发权重降级

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppwdd.html