程序员L札记

V1

2022/03/14阅读:64主题:橙心

Spring Cloud Nacos 服务注册与发现源码分析(二)

spring boot版本2.3.7.RELEASE spring cloud版本Hoxton.SR9 spring cloud alibaba版本2.2.2.RELEASE


前言

在上一篇中,介绍了spring cloud与nacos客户端整合的相关代码,并借助nacos底层的能力,实现了服务的注册。今天继续分析余下内容:

  1. 服务续约
  2. 服务发现

源码分析

服务续约

在上篇跟踪服务注册相关源码的时候,看到服务注册时,某个服务的实例是区分临时实例还是永久实例。默认情况下注册的实例是临时实例,可以通过配置spring.cloud.nacos.discovery.ephemeral=false,设置成永久节点。nacos官方不推荐同一个服务下既存在临时又存在永久实例。如果是临时实例,那么就开启一个定时任务,定时的向nacos服务端发送心跳,保持服务实例的存活,下面来看看源码。

NacosNamingService

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    //如果是临时实例
    if (instance.isEphemeral()) {
     //构造需要发送给服务端的数据(ip、端口、集群名、服务名、权重等)
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        //添加心跳任务
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

BeatReactor

public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
    BeatInfo beatInfo = new BeatInfo();
    beatInfo.setServiceName(groupedServiceName);
    beatInfo.setIp(instance.getIp());
    beatInfo.setPort(instance.getPort());
    beatInfo.setCluster(instance.getClusterName());
    beatInfo.setWeight(instance.getWeight());
    beatInfo.setMetadata(instance.getMetadata());
    beatInfo.setScheduled(false);
    beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
    return beatInfo;
}
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
     NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
     //以服务名#ip#port作为key
     String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
     BeatInfo existBeat = null;
     //fix #1733
     //如果存在缓存信息,先移除
     if ((existBeat = dom2Beat.remove(key)) != null) {
      //设置stopped属性为true,这个属性被volatile关键字修饰
         existBeat.setStopped(true);
     }
     //存入缓存ConcurrentHashMap
     dom2Beat.put(key, beatInfo);
     //执行定时任务,默认5秒后执行一次
     executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
     MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
 }
class BeatTask implements Runnable {
        
    BeatInfo beatInfo;
    
    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }
    
    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        //获取下一次心跳的时间
        long nextTime = beatInfo.getPeriod();
        try {
         //发送心跳请求,请求地址/nacos/v1/ns/instance/beat httpMethod put
         //默认第一次发送的是非轻量级的请求,会额外发送beatinfo全量数据
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            //获取服务端响应参数:心跳间隔时间
            long interval = result.get("clientBeatInterval").asLong();
            boolean lightBeatEnabled = false;
            //如果服务端返回lightBeatEnabled参数
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
             //重置lightBeatEnabled局部变量
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            //重置类成员变量
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            //如果服务端返回的心跳间隔时间大于0
            if (interval > 0) {
             //重置下一次心跳时间
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            //如果code等于20404,重新构造服务实例信息,发起服务注册流程(补偿机制)
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                    JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
            
        }
        //启动下一次定时任务
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

完整的服务心跳续约见代码注释。通过跟踪源码,可以看到,在心跳续约中,会对服务注册进行补偿。并且服务端会动态调节客户端发起心跳续约的间隔周期。

服务发现

服务发现是通过ribbon实现的,还记得在《Spring Cloud Netflix Ribbon源码解析(四)》中,在分析到RoundRobinRule.choose方法时,内部会分别获取两个列表,分别是可用服务列表和所有服务列表,通过查找源码,最终发现,在没有使用任何服务注册中心的时候,默认是从配置文件中获取的服务列表,即通过ConfigurationBasedServerList类获取。忘记的同学请阅读这篇文章。最终都会调用DynamicServerListLoadBalancerupdateListOfServers方法:

DynamicServerListLoadBalancer

@VisibleForTesting
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
     //通过ServerList接口的具体实现来获取服务列表
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), servers);

        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}

那么今天看看在有服务注册中心的情况下,是怎么从nacos中获取服务列表的。 com.netflix.loadbalancer.ServerList接口是netflix提供的抽象,NacosServerList实现了这个接口,并通过NacosRibbonClientConfiguration配置类实例化并注册到spring容器中:

NacosRibbonClientConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnRibbonNacos
public class NacosRibbonClientConfiguration {

 @Autowired
 private PropertiesFactory propertiesFactory;

 @Bean
 @ConditionalOnMissingBean
 public ServerList<?> ribbonServerList(IClientConfig config,
   NacosDiscoveryProperties nacosDiscoveryProperties,
   NacosServiceManager nacosServiceManager) {
  if (this.propertiesFactory.isSet(ServerList.classconfig.getClientName())) {
   ServerList serverList = this.propertiesFactory.get(ServerList.classconfig,
     config.getClientName())
;
   return serverList;
  }
  NacosServerList serverList = new NacosServerList(nacosDiscoveryProperties,
    nacosServiceManager);
  serverList.initWithNiwsConfig(config);
  return serverList;
 }

 @Bean
 @ConditionalOnMissingBean
 public NacosServerIntrospector nacosServerIntrospector() {
  return new NacosServerIntrospector();
 }

}

NacosServerList

@Override
public List<NacosServer> getUpdatedListOfServers() {
 return getServers();
}

private List<NacosServer> getServers() {
 try {
  String group = discoveryProperties.getGroup();
  //通过NamingService获取实例列表
  List<Instance> instances = nacosServiceManager
    .getNamingService(discoveryProperties.getNacosProperties())
    .selectInstances(serviceId, group, true);
  //将List<Instance>转换成List<NacosServer>
  return instancesToServerList(instances);
 }
 catch (Exception e) {
  throw new IllegalStateException(
    "Can not get service instances from nacos, serviceId=" + serviceId,
    e);
 }
}

private List<NacosServer> instancesToServerList(List<Instance> instances) {
 List<NacosServer> result = new ArrayList<>();
 if (CollectionUtils.isEmpty(instances)) {
  return result;
 }
 for (Instance instance : instances) {
  result.add(new NacosServer(instance));
 }

 return result;
}

进入NacosNamingServiceselectInstances方法,通过调用内部的多个重载方法,最终调用到:

NacosNamingService

@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
        boolean subscribe)
 throws NacosException 
{
    
    ServiceInfo serviceInfo;
    //默认是true
    if (subscribe) {
     //得到ServiceInfo对象
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor
                .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                        StringUtils.join(clusters, ","));
    }
    //返回实例列表
    return selectInstances(serviceInfo, healthy);
}

private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    
    Iterator<Instance> iterator = list.iterator();
    //迭代服务实例列表,排除不健康的实例或者权重小于等于0的实例或者已经停止对外提供服务的实例
    while (iterator.hasNext()) {
        Instance instance = iterator.next();
        if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
            iterator.remove();
        }
    }
    
    return list;
}

接着看一下HostReactorgetServiceInfo方法:

HostReactor

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    //组装key 服务名@@集群名
    String key = ServiceInfo.getKey(serviceName, clusters);
    //如果为true
    if (failoverReactor.isFailoverSwitch()) {
     //根据key从内部的serviceMap缓存中获取ServiceInfo
        return failoverReactor.getService(key);
    }
    
    //根据key从内部的serviceInfoMap中获取缓存的ServiceInfo
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    
    if (null == serviceObj) {
     //如果缓存中没有,则创建一个并放入缓存中
        serviceObj = new ServiceInfo(serviceName, clusters);
        
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        
        //同时存入updatingMap中
        updatingMap.put(serviceName, new Object());
        //更新服务
        updateServiceNow(serviceName, clusters);
        //从updatingMap中移除
        updatingMap.remove(serviceName);
        
    } else if (updatingMap.containsKey(serviceName)) {
        //如果缓存中存在,再判断一下updatingMap中是否存在这个key,等待5秒,等待更新完成
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    
    //如果缺席,则定时更新
    scheduleUpdateIfAbsent(serviceName, clusters);
    
    //从缓存中获取并返回serviceInfo
    return serviceInfoMap.get(serviceObj.getKey());
}

先进入updateServiceNow方法中:

public void updateServiceNow(String serviceName, String clusters) {
 //再次从serviceInfoMap中获取
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        
        //向nacos server的/nacos/v1/ns/instance/list路径发送HTTP GET请求
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        
        //如果查询结果不等于空
        if (StringUtils.isNotEmpty(result)) {
         //将json字符串解析成serviceInfo,并存入serviceInfoMap中,此处自行查阅
            processServiceJson(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
     //最后,如果缓存中存在ServiceInfo,则唤醒这个对象上的等待
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

再进入到scheduleUpdateIfAbsent方法中:

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
 //根据key从futureMap中获取ScheduledFuture对象
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }
    
    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        
        //创建UpdateTask任务并添加到ScheduledExecutorService中,默认延迟1秒执行
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        //将返回值存入futureMap中
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}

看一下UpdateTask类的run方法:

public class UpdateTask implements Runnable {
        
    long lastRefTime = Long.MAX_VALUE;
    
    private final String clusters;
    
    private final String serviceName;
    
    public UpdateTask(String serviceName, String clusters) {
        this.serviceName = serviceName;
        this.clusters = clusters;
    }
    
    @Override
    public void run() {
        long delayTime = -1;
        
        try {
         //从serviceInfoMap中获取ServiceInfo对象
            ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            
            //缓存中不存在
            if (serviceObj == null) {
             //执行这个方法,从服务端获取,在上一步已经分析过
                updateServiceNow(serviceName, clusters);
                //下次延迟执行时间
                delayTime = DEFAULT_DELAY;
                return;
            }
            
            //如果存在缓存信息,比较lastRefTime,如果缓存中的lastRefTime小于等于当前任务中的lastRefTime,从服务端获取服务
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                updateServiceNow(serviceName, clusters);
                //再从缓存中获取一次,即更新后的,因为updateServiceNow方法中已经更新了缓存
                serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            } else {
             //只触发一次远程查询服务,至于出于什么目的,暂时不清楚
                // if serviceName already updated by push, we should not override it
                // since the push data may be different from pull through force push
                refreshOnly(serviceName, clusters);
            }
            
            //更新lastRefTime
            lastRefTime = serviceObj.getLastRefTime();
            
            //如果eventDispatcher不包含这个key并且futureMap中也不包含这个key,则中断更新任务
            if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
                    .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                // abort the update task
                NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                return;
            }
            
            //更新下次定时任务延时时间
            delayTime = serviceObj.getCacheMillis();
            
        } catch (Throwable e) {
            NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if (delayTime > 0) {
             //最后再次将当前任务放入调度器中,等待下次执行
                executor.schedule(this, delayTime, TimeUnit.MILLISECONDS);
            }
        }
        
    }
}

所以,serviceInfoMap并不是只在调用服务时才去获取更新,而是通过定时任务,通过心跳的方式,不停的异步更新,初始1秒更新一次。通过以上源码分析,至此已经清楚了整合了nacos注册中心后,是怎么实现服务发现的。

总结

今天我们将nacos客户端服务续约以及服务发现相关源码进行了分析,如有不正之处,还请指正,谢谢!好了,今天就到这里了,下次再见。

欢迎关注我的公众号:程序员L札记

分类:

后端

标签:

Java

作者介绍

程序员L札记
V1