程序员L札记

V1

2022/04/28阅读:11主题:橙心

最新版Nacos 2.X服务端源码分析之服务续约(二)

回顾

在《Spring Cloud Nacos 服务注册与发现源码分析(二)》篇的服务续约章节中,介绍了spring cloud 向nacos服务端注册的服务实例是一个临时节点,并且在服务注册阶段默认发送的是非轻量级心跳,请求的url为/nacos/v1/ns/instance/beat。那么今天分析下nacos服务是如何接收并处理请求的。

源码分析

服务续约

InstanceController

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.classaction = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    //设置客户端下一次的心跳间隔时间,默认5s
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    
    //从请求中获取beat数据
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    //如果beat数据不等于空,判断是否是注册心跳
    if (StringUtils.isNotBlank(beat)) {
     //字符串转对象
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    //从请求中获取clusterName
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    //从请求中获取ip
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    ////从请求中获取port
    int port = Integer.parseInt(WebUtils.optional(request, "port""0"));
    //如果是注册心跳,即首次心跳
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    //从请求中获取namespaceId
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    //从请求重获取serviceName
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    //校验服务名是否合法
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
            serviceName, namespaceId);
    //构造BeatInfoInstanceBuilder实例
    BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
    builder.setRequest(request);
    //处理心跳------重点
    int resultCode = getInstanceOperator()
            .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
    //设置心跳处理结果code
    result.put(CommonParams.CODE, resultCode);
    //设置客户端下一次心跳间隔周期
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
            getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
    //设置是否轻量级心跳的为true,那么下一次心跳请求的时候,客户端,就不会携带beat数据
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}

进入getInstanceOperator().handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder)方法:

由于服务端端源码是2.x版本,此处通过getInstanceOperator()方法获取实例操作类,默认为1.X版本的实现,所以此处返回的具体实例为InstanceOperatorServiceImpl

InstanceOperatorServiceImpl

@Override
public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,
        RsInfo clientBeat, BeatInfoInstanceBuilder builder)
 throws NacosException 
{
    //从缓存中获取实例数据
    com.alibaba.nacos.naming.core.Instance instance = serviceManager
            .getInstance(namespaceId, serviceName, cluster, ip, port);
    
    if (instance == null) {
        if (clientBeat == null) {
         //如果实例数据等于null并且不是注册时发送的心跳,说明服务已经下线了,则向客户端返回资源未找到错误码
            return NamingResponseCode.RESOURCE_NOT_FOUND;
        }
        
        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        //如果实例数据等于null并且是注册时发送的心跳,则走注册服务实例的流程,此处不再阐述
        instance = parseInstance(builder.setBeatInfo(clientBeat).setServiceName(serviceName).build());
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    
    //从缓存中获取服务数据,经过上一步,此时缓存中正常情况下一定会存在服务数据
    Service service = serviceManager.getService(namespaceId, serviceName);
    
    //校验服务是否为null,为null抛异常
    serviceManager.checkServiceIsNull(service, namespaceId, serviceName);
    
    //如果不是注册时心跳,创建一个clientBeat信息
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(cluster);
    }
    //处理客户端心跳------重点
    service.processClientBeat(clientBeat);
    return NamingResponseCode.OK;
}

进入service.processClientBeat(clientBeat)方法:

Service

public void processClientBeat(final RsInfo rsInfo) {
 //创建一个客户端心跳处理器实例,ClientBeatProcessor实现BeatProcessor接口,BeatProcessor接口实现Runnable接口
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    //将当前服务添加到clientBeatProcessor实例service属性中
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    //立即开启调度任务
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

进入ClientBeatProcessorrun方法:

ClientBeatProcessor

@Override
public void run() {
 //上一步设置的service
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    //获取所有的临时实例
    List<Instance> instances = cluster.allIPs(true);
    
    //遍历实例列表
    for (Instance instance : instances) {
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            //更新实例的最后一次心跳时间
            instance.setLastBeat(System.currentTimeMillis());
            //如果实例的Marke属性是false(默认)并且不健康
            if (!instance.isMarked() && !instance.isHealthy()) {
             //更新当前实例为健康实例
                instance.setHealthy(true);
                Loggers.EVT_LOG
                        .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                cluster.getService().getName(), ip, port, cluster.getName(),
                                UtilsAndCommons.LOCALHOST_SITE);
                //通过udp协议,通知所有订阅该服务的应用,重新拉取服务列表
                getPushService().serviceChanged(service);
            }
        }
    }
}

总结

nacos服务端处理客户端心跳请求的逻辑相对比较简单,在最后通过udp协议通知所有订阅该服务的应用,我们在服务实例注册的时候也见到过,这个以后我们单独写一篇来介绍。好了,今天就到这里了,感谢大家的阅读,如有错误之处,还请指正!

欢迎关注我的公众号:程序员L札记

分类:

后端

标签:

Java

作者介绍

程序员L札记
V1