9.源码分析---SOFARPC是如何实现故障剔除的? (3)

我们在3. 源码分析---SOFARPC客户端服务调用里面讲到了,客户端在调用的时候最后会调用AbstractCluster#doSendMsg方法,然后根据不同的策略,同步、异步、单向等调用然后返回response实例。

protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport transport, SofaRequest request) throws SofaRpcException { .... // 同步调用 if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) { long start = RpcRuntimeContext.now(); try { //BoltClientTransport#syncSend response = transport.syncSend(request, timeout); } finally { if (RpcInternalContext.isAttachmentEnable()) { long elapsed = RpcRuntimeContext.now() - start; context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed); } } } .... }

因为在故障模块注册的时候订阅了两个ClientSyncReceiveEvent和ClientAsyncReceiveEvent事件。即一个同步事件和一个异步事件,我们这里挑同步调用进行讲解。

在上面的代码片段中,我们看到了会调用到BoltClientTransport#syncSend。

BoltClientTransport#syncSend

public SofaResponse syncSend(SofaRequest request, int timeout) throws SofaRpcException { //检查连接 checkConnection(); RpcInternalContext context = RpcInternalContext.getContext(); InvokeContext boltInvokeContext = createInvokeContext(request); SofaResponse response = null; SofaRpcException throwable = null; try { //向总线发出ClientBeforeSendEvent事件 beforeSend(context, request); response = doInvokeSync(request, boltInvokeContext, timeout); return response; } catch (Exception e) { // 其它异常 throwable = convertToRpcException(e); throw throwable; } finally { //向总线发出ClientAfterSendEvent事件 afterSend(context, boltInvokeContext, request); //向总线发出ClientSyncReceiveEvent事件 if (EventBus.isEnable(ClientSyncReceiveEvent.class)) { //把当前被调用的provider和ConsumerConfig发送到总线中去 EventBus.post(new ClientSyncReceiveEvent(transportConfig.getConsumerConfig(), transportConfig.getProviderInfo(), request, response, throwable)); } } }

其实上面这么一大段代码和我们这篇文章有关系的也就只要最后向总线发送ClientSyncReceiveEvent事件而已。

总线发送的时候会触发订阅者FaultToleranceSubscriber的onEvent方法。

我们进入到FaultToleranceSubscriber#onEvent

public void onEvent(Event originEvent) { Class eventClass = originEvent.getClass(); if (eventClass == ClientSyncReceiveEvent.class) { //这里会调用aftEnable if (!FaultToleranceConfigManager.isEnable()) { return; } // 同步结果 ClientSyncReceiveEvent event = (ClientSyncReceiveEvent) originEvent; ConsumerConfig consumerConfig = event.getConsumerConfig(); ProviderInfo providerInfo = event.getProviderInfo(); InvocationStat result = InvocationStatFactory.getInvocationStat(consumerConfig, providerInfo); if (result != null) { //记录调用次数 result.invoke(); Throwable t = event.getThrowable(); if (t != null) { //记录异常次数 result.catchException(t); } } } ... }

这里我们忽略其他的事件,只留下ClientSyncReceiveEvent事件的处理流程。
在这里我们又看到了InvocationStatFactory这个工厂类,在上面TimeWindowRegulator#init也用到了这个类。

在返回result之后会调用invoke方法,记录一下客户端调用服务端的次数,如果有异常,也会调用一下catchException方法,记录一下异常的次数。这两个参数会在做服务剔除的时候异步做统计使用。

InvocationStatFactory#getInvocationStat

public static InvocationStat getInvocationStat(ConsumerConfig consumerConfig, ProviderInfo providerInfo) { String appName = consumerConfig.getAppName(); if (appName == null) { return null; } // 应用开启单机故障摘除功能 if (FaultToleranceConfigManager.isRegulationEffective(appName)) { return getInvocationStat(new InvocationStatDimension(providerInfo, consumerConfig)); } return null; } public static InvocationStat getInvocationStat(InvocationStatDimension statDimension) { //第一次的时候为空 InvocationStat invocationStat = ALL_STATS.get(statDimension); if (invocationStat == null) { //直接new一个实例放入到ALL_STATS变量中 invocationStat = new ServiceExceptionInvocationStat(statDimension); InvocationStat old = ALL_STATS.putIfAbsent(statDimension, invocationStat); if (old != null) { invocationStat = old; } //LISTENERS在调用TimeWindowRegulator#init的时候add进来的,只有一个TimeWindowRegulatorListener for (InvocationStatListener listener : LISTENERS) { listener.onAddInvocationStat(invocationStat); } } return invocationStat; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppwdd.html