程序员L札记
2022/03/14阅读:64主题:橙心
Spring Cloud Nacos 服务注册与发现源码分析(二)
spring boot版本2.3.7.RELEASE spring cloud版本Hoxton.SR9 spring cloud alibaba版本2.2.2.RELEASE
前言
在上一篇中,介绍了spring cloud与nacos客户端整合的相关代码,并借助nacos底层的能力,实现了服务的注册。今天继续分析余下内容:
- 服务续约
- 服务发现
源码分析
服务续约
在上篇跟踪服务注册相关源码的时候,看到服务注册时,某个服务的实例是区分临时实例还是永久实例的。默认情况下注册的实例是临时实例,可以通过配置spring.cloud.nacos.discovery.ephemeral=false,设置成永久实例。nacos官方不推荐同一个服务下既存在临时实例又存在永久实例。如果是临时实例,那么就开启一个定时任务,定时地向nacos服务端发送心跳,保持服务实例的存活,下面来看看源码。
NacosNamingService
/**
 * Registers an instance under the given service and group.
 *
 * <p>When the instance is ephemeral, a heartbeat payload is built and handed to
 * {@code beatReactor} before the registration request is sent through {@code serverProxy}.
 *
 * @param serviceName plain service name (without group prefix)
 * @param groupName   group the service belongs to
 * @param instance    instance to register
 * @throws NacosException declared by the signature; presumably raised when the registration call fails
 */
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    final String fullServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    final boolean ephemeral = instance.isEphemeral();
    if (ephemeral) {
        // Payload is derived from the instance (ip, port, cluster, weight, ...) and then
        // registered with the beat reactor, which takes care of sending heartbeats.
        final BeatInfo heartbeat = beatReactor.buildBeatInfo(fullServiceName, instance);
        beatReactor.addBeatInfo(fullServiceName, heartbeat);
    }
    serverProxy.registerService(fullServiceName, groupName, instance);
}
BeatReactor
/**
 * Builds the heartbeat payload for an instance.
 *
 * <p>The payload is created in the "not scheduled" state; its period is taken from the
 * instance's heartbeat interval.
 *
 * @param groupedServiceName service name already combined with its group
 * @param instance           instance whose attributes are copied into the payload
 * @return a freshly created {@code BeatInfo}
 */
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
    final BeatInfo payload = new BeatInfo();

    // Where the instance lives.
    payload.setServiceName(groupedServiceName);
    payload.setIp(instance.getIp());
    payload.setPort(instance.getPort());
    payload.setCluster(instance.getClusterName());

    // Instance attributes carried along with the heartbeat.
    payload.setWeight(instance.getWeight());
    payload.setMetadata(instance.getMetadata());

    // Scheduling state.
    payload.setScheduled(false);
    payload.setPeriod(instance.getInstanceHeartBeatInterval());

    return payload;
}
/**
 * Registers a heartbeat payload and schedules its first heartbeat.
 *
 * <p>Any payload previously stored under the same key is removed and flagged as stopped,
 * so that its already-scheduled task returns without sending.
 *
 * @param serviceName grouped service name used to build the cache key
 * @param beatInfo    heartbeat payload to store and schedule
 */
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);

    // Cache key is derived from service name, ip and port.
    final String beatKey = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());

    // fix #1733: retire the payload that was cached under this key, if any.
    final BeatInfo previous = dom2Beat.remove(beatKey);
    if (previous != null) {
        previous.setStopped(true);
    }

    dom2Beat.put(beatKey, beatInfo);

    // First heartbeat fires after one period (milliseconds); BeatTask re-schedules itself afterwards.
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);

    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
//获取下一次心跳的时间
long nextTime = beatInfo.getPeriod();
try {
//发送心跳请求,请求地址/nacos/v1/ns/instance/beat httpMethod put
//默认第一次发送的是非轻量级的请求,会额外发送beatinfo全量数据
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
//获取服务端响应参数:心跳间隔时间
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
//如果服务端返回lightBeatEnabled参数
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
//重置lightBeatEnabled局部变量
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
//重置类成员变量
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
//如果服务端返回的心跳间隔时间大于0
if (interval > 0) {
//重置下一次心跳时间
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
//如果code等于20404,重新构造服务实例信息,发起服务注册流程(补偿机制)
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
}
//启动下一次定时任务
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
完整的服务心跳续约见代码注释。通过跟踪源码,可以看到,在心跳续约中,会对服务注册进行补偿。并且服务端会动态调节客户端发起心跳续约的间隔周期。
服务发现
服务发现是通过ribbon实现的。还记得在《Spring Cloud Netflix Ribbon源码解析(四)》中,分析到RoundRobinRule.choose方法时,内部会分别获取两个列表:可用服务列表和所有服务列表。通过查找源码,最终发现,在没有使用任何服务注册中心的时候,默认是从配置文件中获取服务列表,即通过ConfigurationBasedServerList类获取。忘记的同学请阅读这篇文章。最终都会调用DynamicServerListLoadBalancer的updateListOfServers方法:
DynamicServerListLoadBalancer
/**
 * Refreshes the load balancer's server list.
 *
 * <p>When a {@code ServerList} implementation is configured, the list is obtained from it and,
 * if a filter is present, passed through that filter. Without an implementation an empty
 * list is used. The result is handed to {@code updateAllServerList}.
 */
@VisibleForTesting
public void updateListOfServers() {
    List<T> refreshed;
    if (serverListImpl == null) {
        refreshed = new ArrayList<T>();
    } else {
        // The concrete ServerList implementation decides where the servers come from.
        refreshed = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), refreshed);

        if (filter != null) {
            refreshed = filter.getFilteredListOfServers(refreshed);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), refreshed);
        }
    }
    updateAllServerList(refreshed);
}
那么今天看看在有服务注册中心的情况下,是怎么从nacos中获取服务列表的。com.netflix.loadbalancer.ServerList接口是netflix提供的抽象,NacosServerList实现了这个接口,并通过NacosRibbonClientConfiguration配置类实例化并注册到spring容器中:
NacosRibbonClientConfiguration
/**
 * Ribbon client configuration that contributes a Nacos-backed {@code ServerList} and a
 * {@code NacosServerIntrospector}. Both beans are only created when no bean of the same
 * type is already defined.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnRibbonNacos
public class NacosRibbonClientConfiguration {

    @Autowired
    private PropertiesFactory propertiesFactory;

    /**
     * Provides the {@code ServerList} for a Ribbon client.
     *
     * <p>If a {@code ServerList} class is configured for the client through
     * {@code PropertiesFactory}, that one is used; otherwise a {@code NacosServerList}
     * is created and initialised with the client config.
     *
     * @param config                   Ribbon client configuration
     * @param nacosDiscoveryProperties Nacos discovery settings passed to the server list
     * @param nacosServiceManager      service manager passed to the server list
     * @return the server list to use for this client
     */
    @Bean
    @ConditionalOnMissingBean
    public ServerList<?> ribbonServerList(IClientConfig config,
            NacosDiscoveryProperties nacosDiscoveryProperties,
            NacosServiceManager nacosServiceManager) {
        final String clientName = config.getClientName();

        if (!this.propertiesFactory.isSet(ServerList.class, clientName)) {
            // No explicit override for this client: fall back to the Nacos implementation.
            final NacosServerList nacosBacked = new NacosServerList(nacosDiscoveryProperties,
                    nacosServiceManager);
            nacosBacked.initWithNiwsConfig(config);
            return nacosBacked;
        }

        return this.propertiesFactory.get(ServerList.class, config, clientName);
    }

    /**
     * Provides the server introspector for Nacos servers.
     *
     * @return a new {@code NacosServerIntrospector}
     */
    @Bean
    @ConditionalOnMissingBean
    public NacosServerIntrospector nacosServerIntrospector() {
        return new NacosServerIntrospector();
    }

}
NacosServerList
/**
 * Returns the current list of servers by delegating to {@code getServers()}.
 *
 * @return the servers produced by {@code getServers()}
 */
@Override
public List<NacosServer> getUpdatedListOfServers() {
return getServers();
}
/**
 * Looks up the instances of {@code serviceId} through the Nacos naming service and converts
 * them into {@code NacosServer} objects.
 *
 * @return the converted server list
 * @throws IllegalStateException if anything goes wrong during lookup or conversion; the
 *         original exception is kept as the cause
 */
private List<NacosServer> getServers() {
    try {
        final String groupName = discoveryProperties.getGroup();
        // Third argument is passed as true - presumably "healthy only"; verify against NamingService.
        final List<Instance> found = nacosServiceManager
                .getNamingService(discoveryProperties.getNacosProperties())
                .selectInstances(serviceId, groupName, true);
        return instancesToServerList(found);
    } catch (Exception cause) {
        final String message = "Can not get service instances from nacos, serviceId=" + serviceId;
        throw new IllegalStateException(message, cause);
    }
}
/**
 * Wraps each Nacos {@code Instance} in a {@code NacosServer}.
 *
 * @param instances instances to convert; an empty input (as judged by
 *                  {@code CollectionUtils.isEmpty}) yields an empty result
 * @return a new list with one {@code NacosServer} per instance, in input order
 */
private List<NacosServer> instancesToServerList(List<Instance> instances) {
    final List<NacosServer> servers = new ArrayList<>();
    if (!CollectionUtils.isEmpty(instances)) {
        for (Instance each : instances) {
            servers.add(new NacosServer(each));
        }
    }
    return servers;
}
进入NacosNamingService
的selectInstances
方法,通过调用内部的多个重载方法,最终调用到:
NacosNamingService
/**
 * Selects the instances of a service, filtered by health state.
 *
 * @param serviceName plain service name
 * @param groupName   group of the service
 * @param clusters    cluster names; joined with "," before being passed on
 * @param healthy     health state the returned instances must have
 * @param subscribe   when {@code true} the service info comes from
 *                    {@code hostReactor.getServiceInfo}, otherwise it is fetched with
 *                    {@code getServiceInfoDirectlyFromServer}
 * @return the matching instances
 * @throws NacosException declared by the signature; presumably raised by the direct server query
 */
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
        boolean subscribe) throws NacosException {
    final String groupedName = NamingUtils.getGroupedName(serviceName, groupName);
    final String clusterList = StringUtils.join(clusters, ",");

    final ServiceInfo resolved = subscribe
            ? hostReactor.getServiceInfo(groupedName, clusterList)
            : hostReactor.getServiceInfoDirectlyFromServer(groupedName, clusterList);

    return selectInstances(resolved, healthy);
}
/**
 * Filters the hosts of a {@code ServiceInfo} in place.
 *
 * <p>An instance is dropped when its health state differs from {@code healthy}, when it is
 * not enabled, or when its weight is not positive.
 *
 * @param serviceInfo service info to read the hosts from; may be {@code null}
 * @param healthy     health state an instance must have to be kept
 * @return the list obtained from {@code serviceInfo.getHosts()} after filtering, or a new
 *         empty list when there is no service info or no hosts
 */
private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
    if (serviceInfo == null) {
        return new ArrayList<Instance>();
    }
    final List<Instance> hosts = serviceInfo.getHosts();
    if (CollectionUtils.isEmpty(hosts)) {
        return new ArrayList<Instance>();
    }

    for (Iterator<Instance> cursor = hosts.iterator(); cursor.hasNext();) {
        final Instance candidate = cursor.next();
        final boolean drop = healthy != candidate.isHealthy()
                || !candidate.isEnabled()
                || candidate.getWeight() <= 0;
        if (drop) {
            cursor.remove();
        }
    }
    return hosts;
}
接着看一下HostReactor
的getServiceInfo
方法:
HostReactor
/**
 * Returns the {@code ServiceInfo} for a service/cluster combination, loading it on first use.
 *
 * <ul>
 *   <li>With the failover switch on, the result comes straight from {@code failoverReactor}.</li>
 *   <li>If nothing is cached yet, an empty {@code ServiceInfo} is cached, the service is
 *       marked as updating, {@code updateServiceNow} is called, and the marker is removed.</li>
 *   <li>If an entry is cached but the service is marked as updating, the caller waits on the
 *       cached object for up to {@code UPDATE_HOLD_INTERVAL} (or until notified).</li>
 * </ul>
 * Afterwards a periodic update task is scheduled if none exists and the cached entry is returned.
 *
 * <p>If the waiting thread is interrupted, the interrupt flag is restored so callers can
 * still observe the interruption.
 *
 * @param serviceName grouped service name
 * @param clusters    comma-separated cluster names
 * @return the service info currently stored in {@code serviceInfoMap} (or the failover result)
 */
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    // Lookup key built from service name and clusters.
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }

    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

    if (null == serviceObj) {
        // First request for this service: cache a placeholder and load it right away.
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);

        updatingMap.put(serviceName, new Object());
        try {
            updateServiceNow(serviceName, clusters);
        } finally {
            // Always clear the marker; otherwise later callers would keep waiting on it.
            updatingMap.remove(serviceName);
        }

    } else if (updatingMap.containsKey(serviceName)) {

        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    // Preserve the interruption for the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    // Make sure a periodic refresh task exists for this service.
    scheduleUpdateIfAbsent(serviceName, clusters);

    return serviceInfoMap.get(serviceObj.getKey());
}
先进入updateServiceNow
方法中:
/**
 * Queries the server for a service once and feeds a non-empty response into
 * {@code processServiceJson}.
 *
 * <p>Failures are logged, not propagated. In every case, threads waiting on the
 * {@code ServiceInfo} that was cached before the query are notified.
 *
 * @param serviceName grouped service name
 * @param clusters    comma-separated cluster names
 */
public void updateServiceNow(String serviceName, String clusters) {
    // Entry cached before the query; other threads may be waiting on this object.
    final ServiceInfo cachedBefore = getServiceInfo0(serviceName, clusters);
    try {
        final String payload = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        if (StringUtils.isNotEmpty(payload)) {
            // Parsing and storing of the response is delegated to processServiceJson.
            processServiceJson(payload);
        }
    } catch (Exception failure) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, failure);
    } finally {
        if (cachedBefore != null) {
            synchronized (cachedBefore) {
                cachedBefore.notifyAll();
            }
        }
    }
}
再进入到scheduleUpdateIfAbsent
方法中:
/**
 * Schedules an {@code UpdateTask} for the service unless one is already registered.
 *
 * <p>The presence check is done once without the lock and once more while holding the
 * lock on {@code futureMap}, so only one task is added per key.
 *
 * @param serviceName grouped service name
 * @param clusters    comma-separated cluster names
 */
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    final String taskKey = ServiceInfo.getKey(serviceName, clusters);

    if (futureMap.get(taskKey) != null) {
        return;
    }

    synchronized (futureMap) {
        if (futureMap.get(taskKey) == null) {
            // Hand the task to the scheduler and remember its future under the service key.
            final ScheduledFuture<?> scheduled = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(taskKey, scheduled);
        }
    }
}
看一下UpdateTask
类的run
方法:
/**
 * Self-rescheduling task that keeps the cached {@code ServiceInfo} of one service/cluster
 * combination up to date.
 *
 * <p>Each run either pulls the service from the server ({@code updateServiceNow}) or calls
 * {@code refreshOnly}, then re-submits itself to {@code executor} with a delay taken from
 * the cached entry. The task is only re-submitted when {@code delayTime} ends up positive.
 */
public class UpdateTask implements Runnable {
// Starts at Long.MAX_VALUE so the first comparison in run() always takes the "pull" branch.
long lastRefTime = Long.MAX_VALUE;
private final String clusters;
private final String serviceName;
/**
 * @param serviceName grouped service name to refresh
 * @param clusters    comma-separated cluster names
 */
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}
@Override
public void run() {
// Stays at -1 (no re-schedule) unless one of the paths below assigns a delay.
long delayTime = -1;
try {
// Look up the cached ServiceInfo for this service/cluster key.
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
// Nothing cached yet.
if (serviceObj == null) {
// Pull from the server, then retry after DEFAULT_DELAY (the finally block re-schedules).
updateServiceNow(serviceName, clusters);
delayTime = DEFAULT_DELAY;
return;
}
// The cached entry has not been refreshed since this task last saw it: pull from the server.
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters);
// Re-read the cache, since updateServiceNow may have replaced the entry.
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// NOTE(review): refreshOnly is not shown here; presumably it queries the server
// without overwriting the cache - confirm.
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
// Remember the refresh time this task has now observed.
lastRefTime = serviceObj.getLastRefTime();
// Nobody is subscribed and no future is registered for this key: stop refreshing.
if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
// Delay for the next run comes from the cached entry.
delayTime = serviceObj.getCacheMillis();
} catch (Throwable e) {
// NOTE(review): if an exception is thrown before delayTime is assigned, it stays at -1
// and the task is not re-scheduled - confirm this is intended.
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (delayTime > 0) {
// Re-submit this same task for its next run.
executor.schedule(this, delayTime, TimeUnit.MILLISECONDS);
}
}
}
}
所以,serviceInfoMap并不是只在调用服务时才去获取更新,而是通过定时任务轮询的方式,不停地异步更新:首次默认延迟1秒执行,之后每次的延迟时间取自ServiceInfo的cacheMillis。通过以上源码分析,至此已经清楚了整合了nacos注册中心后,是怎么实现服务发现的。
总结
今天我们将nacos客户端服务续约以及服务发现相关源码进行了分析,如有不正之处,还请指正,谢谢!好了,今天就到这里了,下次再见。
欢迎关注我的公众号:程序员L札记
作者介绍