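1. First, look at the structure of `<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>`:
Its spring.factories declares the configuration classes to load. The bootstrap configuration is loaded and assembled first: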
```properties
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.alibaba.nacos.NacosConfigBootstrapConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.alibaba.nacos.NacosConfigAutoConfiguration,\
org.springframework.cloud.alibaba.nacos.endpoint.NacosConfigEndpointAutoConfiguration
org.springframework.boot.diagnostics.FailureAnalyzer=\
org.springframework.cloud.alibaba.nacos.diagnostics.analyzer.NacosConnectionFailureAnalyzer
```
The auto-configuration then initializes the beans:
```java
@Bean
public NacosConfigProperties nacosConfigProperties(ApplicationContext context) {
    if (context.getParent() != null
            && BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
                    context.getParent(), NacosConfigProperties.class).length > 0) {
        return BeanFactoryUtils.beanOfTypeIncludingAncestors(context.getParent(),
                NacosConfigProperties.class);
    }
    NacosConfigProperties nacosConfigProperties = new NacosConfigProperties();
    return nacosConfigProperties;
}

@Bean
public NacosRefreshProperties nacosRefreshProperties() {
    return new NacosRefreshProperties();
}

@Bean
public NacosRefreshHistory nacosRefreshHistory() {
    return new NacosRefreshHistory();
}

@Bean
public NacosContextRefresher nacosContextRefresher(
        NacosConfigProperties nacosConfigProperties,
        NacosRefreshProperties nacosRefreshProperties,
        NacosRefreshHistory refreshHistory) {
    return new NacosContextRefresher(nacosRefreshProperties, refreshHistory,
            nacosConfigProperties.configServiceInstance());
}
```
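First, loading the remote configuration. Spring Cloud uses implementations of PropertySourceLocator as the entry point, and NacosPropertySourceLocator implements it by overriding the locate method.
Every configuration it loads goes through nacosPropertySourceBuilder.build(dataId, group, fileExtension, true);
In NacosPropertySourceBuilder, build fetches the remote configuration via configService.getConfig: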
```java
NacosPropertySource build(String dataId, String group, String fileExtension,
        boolean isRefreshable) {
    Properties p = loadNacosData(dataId, group, fileExtension);
    NacosPropertySource nacosPropertySource = new NacosPropertySource(group, dataId,
            propertiesToMap(p), new Date(), isRefreshable);
    NacosPropertySourceRepository.collectNacosPropertySources(nacosPropertySource);
    return nacosPropertySource;
}

private Properties loadNacosData(String dataId, String group, String fileExtension) {
    String data = null;
    try {
        data = configService.getConfig(dataId, group, timeout);
        if (!StringUtils.isEmpty(data)) {
            log.info(String.format("Loading nacos data, dataId: '%s', group: '%s'",
                    dataId, group));
            if (fileExtension.equalsIgnoreCase("properties")) {
                Properties properties = new Properties();
                properties.load(new StringReader(data));
                return properties;
            }
            else if (fileExtension.equalsIgnoreCase("yaml")
                    || fileExtension.equalsIgnoreCase("yml")) {
                YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();
                yamlFactory.setResources(new ByteArrayResource(data.getBytes()));
                return yamlFactory.getObject();
            }
        }
    }
    catch (NacosException e) {
        log.error("get data from Nacos error,dataId:{}, ", dataId, e);
    }
    catch (Exception e) {
        log.error("parse data from Nacos error,dataId:{},data:{},", dataId, data, e);
    }
    return EMPTY_PROPERTIES;
}
```
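The file-extension check in loadNacosData() decides how the raw string returned by configService.getConfig() is parsed. A minimal sketch of only the properties branch, using java.util.Properties; RemoteConfigParser is a hypothetical name, and the YAML branch is left out because it needs a YAML library (Spring's YamlPropertiesFactoryBean relies on SnakeYAML).

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper mirroring the "properties" branch of loadNacosData().
class RemoteConfigParser {

    static Properties parse(String data, String fileExtension) {
        // Fresh instance each time, so callers cannot corrupt a shared "empty" value
        Properties props = new Properties();
        if (data == null || data.isEmpty()) {
            return props; // nothing fetched: behave like EMPTY_PROPERTIES
        }
        if ("properties".equalsIgnoreCase(fileExtension)) {
            try {
                props.load(new StringReader(data));
            } catch (IOException e) {
                // Reading an in-memory StringReader should not fail; keep the result empty if it does
                props.clear();
            }
        }
        // Any other extension (including yaml/yml here) yields an empty result in this sketch
        return props;
    }

    public static void main(String[] args) {
        Properties p = parse("server.port=8080\nspring.application.name=demo\n", "properties");
        System.out.println(p.getProperty("server.port")); // prints 8080
    }
}
```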
Next comes the design and implementation of configuration refresh, handled by NacosContextRefresher, which implements ApplicationListener:
```java
public class NacosContextRefresher
        implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware
```
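onApplicationEvent(ApplicationReadyEvent event) fires once the Spring application is ready. The compareAndSet guard makes sure the Nacos listeners are registered only once, even when several Spring contexts exist: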
```java
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
    // many Spring context
    if (this.ready.compareAndSet(false, true)) {
        this.registerNacosListenersForApplications();
    }
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
    this.applicationContext = applicationContext;
}

private void registerNacosListenersForApplications() {
    if (refreshProperties.isEnabled()) {
        for (NacosPropertySource nacosPropertySource : NacosPropertySourceRepository
                .getAll()) {
            if (!nacosPropertySource.isRefreshable()) {
                continue;
            }
            String dataId = nacosPropertySource.getDataId();
            registerNacosListener(nacosPropertySource.getGroup(), dataId);
        }
    }
}
```
Finally, it registers a Listener that overrides receiveConfigInfo and hands it to the config service with configService.addListener(dataId, group, listener):
```java
private void registerNacosListener(final String group, final String dataId) {
    Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() {
        @Override
        public void receiveConfigInfo(String configInfo) {
            refreshCountIncrement();
            String md5 = "";
            if (!StringUtils.isEmpty(configInfo)) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))
                            .toString(16);
                }
                catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
                    log.warn("[Nacos] unable to get md5 for dataId: " + dataId, e);
                }
            }
            refreshHistory.add(dataId, md5);
            applicationContext.publishEvent(
                    new RefreshEvent(this, null, "Refresh Nacos config"));
            if (log.isDebugEnabled()) {
                log.debug("Refresh Nacos config group " + group + ",dataId" + dataId);
            }
        }

        @Override
        public Executor getExecutor() {
            return null;
        }
    });
    try {
        configService.addListener(dataId, group, listener);
    }
    catch (NacosException e) {
        e.printStackTrace();
    }
}
```
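The listener records an MD5 of each refreshed payload in NacosRefreshHistory using new BigInteger(1, digest).toString(16). A small standalone sketch of that computation (the class name Md5Hex is hypothetical), together with a zero-padded variant: toString(16) drops leading zero digits, so a digest that starts with 0 comes out shorter than 32 characters. For a refresh-history entry this is harmless, but it matters if these values are ever compared with standard 32-character hex digests.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class Md5Hex {
    // Same approach as receiveConfigInfo(): may be shorter than 32 chars (leading zeros dropped).
    static String viaBigInteger(String content) {
        return new BigInteger(1, digest(content)).toString(16);
    }

    // Zero-padded variant that always yields 32 lowercase hex characters.
    static String padded(String content) {
        return String.format("%032x", new BigInteger(1, digest(content)));
    }

    private static byte[] digest(String content) {
        try {
            return MessageDigest.getInstance("MD5").digest(content.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(padded("abc")); // prints 900150983cd24fb0d6963f7d28e17f72
    }
}
```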
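What the main packages do:
client: pulls the configuration from the server at startup.
analyzer: a FailureAnalyzer, a neat way to intercept exceptions at startup, turn them into readable messages and wrap them in a FailureAnalysis.
endpoint: Spring Boot Actuator endpoint integration that exposes internal runtime information.
refresh: listens for configuration changes.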
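2. Next, focus on NacosConfigService, the ConfigService implementation in the nacos-client dependency.
During auto-configuration, nacosConfigProperties.configServiceInstance() (called in the beans above) creates the service with:

```java
configService = NacosFactory.createConfigService(properties);
```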
Tracing it down, the instance is created in ConfigFactory.createConfigService, via reflection:
```java
public static ConfigService createConfigService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(-400, e.getMessage());
    }
}
```
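ConfigFactory looks up the implementation class by name instead of calling new NacosConfigService(...) directly; presumably this keeps the factory free of a compile-time dependency on the client implementation. A minimal sketch of the same pattern, with a hypothetical GreetingService interface and DefaultGreetingService implementation standing in for ConfigService and NacosConfigService:

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical interface, standing in for ConfigService.
interface GreetingService {
    String greet();
}

// Hypothetical implementation, standing in for NacosConfigService.
// It must expose a public constructor that takes Properties.
class DefaultGreetingService implements GreetingService {
    private final String name;

    public DefaultGreetingService(Properties properties) {
        this.name = properties.getProperty("name", "world");
    }

    @Override
    public String greet() {
        return "hello " + name;
    }
}

class ReflectiveFactory {
    // Resolve the implementation by class name and call its (Properties) constructor.
    static GreetingService create(String implClassName, Properties properties) {
        try {
            Class<?> implClass = Class.forName(implClassName);
            Constructor<?> ctor = implClass.getConstructor(Properties.class);
            return (GreetingService) ctor.newInstance(properties);
        } catch (ReflectiveOperationException | ClassCastException e) {
            // ConfigFactory wraps failures in NacosException(-400, ...); an unchecked exception here
            throw new IllegalStateException("cannot create " + implClassName, e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("name", "nacos");
        GreetingService service = create(DefaultGreetingService.class.getName(), props);
        System.out.println(service.greet()); // prints "hello nacos"
    }
}
```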
Now look at the package layout:
NacosConfigService holds two key fields, an HttpAgent and a ClientWorker: the agent issues the HTTP requests, and the worker handles long polling.
```java
/**
 * http agent
 */
private HttpAgent agent;
/**
 * longpolling
 */
private ClientWorker worker;
private String namespace;
private String encode;
private ConfigFilterChainManager configFilterChainManager = new ConfigFilterChainManager();

public NacosConfigService(Properties properties) throws NacosException {
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        encode = Constants.ENCODE;
    } else {
        encode = encodeTmp.trim();
    }
    initNamespace(properties);
    agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    agent.start();
    worker = new ClientWorker(agent, configFilterChainManager);
}
```
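Next, the ClientWorker constructor. It creates two thread pools: a single-threaded scheduled executor and a cached thread pool. At the end of the constructor, the scheduled executor runs checkConfigInfo() with a 10 ms fixed delay. checkConfigInfo() divides the number of entries in cacheMap by the per-task batch size, rounding up, to get how many long-polling tasks are needed; if that number exceeds currentLongingTaskCount, it submits one LongPollingRunnable per new batch to the cached pool and then updates currentLongingTaskCount.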
```java
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    executorService = Executors.newCachedThreadPool(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    executor.scheduleWithFixedDelay(new Runnable() {
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}
```
Here is checkConfigInfo() itself; each new batch gets its own LongPollingRunnable on the cached thread pool created above:
```java
public void checkConfigInfo() {
    // Split the work into tasks
    int listenerSize = cacheMap.get().size();
    // Round up to get the number of batches
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // Whether a task is already running still needs careful thought: the task list
            // is currently unordered, so changes to it may cause problems
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
```
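The batching rule can be checked in isolation. A sketch, assuming a batch size of 3000 (in the real code it comes from ParamUtil.getPerTaskConfigSize()); LongPollingBatches and newTaskIds are hypothetical names. Note that the rounding only works if the division is done in floating point: with int / int the fraction is truncated before Math.ceil ever sees it. Tasks are only ever added, never removed, which matches the longingTaskCount > currentLongingTaskCount check.

```java
import java.util.ArrayList;
import java.util.List;

class LongPollingBatches {
    // Assumed batch size; kept as a double so the division below is not truncated.
    static final double PER_TASK_CONFIG_SIZE = 3000;

    // How many LongPollingRunnable tasks listenerSize cache entries need (rounded up).
    static int taskCount(int listenerSize) {
        return (int) Math.ceil(listenerSize / PER_TASK_CONFIG_SIZE);
    }

    // Ids of the tasks that would be started, given how many are already running.
    static List<Integer> newTaskIds(int currentTaskCount, int listenerSize) {
        List<Integer> ids = new ArrayList<>();
        for (int i = currentTaskCount; i < taskCount(listenerSize); i++) {
            ids.add(i);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(taskCount(6500));     // prints 3
        System.out.println(newTaskIds(1, 6500)); // prints [1, 2]
    }
}
```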
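Now LongPollingRunnable.run(). It first checks the local configuration with checkLocalConfig(); the local-check code shows that the local cache path is `~/nacos/config/fixed-{address}_8848_nacos/snapshot/DEFAULT_GROUP/{dataId}`. The root directories are set in this static initializer: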
```java
static {
    LOCAL_FILEROOT_PATH = System.getProperty("JM.LOG.PATH", System.getProperty("user.home"))
            + File.separator + "nacos" + File.separator + "config";
    LOCAL_SNAPSHOT_PATH = System.getProperty("JM.SNAPSHOT.PATH", System.getProperty("user.home"))
            + File.separator + "nacos" + File.separator + "config";
    LOGGER.info("LOCAL_SNAPSHOT_PATH:{}", LOCAL_SNAPSHOT_PATH);
}
```
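The snapshot file location can be rebuilt from the root above plus the segments of the path quoted earlier. In this sketch SnapshotPath and snapshotFile are hypothetical, and naming the directory {agentName}_nacos is an assumption read off that example path (fixed-{address}_8848_nacos).

```java
import java.io.File;

class SnapshotPath {
    // Root directory resolved the same way as LOCAL_SNAPSHOT_PATH above.
    static String snapshotRoot() {
        return System.getProperty("JM.SNAPSHOT.PATH", System.getProperty("user.home"))
                + File.separator + "nacos" + File.separator + "config";
    }

    // Hypothetical helper appending the remaining segments from the example path:
    // {agentName}_nacos/snapshot/{group}/{dataId}
    static File snapshotFile(String root, String agentName, String group, String dataId) {
        File dir = new File(new File(new File(root, agentName + "_nacos"), "snapshot"), group);
        return new File(dir, dataId);
    }

    public static void main(String[] args) {
        File f = snapshotFile(snapshotRoot(), "fixed-127.0.0.1_8848", "DEFAULT_GROUP", "app.properties");
        System.out.println(f.getPath());
    }
}
```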
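Next, it asks the server which groupKeys have changed via checkUpdateDataIds(), fetches the new content for each of them, and then calls cacheData.checkListenerMd5(). Finally, the finally block resubmits the task itself to executorService, which is what makes the long polling loop run indefinitely.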
```java
class LongPollingRunnable implements Runnable {
    private int taskId;

    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }

    public void run() {
        try {
            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            // check failover config
            for (CacheData cacheData : cacheMap.get().values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }

            List<String> inInitializingCacheList = new ArrayList<String>();
            // check server config
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    String content = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(content);
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
                            agent.getName(), dataId, group, tenant, cache.getMd5(),
                            ContentUtils.truncateContent(content));
                } catch (NacosException ioe) {
                    String message = String.format(
                            "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                            agent.getName(), dataId, group, tenant);
                    LOGGER.error(message, ioe);
                }
            }
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();
        } catch (Throwable e) {
            LOGGER.error("longPolling error", e);
        } finally {
            executorService.execute(this);
        }
    }
}
```
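The "resubmit yourself in finally" loop is worth isolating. The task needs no sleep because each round already blocks on the server-held long-polling request, and because resubmission sits in finally, a failing round cannot stop the loop. A sketch with a hypothetical SelfResubmittingTask; the round limit exists only so the demo terminates, whereas LongPollingRunnable loops forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SelfResubmittingTask implements Runnable {
    private final ExecutorService executor;
    private final int maxRounds; // demo-only stop condition
    private final AtomicInteger rounds = new AtomicInteger();
    private final CountDownLatch finished = new CountDownLatch(1);

    SelfResubmittingTask(ExecutorService executor, int maxRounds) {
        this.executor = executor;
        this.maxRounds = maxRounds;
    }

    @Override
    public void run() {
        int round = rounds.incrementAndGet();
        try {
            pollOnce(round);
        } catch (Throwable t) {
            // Swallow the error (the real code logs it) so one bad round cannot end the loop
        } finally {
            if (round < maxRounds) {
                executor.execute(this); // queue the next round on the same pool
            } else {
                finished.countDown();
            }
        }
    }

    // Placeholder for "check local config, ask the server, notify listeners".
    private void pollOnce(int round) {
        if (round == 2) {
            throw new IllegalStateException("simulated server error");
        }
    }

    // Runs the loop on a cached pool and returns how many rounds were executed.
    static int runRounds(int maxRounds) {
        ExecutorService pool = Executors.newCachedThreadPool();
        SelfResubmittingTask task = new SelfResubmittingTask(pool, maxRounds);
        try {
            pool.execute(task);
            task.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return task.rounds.get();
    }

    public static void main(String[] args) {
        System.out.println(runRounds(5)); // prints 5
    }
}
```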
Back to the auto-configuration: NacosContextRefresher ends by calling configService.addListener(dataId, group, listener), which in turn calls ClientWorker.addTenantListeners:
```java
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) {
    group = null2defaultGroup(group);
    String tenant = agent.getTenant();
    CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
    for (Listener listener : listeners) {
        cache.addListener(listener);
    }
}
```
```java
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) {
    CacheData cache = getCache(dataId, group, tenant);
    if (null != cache) {
        return cache;
    }
    String key = GroupKey.getKeyTenant(dataId, group, tenant);
    cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
    synchronized (cacheMap) {
        CacheData cacheFromMap = getCache(dataId, group, tenant);
        // multiple listeners on the same dataid+group and race condition,so
        // double check again
        // other listener thread beat me to set to cacheMap
        if (null != cacheFromMap) {
            cache = cacheFromMap;
            // reset so that server not hang this check
            cache.setInitializing(true);
        }

        Map<String, CacheData> copy = new HashMap<String, CacheData>(cacheMap.get());
        copy.put(key, cache);
        cacheMap.set(copy);
    }
    LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);

    MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());

    return cache;
}
```
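addCacheDataIfAbsent never mutates the map that readers see: under a lock it copies cacheMap.get(), adds the entry and swaps the copy in with cacheMap.set(...), so LongPollingRunnable can iterate cacheMap.get().values() without locking. A generic sketch of that copy-on-write pattern (CopyOnWriteRegistry is a hypothetical name). One deliberate difference: when another thread has won the race, the excerpt keeps the existing CacheData, resets its initializing flag and still writes it back, whereas this sketch simply returns the existing value.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Readers never lock; writers copy the map under a lock and publish the copy.
class CopyOnWriteRegistry<V> {
    private final AtomicReference<Map<String, V>> ref =
            new AtomicReference<>(Collections.<String, V>emptyMap());

    V getOrCreate(String key, Supplier<V> factory) {
        V existing = ref.get().get(key); // lock-free fast path
        if (existing != null) {
            return existing;
        }
        synchronized (this) {
            V raced = ref.get().get(key); // double check: another thread may have added it
            if (raced != null) {
                return raced;
            }
            V created = factory.get();
            Map<String, V> copy = new HashMap<>(ref.get());
            copy.put(key, created);
            ref.set(Collections.unmodifiableMap(copy)); // publish the new snapshot
            return created;
        }
    }

    int size() {
        return ref.get().size();
    }

    // Registers "a" twice and "b" once; the second "a" must reuse the first value.
    static int demoSize() {
        CopyOnWriteRegistry<Object> registry = new CopyOnWriteRegistry<>();
        Object first = registry.getOrCreate("a", Object::new);
        Object second = registry.getOrCreate("a", Object::new);
        registry.getOrCreate("b", Object::new);
        return first == second ? registry.size() : -1;
    }
}
```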
So the listeners are held by CacheData. Below are CacheData's main fields; md5 is computed in the constructor via getMd5String(content), and each ManagerListenerWrap keeps a lastCallMd5:
```java
private final String name;
private final ConfigFilterChainManager configFilterChainManager;
public final String dataId;
public final String group;
public final String tenant;
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;

private volatile String md5;
/**
 * whether use local config
 */
private volatile boolean isUseLocalConfig = false;
/**
 * last modify time
 */
private volatile long localConfigLastModified;
private volatile String content;
private int taskId;
private volatile boolean isInitializing = true;
```
Back in LongPollingRunnable.run(), CacheData.checkListenerMd5() calls safeNotifyListener() for every listener whose lastCallMd5 differs from the current md5:
```java
void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, md5, wrap);
        }
    }
}
```
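safeNotifyListener invokes the listener callback listener.receiveConfigInfo(contentTmp). The listener registered by NacosContextRefresher therefore runs its receiveConfigInfo code, which publishes a RefreshEvent and completes the configuration update.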
```java
private void safeNotifyListener(final String dataId, final String group, final String content,
                                final String md5, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;

    Runnable job = new Runnable() {
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
                // Before running the callback, switch the thread context classloader to the webapp's
                // classloader, so SPI calls inside the callback neither fail nor pick the wrong
                // implementation (this only happens when several applications are deployed together).
                Thread.currentThread().setContextClassLoader(appClassLoader);

                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                listener.receiveConfigInfo(contentTmp);
                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                        listener);
            } catch (NacosException de) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
                        dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
                        md5, listener, t.getCause());
            } finally {
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };

    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,
                md5, listener, t.getCause());
    }
    final long finishNotify = System.currentTimeMillis();
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
            name, (finishNotify - startNotify), dataId, group, md5, listener);
}
```
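The notify-once-per-change rule boils down to one md5 per cache entry and one lastCallMd5 per listener. A simplified sketch: MiniCacheData and its Wrap class are hypothetical stand-ins for CacheData and ManagerListenerWrap, and md5 values are passed in rather than computed. It mirrors safeNotifyListener's ordering, where lastCallMd5 is updated only after the callback returns, so a failing listener is retried on the next check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class MiniCacheData {
    // Stand-in for ManagerListenerWrap: a callback plus the md5 it last received.
    static final class Wrap {
        final Consumer<String> listener;
        volatile String lastCallMd5 = "";

        Wrap(Consumer<String> listener) {
            this.listener = listener;
        }
    }

    private final List<Wrap> listeners = new CopyOnWriteArrayList<>();
    private volatile String content = "";
    private volatile String md5 = "";

    void addListener(Consumer<String> listener) {
        listeners.add(new Wrap(listener));
    }

    void setContent(String content, String md5) {
        this.content = content;
        this.md5 = md5;
    }

    // Notify only the listeners that have not yet seen the current md5.
    void checkListenerMd5() {
        for (Wrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                try {
                    wrap.listener.accept(content);
                    wrap.lastCallMd5 = md5; // recorded only after a successful callback
                } catch (RuntimeException e) {
                    // lastCallMd5 stays unchanged, so the next check retries this listener
                }
            }
        }
    }

    static int demo() {
        List<String> received = new ArrayList<>();
        MiniCacheData cache = new MiniCacheData();
        cache.addListener(received::add);
        cache.setContent("a=1", "md5-1");
        cache.checkListenerMd5(); // notified
        cache.checkListenerMd5(); // same md5: skipped
        cache.setContent("a=2", "md5-2");
        cache.checkListenerMd5(); // notified again
        return received.size();
    }
}
```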
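That closes the loop on the client side. Next, look at how the client asks the server for changed configuration; the timeout is set to 30 s. LongPollingRunnable.run() calls:

```java
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
```

which ends up sending the request:

```java
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
        agent.getEncode(), timeout);
```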
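What goes into params is not shown above. As far as we understand the Nacos 1.x client, the body is a single "Listening-Configs" parameter listing every watched entry with the md5 the client currently holds, which the server then compares against its own (the MD5Util.compareMd5 call below). The separators and layout in this sketch are assumptions to verify against your Nacos version; ListeningConfigs and Entry are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

class ListeningConfigs {
    // Assumed separators (Nacos 1.x client constants WORD_SEPARATOR and LINE_SEPARATOR).
    static final char WORD_SEPARATOR = (char) 2;
    static final char LINE_SEPARATOR = (char) 1;

    // Hypothetical stand-in for the CacheData fields that go into the probe.
    static final class Entry {
        final String dataId;
        final String group;
        final String md5;
        final String tenant; // null or blank when no namespace is used

        Entry(String dataId, String group, String md5, String tenant) {
            this.dataId = dataId;
            this.group = group;
            this.md5 = md5;
            this.tenant = tenant;
        }
    }

    // One record per entry: dataId WORD group WORD md5 [WORD tenant] LINE
    static String build(List<Entry> entries) {
        StringBuilder sb = new StringBuilder();
        for (Entry e : entries) {
            sb.append(e.dataId).append(WORD_SEPARATOR)
              .append(e.group).append(WORD_SEPARATOR)
              .append(e.md5);
            if (e.tenant != null && !e.tenant.trim().isEmpty()) {
                sb.append(WORD_SEPARATOR).append(e.tenant);
            }
            sb.append(LINE_SEPARATOR);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String probe = build(Arrays.asList(new Entry("app.yaml", "DEFAULT_GROUP", "abc", null)));
        // Make the control characters visible for printing
        System.out.println(probe.replace(WORD_SEPARATOR, '|').replace(LINE_SEPARATOR, ';'));
        // prints app.yaml|DEFAULT_GROUP|abc;
    }
}
```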
The request goes to /v1/cs/configs/listener, which is handled by ConfigController in the Nacos config module; tracing further, it calls ConfigServletInner.doPollingConfig():
```java
// long polling
if (LongPollingService.isSupportLongPolling(request)) {
    longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
    return HttpServletResponse.SC_OK + "";
}
```
```java
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                 int probeRequestSize) {

    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    String tag = req.getHeader("Vipserver-Tag");
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
    /**
     * Respond 500 ms early to avoid client-side timeouts
     * (changed by @qiaoyi.dingqy on 2013.10.22: add delay time for LoadBalance)
     */
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // do nothing but set fix polling timeout
    } else {
        long start = System.currentTimeMillis();
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            generateResponse(req, rsp, changedGroups);
            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                    System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                    clientMd5Map.size(), probeRequestSize, changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        }
    }
    String ip = RequestUtil.getRemoteIp(req);
    // Must be called on the HTTP thread; otherwise the container sends the response as soon as it returns
    final AsyncContext asyncContext = req.startAsync();
    // AsyncContext.setTimeout() is not precise, so the timeout is controlled manually
    asyncContext.setTimeout(0L);

    scheduler.execute(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
```
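The 29.5 s hold time comes straight from the timeout arithmetic in addLongPollingClient(). A tiny sketch of it; HoldTimeout is a hypothetical name, the 10 000 ms floor and the 500 ms default delay are the constants in the code above, and "30000" stands for the 30 s timeout the client sends.

```java
class HoldTimeout {
    static final long MIN_HOLD_MILLIS = 10000;   // floor used in addLongPollingClient()
    static final int DEFAULT_DELAY_MILLIS = 500; // default of the FIXED_DELAY_TIME switch

    // Normal mode: answer delayMillis before the client's own timeout would expire.
    static long holdMillis(String clientTimeoutHeader, int delayMillis) {
        return Math.max(MIN_HOLD_MILLIS, Long.parseLong(clientTimeoutHeader) - delayMillis);
    }

    // Fixed-polling mode: ignore the client value and use the configured interval.
    static long fixedHoldMillis(long fixedPollingInterval) {
        return Math.max(MIN_HOLD_MILLIS, fixedPollingInterval);
    }

    public static void main(String[] args) {
        System.out.println(holdMillis("30000", DEFAULT_DELAY_MILLIS)); // prints 29500
    }
}
```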
The request is now handed to the ScheduledExecutorService scheduler. With the client's 30 s timeout minus the 500 ms delay, the request is held for 29.5 s. Here is ClientLongPolling.run():
```java
@Override
public void run() {
    asyncTimeoutFuture = scheduler.schedule(new Runnable() {
        public void run() {
            try {
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                /**
                 * Remove the subscription
                 */
                allSubs.remove(ClientLongPolling.this);

                if (isFixedPolling()) {
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                            (System.currentTimeMillis() - createTime),
                            "fix", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                            "polling",
                            clientMd5Map.size(), probeRequestSize);
                    List<String> changedGroups = MD5Util.compareMd5(
                            (HttpServletRequest) asyncContext.getRequest(),
                            (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                    if (changedGroups.size() > 0) {
                        sendResponse(changedGroups);
                    } else {
                        sendResponse(null);
                    }
                } else {
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                            (System.currentTimeMillis() - createTime),
                            "timeout", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                            "polling",
                            clientMd5Map.size(), probeRequestSize);
                    sendResponse(null);
                }
            } catch (Throwable t) {
                LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
            }
        }
    }, timeoutTime, TimeUnit.MILLISECONDS);

    allSubs.add(this);
}
```
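run() schedules a timeout task and adds itself to allSubs. When the timeout fires, the task removes this client from allSubs (ending the subscription) and writes the response: in fixed-polling mode it contains the changed groups, otherwise it is empty.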
```java
/**
 * Long-polling subscriptions
 */
final Queue<ClientLongPolling> allSubs;
```
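But when a configuration changes on the server, how does the client get updated in real time? The server's configuration publish endpoint is /v1/cs/configs, and tracing it reveals a call to EventDispatcher.fireEvent: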
```java
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
```
Here is EventDispatcher.fireEvent:
```java
/**
 * fire event, notify listeners.
 */
static public void fireEvent(Event event) {
    if (null == event) {
        throw new IllegalArgumentException();
    }

    for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
        try {
            listener.onEvent(event);
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }
}
```
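fireEvent is a plain synchronous observer: listeners are looked up by the event's exact class and invoked one after another on the caller's thread, each inside its own try/catch, so a failing listener does not stop the rest. A minimal sketch with hypothetical event types (MiniEventDispatcher, LocalDataChange and OtherEvent are illustrative names, not Nacos classes):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class MiniEventDispatcher {
    interface Event {}

    // Hypothetical event types, loosely modelled on the ones in the text.
    static final class LocalDataChange implements Event {
        final String groupKey;

        LocalDataChange(String groupKey) {
            this.groupKey = groupKey;
        }
    }

    static final class OtherEvent implements Event {}

    // Listeners keyed by the exact event class.
    private final Map<Class<? extends Event>, List<Consumer<Event>>> listeners = new ConcurrentHashMap<>();

    void addListener(Class<? extends Event> type, Consumer<Event> listener) {
        listeners.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Synchronous fan-out on the caller's thread.
    void fireEvent(Event event) {
        if (event == null) {
            throw new IllegalArgumentException("event must not be null");
        }
        for (Consumer<Event> listener : listeners.getOrDefault(event.getClass(), Collections.emptyList())) {
            try {
                listener.accept(event);
            } catch (RuntimeException e) {
                System.err.println("listener failed: " + e); // report and continue
            }
        }
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        MiniEventDispatcher dispatcher = new MiniEventDispatcher();
        dispatcher.addListener(LocalDataChange.class, e -> seen.add(((LocalDataChange) e).groupKey));
        dispatcher.addListener(LocalDataChange.class, e -> { throw new IllegalStateException("boom"); });
        dispatcher.addListener(OtherEvent.class, e -> seen.add("other"));
        dispatcher.fireEvent(new LocalDataChange("app+DEFAULT_GROUP"));
        return seen;
    }
}
```

Because listeners run on the caller's thread, a listener like LongPollingService keeps its onEvent cheap by handing the real work to its own scheduler.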
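It calls onEvent on every registered AbstractEventListener. So which classes implement AbstractEventListener?
Among them are a familiar one, LongPollingService, and AsyncNotifyService (which notifies the other nodes in the cluster). Note that the event fired above is a ConfigDataChangeEvent, while LongPollingService reacts to LocalDataChangeEvent: AsyncNotifyService handles the former by telling every node to dump the new configuration, and the dump on each node then fires a LocalDataChangeEvent. Here is LongPollingService.onEvent: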
```java
@Override
public void onEvent(Event event) {
    if (isFixedPolling()) {
        // ignore
    } else {
        if (event instanceof LocalDataChangeEvent) {
            LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
            scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
        }
    }
}
```
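Then DataChangeTask. Its run() walks through allSubs; for each subscriber watching the changed groupKey, it removes the subscription and sends the response. sendResponse first cancels the pending timeout task, so that the timeout task does not fail by trying to write a second response after this one has been written: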
```java
void sendResponse(List<String> changedGroups) {
    /**
     * Cancel the timeout task
     */
    if (null != asyncTimeoutFuture) {
        asyncTimeoutFuture.cancel(false);
    }
    generateResponse(changedGroups);
}
```
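The server side of the mechanism (ClientLongPolling plus DataChangeTask) reduces to three steps: park the request in a queue, schedule a timeout that answers with nothing, and let a data-change task answer early and cancel that timeout. A self-contained sketch in which a CompletableFuture stands in for the servlet AsyncContext; MiniLongPolling, Subscriber and the groupKey strings are hypothetical. Since CompletableFuture.complete is a no-op once the future is done, a late timeout could not overwrite the answer here; with a real HTTP response the second write fails, which is why the original cancels asyncTimeoutFuture first.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class MiniLongPolling {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Queue<Subscriber> allSubs = new ConcurrentLinkedQueue<>();

    final class Subscriber {
        final Set<String> groupKeys;
        final CompletableFuture<List<String>> response = new CompletableFuture<>();
        volatile ScheduledFuture<?> timeoutFuture;

        Subscriber(Set<String> groupKeys) {
            this.groupKeys = groupKeys;
        }

        void sendResponse(List<String> changedGroups) {
            if (timeoutFuture != null) {
                timeoutFuture.cancel(false); // stop the pending timeout from answering again
            }
            response.complete(changedGroups);
        }
    }

    // Hold the request; answer with an empty list if nothing changes within holdMillis.
    CompletableFuture<List<String>> addLongPollingClient(Set<String> groupKeys, long holdMillis) {
        Subscriber sub = new Subscriber(groupKeys);
        allSubs.add(sub);
        sub.timeoutFuture = scheduler.schedule(() -> {
            allSubs.remove(sub);
            sub.sendResponse(Collections.<String>emptyList());
        }, holdMillis, TimeUnit.MILLISECONDS);
        return sub.response;
    }

    // Like DataChangeTask: answer every subscriber watching groupKey right away.
    void onDataChange(String groupKey) {
        scheduler.execute(() -> {
            for (Iterator<Subscriber> it = allSubs.iterator(); it.hasNext(); ) {
                Subscriber sub = it.next();
                if (sub.groupKeys.contains(groupKey)) {
                    it.remove();
                    sub.sendResponse(Collections.singletonList(groupKey));
                }
            }
        });
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    static String demo() {
        MiniLongPolling server = new MiniLongPolling();
        try {
            CompletableFuture<List<String>> watching =
                    server.addLongPollingClient(Collections.singleton("app+DEFAULT_GROUP"), 5000);
            CompletableFuture<List<String>> idle =
                    server.addLongPollingClient(Collections.singleton("other+DEFAULT_GROUP"), 100);
            server.onDataChange("app+DEFAULT_GROUP");
            return watching.get(1, TimeUnit.SECONDS) + " " + idle.get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            return "error: " + e;
        } finally {
            server.shutdown();
        }
    }
}
```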
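Summary: the Nacos config client uses long-polling HTTP requests to ask the server whether any configuration has changed. When a configuration changes on the server, the server takes the matching subscribers out of its subscription queue and answers them immediately, which is how remote configuration changes reach clients in near real time.
Reposted from: http://sgadi.baihongyu.com/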