程序员L札记
V1
2022/04/28阅读:11主题:橙心
最新版Nacos 2.X服务端源码分析之服务续约(二)
回顾
在《Spring Cloud Nacos 服务注册与发现源码分析(二)》篇的服务续约章节中,介绍了spring cloud 向nacos服务端注册的服务实例是一个临时节点,并且在服务注册阶段默认发送的是非轻量级心跳,请求的url为/nacos/v1/ns/instance/beat。那么今天分析下nacos服务是如何接收并处理请求的。
源码分析
服务续约
InstanceController
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
//设置客户端下一次的心跳间隔时间,默认5s
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
//从请求中获取beat数据
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
//如果beat数据不等于空,判断是否是注册心跳
if (StringUtils.isNotBlank(beat)) {
//字符串转对象
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
//从请求中获取clusterName
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
//从请求中获取ip
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
////从请求中获取port
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
//如果是注册心跳,即首次心跳
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
//从请求中获取namespaceId
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//从请求重获取serviceName
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
//校验服务名是否合法
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
serviceName, namespaceId);
//构造BeatInfoInstanceBuilder实例
BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
builder.setRequest(request);
//处理心跳------重点
int resultCode = getInstanceOperator()
.handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
//设置心跳处理结果code
result.put(CommonParams.CODE, resultCode);
//设置客户端下一次心跳间隔周期
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
//设置是否轻量级心跳的为true,那么下一次心跳请求的时候,客户端,就不会携带beat数据
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
进入getInstanceOperator().handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder)
方法:
由于服务端端源码是2.x版本,此处通过
getInstanceOperator()
方法获取实例操作类,默认为1.X版本的实现,所以此处返回的具体实例为InstanceOperatorServiceImpl
InstanceOperatorServiceImpl
@Override
public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,
RsInfo clientBeat, BeatInfoInstanceBuilder builder) throws NacosException {
//从缓存中获取实例数据
com.alibaba.nacos.naming.core.Instance instance = serviceManager
.getInstance(namespaceId, serviceName, cluster, ip, port);
if (instance == null) {
if (clientBeat == null) {
//如果实例数据等于null并且不是注册时发送的心跳,说明服务已经下线了,则向客户端返回资源未找到错误码
return NamingResponseCode.RESOURCE_NOT_FOUND;
}
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
//如果实例数据等于null并且是注册时发送的心跳,则走注册服务实例的流程,此处不再阐述
instance = parseInstance(builder.setBeatInfo(clientBeat).setServiceName(serviceName).build());
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
//从缓存中获取服务数据,经过上一步,此时缓存中正常情况下一定会存在服务数据
Service service = serviceManager.getService(namespaceId, serviceName);
//校验服务是否为null,为null抛异常
serviceManager.checkServiceIsNull(service, namespaceId, serviceName);
//如果不是注册时心跳,创建一个clientBeat信息
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(cluster);
}
//处理客户端心跳------重点
service.processClientBeat(clientBeat);
return NamingResponseCode.OK;
}
进入service.processClientBeat(clientBeat)
方法:
Service
public void processClientBeat(final RsInfo rsInfo) {
//创建一个客户端心跳处理器实例,ClientBeatProcessor实现BeatProcessor接口,BeatProcessor接口实现Runnable接口
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
//将当前服务添加到clientBeatProcessor实例service属性中
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
//立即开启调度任务
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
进入ClientBeatProcessor
的run
方法:
ClientBeatProcessor
@Override
public void run() {
//上一步设置的service
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
//获取所有的临时实例
List<Instance> instances = cluster.allIPs(true);
//遍历实例列表
for (Instance instance : instances) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
//更新实例的最后一次心跳时间
instance.setLastBeat(System.currentTimeMillis());
//如果实例的Marke属性是false(默认)并且不健康
if (!instance.isMarked() && !instance.isHealthy()) {
//更新当前实例为健康实例
instance.setHealthy(true);
Loggers.EVT_LOG
.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE);
//通过udp协议,通知所有订阅该服务的应用,重新拉取服务列表
getPushService().serviceChanged(service);
}
}
}
}
总结
nacos服务端处理客户端心跳请求的逻辑相对比较简单,在最后通过udp协议通知所有订阅该服务的应用,我们在服务实例注册的时候也见到过,这个以后我们单独写一篇来介绍。好了,今天就到这里了,感谢大家的阅读,如有错误之处,还请指正!
欢迎关注我的公众号:程序员L札记
作者介绍
程序员L札记
V1