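readOnlyCacheMap is a ConcurrentHashMap, so it is thread-safe.
readWriteCacheMap is a Guava cache. If you are not familiar with it, take some time to read up on it; I have also covered it in an earlier post. It is the in-memory cache from Google's open-source Guava project, and internally it is also built on a Map structure.
Let's focus on the Guava cache. Its initial capacity comes from serverConfig.getInitialCapacityOfResponseCache(), which defaults to 1000; this is also the initial size of the underlying Map.
The expireAfterWrite time comes from serverConfig.getResponseCacheAutoExpirationInSeconds(), which defaults to 180s, so an entry expires 180 seconds after it was written.
Next comes the build method. This is where generatePayload is used to obtain the registry information: if a lookup in readWriteCacheMap finds nothing for the key, the loading logic passed to build is executed.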
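The load-on-miss and expire-after-write behaviour described above can be sketched with plain JDK classes. This is only a rough stand-in, not Guava and not Eureka's code: the class name ExpiringLoadingCache, the injected clock, and the loader function are all assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a loading cache; this is neither Guava nor Eureka code.
final class ExpiringLoadingCache<K, V> {

    // A cached value together with the time it was written.
    private static final class Entry<T> {
        final T value;
        final long writtenAtMs;

        Entry(T value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final ConcurrentHashMap<K, Entry<V>> map;
    private final long ttlMs;
    private final Function<K, V> loader;
    private final LongSupplier clock; // injected so that expiry can be tested

    ExpiringLoadingCache(int initialCapacity, long ttlMs,
                         Function<K, V> loader, LongSupplier clock) {
        this.map = new ConcurrentHashMap<>(initialCapacity);
        this.ttlMs = ttlMs;
        this.loader = loader;
        this.clock = clock;
    }

    // Returns the cached value, calling the loader when the entry is missing or expired.
    V get(K key) {
        final long now = clock.getAsLong();
        Entry<V> entry = map.compute(key, (k, old) -> {
            boolean missingOrExpired = old == null || now - old.writtenAtMs >= ttlMs;
            return missingOrExpired ? new Entry<V>(loader.apply(k), now) : old;
        });
        return entry.value;
    }

    // Drops an entry so that the next get() reloads it.
    void invalidate(K key) {
        map.remove(key);
    }
}
```

With the defaults mentioned above, the sketch would be constructed with an initial capacity of 1000 and a TTL of 180 000 ms, and the loader would play the role of generatePayload.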
Let's step into the generatePayload method:
    private Value generatePayload(Key key) {
        Stopwatch tracer = null;
        try {
            String payload;
            switch (key.getEntityType()) {
                case Application:
                    boolean isRemoteRegionRequested = key.hasRegions();

                    if (ALL_APPS.equals(key.getName())) {
                        // full fetch of the registry
                        if (isRemoteRegionRequested) {
                            tracer = serializeAllAppsWithRemoteRegionTimer.start();
                            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeAllAppsTimer.start();
                            payload = getPayLoad(key, registry.getApplications());
                        }
                    } else if (ALL_APPS_DELTA.equals(key.getName())) {
                        // incremental (delta) fetch of the registry
                        if (isRemoteRegionRequested) {
                            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                            versionDeltaWithRegions.incrementAndGet();
                            versionDeltaWithRegionsLegacy.incrementAndGet();
                            payload = getPayLoad(key, registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeDeltaAppsTimer.start();
                            versionDelta.incrementAndGet();
                            versionDeltaLegacy.incrementAndGet();
                            payload = getPayLoad(key, registry.getApplicationDeltas());
                        }
                    }
                    // remaining branches are omitted from this excerpt
                    break;
            }
            return new Value(payload);
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    }

This code has been trimmed. The incremental registry fetch goes through the same logic: ALL_APPS means a full fetch and ALL_APPS_DELTA means an incremental fetch. Let's bookmark this spot and come back to it when we reach the incremental fetch logic.
In the logic above we only need to pay attention to registry.getApplicationsFromMultipleRegions, which is where the registry is actually read. Let's keep following the code:
AbstractInstanceRegistry.java
    public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
        // note: includeRemoteRegion is set in a part of this method that the excerpt omits

        Applications apps = new Applications();
        apps.setVersion(1L);
        // local registry: application name -> (instance id -> lease)
        for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
            Application app = null;

            if (entry.getValue() != null) {
                for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                    Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                    if (app == null) {
                        app = new Application(lease.getHolder().getAppName());
                    }
                    app.addInstance(decorateInstanceInfo(lease));
                }
            }
            if (app != null) {
                apps.addApplication(app);
            }
        }
        if (includeRemoteRegion) {
            for (String remoteRegion : remoteRegions) {
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteApps = remoteRegistry.getApplications();
                    for (Application application : remoteApps.getRegisteredApplications()) {
                        if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                            logger.info("Application {} fetched from the remote region {}",
                                    application.getName(), remoteRegion);

                            Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                            if (appInstanceTillNow == null) {
                                appInstanceTillNow = new Application(application.getName());
                                apps.addApplication(appInstanceTillNow);
                            }
                            for (InstanceInfo instanceInfo : application.getInstances()) {
                                appInstanceTillNow.addInstance(instanceInfo);
                            }
                        } else {
                            logger.debug("Application {} not fetched from the remote region {} as there exists a "
                                            + "whitelist and this app is not in the whitelist.",
                                    application.getName(), remoteRegion);
                        }
                    }
                } else {
                    logger.warn("No remote registry available for the remote region {}", remoteRegion);
                }
            }
        }
        apps.setAppsHashCode(apps.getReconcileHashCode());
        return apps;
    }

Doesn't registry.entrySet() look familiar here? In the previous post, on Client registration, the registration information was put into registry, whose data structure is Map<String, Map<String, Lease<InstanceInfo>>>. Sure enough, this method reads all the registration information from it and wraps it in an Applications object.
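To make the shape of that nested map concrete, here is a minimal sketch of the first loop only. It is a simplification under stated assumptions: plain strings stand in for Lease<InstanceInfo>, a list of strings stands in for Application, and the class name RegistryFlattenSketch is made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the registry walk; not Eureka code.
final class RegistryFlattenSketch {

    // registry: application name -> (instance id -> instance address).
    // Returns application name -> list of instance addresses.
    static Map<String, List<String>> flatten(Map<String, Map<String, String>> registry) {
        Map<String, List<String>> apps = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, String>> entry : registry.entrySet()) {
            Map<String, String> leases = entry.getValue();
            // Like the original loop, an application with no instances is skipped.
            if (leases == null || leases.isEmpty()) {
                continue;
            }
            apps.put(entry.getKey(), new ArrayList<>(leases.values()));
        }
        return apps;
    }
}
```

The outer key is the application name and the inner key is the instance id, so one pass over the outer map and one over each inner map is enough to collect every registered instance.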
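The apps.setAppsHashCode() call at the end is another spot to bookmark: incremental sync has similar logic, so we will come back to it later. Now let's go back and look at what happens around readWriteCacheMap once the data has been returned.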
    if (shouldUseReadOnlyResponseCache) {
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }

    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}",
                                key.toStringCompact(), th);
                    } finally {
                        CurrentRequestVersion.remove();
                    }
                }
            }
        };
    }
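Two details of the code above are easy to miss: the first run of the task is aligned to the next multiple of the interval, and each run only touches keys that are already in readOnlyCacheMap, replacing a value when the read-write cache holds a different object. The sketch below isolates both ideas with plain JDK types. The class and method names are hypothetical, the read-write cache is modelled as a simple function, and the 30 000 ms interval in the usage note is just an example value.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the refresh task's two key ideas; not Eureka code.
final class CacheSyncSketch {

    // Rounds nowMs down to a multiple of intervalMs, then adds one interval.
    static long nextAlignedStart(long nowMs, long intervalMs) {
        return (nowMs / intervalMs) * intervalMs + intervalMs;
    }

    // One pass of the refresh: for every key already present in readOnly,
    // fetch the read-write value and overwrite when it is a different object.
    // Returns how many entries were replaced. Assumes readWrite never returns null.
    static <K, V> int syncOnce(Map<K, V> readOnly, Function<K, V> readWrite) {
        int replaced = 0;
        for (K key : readOnly.keySet()) {
            V fresh = readWrite.apply(key);
            V current = readOnly.get(key);
            if (fresh != current) { // reference comparison, as in the original
                readOnly.put(key, fresh);
                replaced++;
            }
        }
        return replaced;
    }
}
```

For example, with an interval of 30 000 ms and a current time of 65 000 ms, nextAlignedStart returns 90 000: 65 000 is rounded down to 60 000 and one interval is added. Because the comparison is by reference, a second syncOnce pass over unchanged data replaces nothing.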
