
[Learning Design from Source Code] Ant Financial SOFARegistry: Lease Renewal and Eviction



0x00 Summary

SOFARegistry is a production-grade, highly timely, highly available service registry open-sourced by Ant Financial.


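This series focuses on design and architecture. Over several articles, it works backward from the source code to summarize, from multiple angles, the implementation mechanisms and architectural ideas behind DataServer and SOFARegistry, so that readers can learn how Alibaba designs such systems.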


This is the fifteenth article in the series. It covers lease renewal and eviction.


0x01 Business Scope

Renewal and eviction are important features of service registration and discovery, for example:


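1.1 Evicting Failed Instances

Service instances do not always go offline cleanly. Out-of-memory errors, network failures and similar problems can stop a service from working without the registry ever receiving a "service offline" request.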


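To remove such instances from the service list, the server creates a scheduled task at startup. Periodically (by default every 60 s), the task walks the current list and removes every service that has not renewed within the timeout (by default 90 s). These defaults are Eureka's; SOFARegistry's DataServer uses its own TTL, datumTimeToLiveSec, as shown below.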


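1.2 Service Renewal

After registering, a service provider maintains a heartbeat that keeps telling the server "I am still alive". This prevents the server's eviction task from dropping the instance from the service list. This operation is called service renewal (Renew).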
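The two ideas above can be illustrated with a minimal, generic sketch. This is not SOFARegistry or Eureka code. It is a lease table in which renew records the current time, and a periodic sweep drops entries whose last renewal is older than the TTL. The class name LeaseTable and its methods are hypothetical. Time is passed in explicitly so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of renew + evict; not taken from SOFARegistry or Eureka.
class LeaseTable {
    // instanceId -> last renew timestamp in milliseconds
    private final Map<String, Long> lastRenew = new ConcurrentHashMap<>();
    private final long ttlMillis;

    LeaseTable(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Heartbeat: "I am still alive".
    void renew(String instanceId, long nowMillis) {
        lastRenew.put(instanceId, nowMillis);
    }

    boolean contains(String instanceId) {
        return lastRenew.containsKey(instanceId);
    }

    // Periodic sweep: remove every instance whose last renewal is older than the TTL.
    List<String> evictExpired(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastRenew.entrySet()) {
            if (nowMillis - e.getValue() > ttlMillis
                    // conditional remove: skip the entry if a renew slipped in after we read it
                    && lastRenew.remove(e.getKey(), e.getValue())) {
                evicted.add(e.getKey());
            }
        }
        return evicted;
    }
}
```

With a 60 s sweep and a 90 s TTL, an instance that dies right after renewing can stay listed for up to roughly 150 s: the lease expires at 90 s, and the next sweep may run up to 60 s later.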


0x02 DatumLeaseManager

On the Data Server side, DatumLeaseManager implements both "eviction of failed instances" and "service renewal".


2.1 Definition

The main fields of DatumLeaseManager are:


connectIdRenewTimestampMap records, for each connectId, the time of its most recent heartbeat. Eureka has a similar data structure.


locksForConnectId ensures that at most one evict task is pending per connectId at any time. The source comment puts it as: "lock for connectId: every connectId allows only one task to be created".


The definition is as follows:


public class DatumLeaseManager implements AfterWorkingProcess {
    /** record the latest heartbeat time for each connectId, format: connectId -> lastRenewTimestamp */
    private final Map<String, Long> connectIdRenewTimestampMap = new ConcurrentHashMap<>();
    /** lock for connectId , format: connectId -> true */
    private ConcurrentHashMap<String, Boolean> locksForConnectId = new ConcurrentHashMap<>();
    private volatile boolean serverWorking = false;
    private volatile boolean renewEnable = true;
    private AsyncHashedWheelTimer datumAsyncHashedWheelTimer;

    @Autowired
    private DataServerConfig dataServerConfig;
    @Autowired
    private DisconnectEventHandler disconnectEventHandler;
    @Autowired
    private DatumCache datumCache;
    @Autowired
    private DataNodeStatus dataNodeStatus;

    private ScheduledThreadPoolExecutor executorForHeartbeatLess;
    private ScheduledFuture<?> futureForHeartbeatLess;

    // ... methods omitted
}
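The locksForConnectId idiom depends on ConcurrentHashMap.putIfAbsent returning null only to the first caller for a key. The sketch below is a minimal illustration under that assumption, and OncePerKeyGuard is a hypothetical name. The first tryAcquire for a key wins, and later calls are rejected until release is called. In scheduleEvictTask, the timer callback performs that release as its first step.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the locksForConnectId idiom: at most one pending task per key.
class OncePerKeyGuard {
    private final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<>();

    // putIfAbsent is atomic, so exactly one concurrent caller sees null and wins.
    boolean tryAcquire(String key) {
        return locks.putIfAbsent(key, Boolean.TRUE) == null;
    }

    // Called when the task runs, so a later renew may schedule a new task.
    void release(String key) {
        locks.remove(key);
    }
}
```

The timer callback's own re-arm in section 2.2.3 goes through the same putIfAbsent. If a concurrent renew has already scheduled a task after the release, the re-arm is simply dropped. As a result, each connectId keeps at most one pending timeout.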
2.2 Renewal
2.2.1 Data Structures

Within DatumLeaseManager, renewal mainly relies on the following fields, together with connectIdRenewTimestampMap:


private ConcurrentHashMap<String, Boolean> locksForConnectId = new ConcurrentHashMap<>();
private AsyncHashedWheelTimer datumAsyncHashedWheelTimer;
2.2.2 Callers

renew is called from the following modules. All of them are AbstractServerHandler subclasses.


public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest>
public class DatumSnapshotHandler extends AbstractServerHandler<DatumSnapshotRequest>
public class RenewDatumHandler extends AbstractServerHandler<RenewDatumRequest> implements AfterWorkingProcess
public class UnPublishDataHandler extends AbstractServerHandler<UnPublishDataRequest>
2.2.3 Renewing a Lease

DatumLeaseManager records the latest timestamp and then tries to schedule an evict task through scheduleEvictTask.


public void renew(String connectId) {
    // record the renew timestamp
    connectIdRenewTimestampMap.put(connectId, System.currentTimeMillis());
    // try to trigger evict task
    scheduleEvictTask(connectId, 0);
}

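In detail:


If an evict task is already pending for this connectId (its lock is already held), return.
Otherwise, add a timeout to the hashed wheel timer. When it fires:
    release the lock held for this connectId;
    read the last renew time of this connectId. If there is none, the connectId has already been removed (client off), so return;
    if renewal is currently disabled, schedule the next check one full TTL later. In a non-working state, renew requests cannot be received, so nothing may be cleaned up yet;
    if the last renew time has expired, call evict (only when the connectId still owns publishers), remove its timestamp, and schedule nothing further;
    if it has not expired, call scheduleEvictTask(connectId, nextDelaySec) so that the next check happens when the remaining lease time runs out.

The code is as follows: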


/**
 * trigger evict task: if connectId expired, create ClientDisconnectEvent to cleanup datums bind to the connectId
 * PS: every connectId allows only one task to be created
 */
private void scheduleEvictTask(String connectId, long delaySec) {
    delaySec = (delaySec <= 0) ? dataServerConfig.getDatumTimeToLiveSec() : delaySec;
    // lock for connectId: every connectId allows only one task to be created
    Boolean ifAbsent = locksForConnectId.putIfAbsent(connectId, true);
    if (ifAbsent != null) {
        return;
    }
    datumAsyncHashedWheelTimer.newTimeout(_timeout -> {
        boolean continued = true;
        long nextDelaySec = 0;
        try {
            // release lock
            locksForConnectId.remove(connectId);
            // get lastRenewTime of this connectId
            Long lastRenewTime = connectIdRenewTimestampMap.get(connectId);
            if (lastRenewTime == null) {
                // connectId is already clientOff
                return;
            }
            /*
             * 1. lastRenewTime expires, then:
             *    - build ClientOffEvent and hand it to DataChangeEventCenter.
             *    - It will not be scheduled next time, so terminated.
             * 2. lastRenewTime not expires, then:
             *    - trigger the next schedule
             */
            boolean isExpired =
                System.currentTimeMillis() - lastRenewTime > dataServerConfig.getDatumTimeToLiveSec() * 1000L;
            if (!isRenewEnable()) {
                nextDelaySec = dataServerConfig.getDatumTimeToLiveSec();
            } else if (isExpired) {
                int ownPubSize = getOwnPubSize(connectId);
                if (ownPubSize > 0) {
                    evict(connectId);
                }
                connectIdRenewTimestampMap.remove(connectId, lastRenewTime);
                continued = false;
            } else {
                nextDelaySec = dataServerConfig.getDatumTimeToLiveSec()
                               - (System.currentTimeMillis() - lastRenewTime) / 1000L;
                nextDelaySec = nextDelaySec <= 0 ? 1 : nextDelaySec;
            }
        } catch (Throwable e) {
            // error handling (logging) is elided in this excerpt;
            // continued stays true, so the task is rescheduled below
        }
        if (continued) {
            scheduleEvictTask(connectId, nextDelaySec);
        }
    }, delaySec, TimeUnit.SECONDS);
}
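The branching inside the timer callback above can be isolated as a pure function, which makes each case easy to check. This is a sketch, not the SOFARegistry code. EvictDecision, decide and the Action values are hypothetical names, and ownPubSize is passed in as a plain number instead of being looked up.

```java
// Hypothetical pure-function view of the timer callback in scheduleEvictTask.
final class EvictDecision {
    enum Action { STOP, RESCHEDULE, EVICT_AND_STOP, REMOVE_AND_STOP }

    final Action action;
    final long nextDelaySec; // only meaningful for RESCHEDULE

    private EvictDecision(Action action, long nextDelaySec) {
        this.action = action;
        this.nextDelaySec = nextDelaySec;
    }

    static EvictDecision decide(Long lastRenewTime, long nowMillis, long ttlSec,
                                boolean renewEnable, int ownPubSize) {
        if (lastRenewTime == null) {
            // already cleaned up (client off): nothing left to do
            return new EvictDecision(Action.STOP, 0);
        }
        if (!renewEnable) {
            // renews cannot arrive in this state, so do not judge; look again after a full TTL
            return new EvictDecision(Action.RESCHEDULE, ttlSec);
        }
        boolean expired = nowMillis - lastRenewTime > ttlSec * 1000L;
        if (expired) {
            // evict only if the connection still owns publishers; the timestamp is dropped either way
            return new EvictDecision(ownPubSize > 0 ? Action.EVICT_AND_STOP : Action.REMOVE_AND_STOP, 0);
        }
        // not expired: wake up when the remaining lease runs out, but at least 1 s later
        long next = ttlSec - (nowMillis - lastRenewTime) / 1000L;
        return new EvictDecision(Action.RESCHEDULE, next <= 0 ? 1 : next);
    }
}
```

The callback re-arms the timer with the remaining lease time rather than a fixed period. A renew that arrives in the meantime only moves lastRenewTime forward, and the next wake-up picks that up. Renewal therefore never has to touch the pending timeout.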
2.2.4 Diagrams

As shown below:


+--------------------+        +------------------------------------------+
| PublishDataHandler |        |            DatumLeaseManager             |
+---------+----------+        |                                          |
          |                   |                                          |
          | doHandle          |                                          |
          |  renew            |                                          |
          +------------------>|  scheduleEvictTask <------------+        |
          |                   |    |                            |        |
          |                   |    | newTimeout                 |        |
          |                   |    v                            |        |
          |                   |  +-----------------------+      |        |
          |                   |  | AsyncHashedWheelTimer +------+        |
          |                   |  +-----------+-----------+  not expired  |
          |                   |              |                           |
          |                   |              | expired                   |
          |                   |              v                           |
          |                   |            evict (if ownPubSize > 0)     |
          v                   +------------------------------------------+

Or, as a sequence diagram:


PublishDataHandler      DatumLeaseManager        AsyncHashedWheelTimer
        |                       |                          |
        |                       | new                      |
        |                       +------------------------->|
        |                       |                          |
        | doHandle: renew       |                          |
        +---------------------->|                          |
        |                       | scheduleEvictTask        |
        |                       | newTimeout               |
        |                       +------------------------->|
        |                       |                          | timer fires: lease expired?
        |                       |  No: scheduleEvictTask   |
        |                       |<-------------------------+
        |                       |                          | Yes: evict if ownPubSize > 0
        v                       v                          v
2.3 Eviction
2.3.1 Data Structures

Within DatumLeaseManager, eviction mainly relies on the following fields:


private ScheduledThreadPoolExecutor executorForHeartbeatLess;
private ScheduledFuture<?> futureForHeartbeatLess;

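The eviction task is started from two places, so that it checks again whether anything can be evicted when the data changes:


at startup;
explicitly.
2.3.2 Explicit Invocation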

LocalDataServerChangeEventHandler calls datumLeaseManager.reset(). This restarts the eviction task, which is what eventually calls evict.


@Override
public void doHandle(LocalDataServerChangeEvent localDataServerChangeEvent) {
    isChanged.set(true);
    // Better change to Listener pattern
    localDataServerCleanHandler.reset();
    datumLeaseManager.reset();
    events.offer(localDataServerChangeEvent);
}

DatumLeaseManager.reset cancels the current eviction task, if there is one, and calls scheduleEvictTaskForHeartbeatLess to start it again.


public synchronized void reset() {
    if (futureForHeartbeatLess != null) {
        futureForHeartbeatLess.cancel(false);
    }
    scheduleEvictTaskForHeartbeatLess();
}
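reset follows a common cancel-and-reschedule pattern for a ScheduledExecutorService. The sketch below is minimal and uses hypothetical names (RestartablePeriodicTask). cancel(false) lets a run that is already executing finish instead of interrupting it. synchronized prevents two concurrent resets from leaving two periodic tasks alive.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the reset() pattern: cancel the fixed-delay task, then start a fresh one.
class RestartablePeriodicTask {
    // single daemon thread, similar in spirit to executorForHeartbeatLess
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "restartable-periodic-task");
        t.setDaemon(true);
        return t;
    });
    private final Runnable task;
    private final long periodSec;
    private ScheduledFuture<?> future;

    RestartablePeriodicTask(Runnable task, long periodSec) {
        this.task = task;
        this.periodSec = periodSec;
    }

    synchronized void reset() {
        if (future != null) {
            future.cancel(false); // do not interrupt a run in progress
        }
        // the first run is again a full period away
        future = executor.scheduleWithFixedDelay(task, periodSec, periodSec, TimeUnit.SECONDS);
    }

    synchronized ScheduledFuture<?> currentFuture() {
        return future;
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

scheduleEvictTaskForHeartbeatLess also uses datumTimeToLiveSec as the initial delay. As a result, after a reset the heartbeat-less sweep waits one full TTL before it runs again.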
2.3.3 Invocation at Startup

The eviction task is also started at startup.


@PostConstruct
public void init() {
    // ......
    executorForHeartbeatLess = new ScheduledThreadPoolExecutor(1, threadFactoryBuilder
        .setNameFormat("Registry-DatumLeaseManager-ExecutorForHeartbeatLess").build());
    scheduleEvictTaskForHeartbeatLess();
}
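2.3.4 The Eviction Task

Eviction is carried out by EvictTaskForHeartbeatLess. It is a task scheduled on a single-thread scheduled executor with a fixed delay of one datumTimeToLiveSec.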


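private void scheduleEvictTaskForHeartbeatLess() {
    futureForHeartbeatLess = executorForHeartbeatLess.scheduleWithFixedDelay(
        new EvictTaskForHeartbeatLess(), dataServerConfig.getDatumTimeToLiveSec(),
        dataServerConfig.getDatumTimeToLiveSec(), TimeUnit.SECONDS);
}

Each time the delay elapses, the task fetches all connectIds currently in datumCache and iterates over them. It evicts any connectId that has no renew timestamp at all (it has never sent a heartbeat) and still owns publishers. connectIds that do renew are handled by the per-connectId timer described in 2.2.3.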


/**
 * evict own connectIds with heartbeat less
 */
private class EvictTaskForHeartbeatLess implements Runnable {
    @Override
    public void run() {
        // If in a non-working state, cannot clean up because the renew request cannot be received at this time.
        if (!isRenewEnable()) {
            return;
        }
        Set<String> allConnectIds = datumCache.getAllConnectIds();
        for (String connectId : allConnectIds) {
            Long timestamp = connectIdRenewTimestampMap.get(connectId);
            // no heartbeat
            if (timestamp == null) {
                int ownPubSize = getOwnPubSize(connectId);
                if (ownPubSize > 0) {
                    evict(connectId);
                }
            }
        }
    }
}
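This sweep complements the per-connectId timer. In the code shown here, the timer is only armed from renew(). This task covers connectIds that own datums but have no renew record at all. The selection can be sketched as a pure function. HeartbeatLessSelector is a hypothetical name, and the ownPubSize function stands in for getOwnPubSize.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.ToIntFunction;

// Hypothetical sketch of the selection done by EvictTaskForHeartbeatLess.run().
final class HeartbeatLessSelector {
    private HeartbeatLessSelector() {
    }

    static List<String> heartbeatLess(Set<String> allConnectIds,
                                      Map<String, Long> renewTimestamps,
                                      ToIntFunction<String> ownPubSize) {
        List<String> toEvict = new ArrayList<>();
        for (String connectId : allConnectIds) {
            // no renew record at all; expired-but-present records are left to the per-connectId timer
            if (renewTimestamps.get(connectId) == null && ownPubSize.applyAsInt(connectId) > 0) {
                toEvict.add(connectId);
            }
        }
        return toEvict;
    }
}
```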

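evict itself simply wraps the connectId in a ClientDisconnectEvent and hands it to DisconnectEventHandler: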


private void evict(String connectId) {
    disconnectEventHandler.receive(new ClientDisconnectEvent(connectId, System
        .currentTimeMillis(), 0));
}

As shown below:


+--------------------------------------------------+
|                DatumLeaseManager                 |
|                                                  |
|  EvictTaskForHeartbeatLess.run                   |
|    |                                             |
|    v                                             |
|  allConnectIds = datumCache.getAllConnectIds()   |
|    |                                             |
|    | for each connectId                          |
|    v                                             |
|  connectIdRenewTimestampMap.get(connectId)       |
|    |                                             |
|    | null (no heartbeat) and ownPubSize > 0      |
|    v                                             |
|  evict(connectId)                                |
+--------------------------------------------------+
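2.3.5 Processing the Eviction
2.3.5.1 Forwarding the Eviction Message

The eviction message then has to be forwarded. This happens in DisconnectEventHandler.receive, which essentially calls EVENT_QUEUE.add(event). If the data node is not yet WORKING, the event is parked in noWorkQueue instead.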


public class DisconnectEventHandler implements InitializingBean, AfterWorkingProcess {
    /**
     * a DelayQueue that contains client disconnect events
     */
    private final DelayQueue<DisconnectEvent> EVENT_QUEUE = new DelayQueue<>();
    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;
    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;
    @Autowired
    private DataNodeStatus dataNodeStatus;
    private static final BlockingQueue<DisconnectEvent> noWorkQueue = new LinkedBlockingQueue<>();

    public void receive(DisconnectEvent event) {
        // not WORKING yet: park the event in noWorkQueue instead of processing it
        if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
            noWorkQueue.add(event);
            return;
        }
        EVENT_QUEUE.add(event);
    }
}
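EVENT_QUEUE is a java.util.concurrent.DelayQueue, so its take() only returns an event after that event's delay has elapsed. The sketch below shows that mechanism together with the WORKING / non-WORKING routing in receive. SketchDisconnectEvent, its delayMillis parameter, SketchDisconnectRouter and the boolean working flag are all hypothetical stand-ins, not SOFARegistry types.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical event: a DelayQueue hands it out only after its delay has passed.
class SketchDisconnectEvent implements Delayed {
    final String connectId;
    private final long readyAtNanos;

    SketchDisconnectEvent(String connectId, long delayMillis) {
        this.connectId = connectId;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

// Hypothetical sketch of DisconnectEventHandler.receive(): park events until the node is WORKING.
class SketchDisconnectRouter {
    final DelayQueue<SketchDisconnectEvent> eventQueue = new DelayQueue<>();
    final BlockingQueue<SketchDisconnectEvent> noWorkQueue = new LinkedBlockingQueue<>();
    volatile boolean working;

    void receive(SketchDisconnectEvent event) {
        if (!working) {
            // presumably drained once the node starts working (not shown in the excerpt above)
            noWorkQueue.add(event);
            return;
        }
        eventQueue.add(event);
    }
}
```

poll() on a DelayQueue returns null while the head is still delayed, whereas size() counts delayed elements too.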

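afterPropertiesSet starts a thread that loops forever, taking messages from EVENT_QUEUE and processing them:


for a session-server disconnect, remove the process and its connectIds from sessionServerConnectionFactory;
for each affected connectId (or, for a client disconnect, the event's own connectId), send a ClientChangeEvent to dataChangeEventCenter through unPub.

The code is as follows: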


@Override
public void afterPropertiesSet() {
    Executor executor = ExecutorFactory
        .newSingleThreadExecutor(DisconnectEventHandler.class.getSimpleName());
    executor.execute(() -> {
        while (true) {
            try {
                DisconnectEvent disconnectEvent = EVENT_QUEUE.take();
                if (disconnectEvent.getType() == DisconnectTypeEnum.SESSION_SERVER) {
                    SessionServerDisconnectEvent event = (SessionServerDisconnectEvent) disconnectEvent;
                    String processId = event.getProcessId();
                    // check processId confirm remove, and not be registered again when delay time
                    String sessionServerHost = event.getSessionServerHost();
                    if (sessionServerConnectionFactory
                        .removeProcessIfMatch(processId, sessionServerHost)) {
                        Set<String> connectIds = sessionServerConnectionFactory
                            .removeConnectIds(processId);
                        if (connectIds != null && !connectIds.isEmpty()) {
                            for (String connectId : connectIds) {
                                unPub(connectId, event.getRegisterTimestamp());
                            }
                        }
                    }
                } else {
                    ClientDisconnectEvent event = (ClientDisconnectEvent) disconnectEvent;
                    unPub(event.getConnectId(), event.getRegisterTimestamp());
                }
            } catch (Throwable e) {
                // take() throws InterruptedException; error handling is elided in this excerpt
            }
        }
    });
}

/**
 * @param connectId
 * @param registerTimestamp
 */
private void unPub(String connectId, long registerTimestamp) {
    dataChangeEventCenter.onChange(new ClientChangeEvent(connectId, dataServerConfig
        .getLocalDataCenter(), registerTimestamp));
}
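The consumer loop above reduces to three steps: take an event, turn it into one or more connectIds, and call unPub for each. The sketch below shows that fan-out with hypothetical types, a plain map in place of sessionServerConnectionFactory, and an injected unPub callback. It omits the processId/host match check. Unlike the excerpt, it treats interruption as a shutdown signal.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the DisconnectEventHandler consumer loop.
class SketchDisconnectConsumer implements Runnable {
    enum Type { SESSION_SERVER, CLIENT }

    static final class Event {
        final Type type;
        final String key; // processId for SESSION_SERVER, connectId for CLIENT

        Event(Type type, String key) {
            this.type = type;
            this.key = key;
        }
    }

    private final BlockingQueue<Event> queue;
    private final Map<String, Set<String>> connectIdsByProcess; // processId -> connectIds
    private final Consumer<String> unPub;

    SketchDisconnectConsumer(BlockingQueue<Event> queue,
                             Map<String, Set<String>> connectIdsByProcess,
                             Consumer<String> unPub) {
        this.queue = queue;
        this.connectIdsByProcess = connectIdsByProcess;
        this.unPub = unPub;
    }

    // One step of the loop: a session server disconnect un-publishes every connectId it carried.
    void handle(Event event) {
        if (event.type == Type.SESSION_SERVER) {
            Set<String> ids = connectIdsByProcess.remove(event.key);
            if (ids != null) {
                ids.forEach(unPub);
            }
        } else {
            unPub.accept(event.key);
        }
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                handle(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag so the loop ends
            } catch (RuntimeException e) {
                // keep consuming; a real implementation would log this
            }
        }
    }
}
```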

As shown below:


+--------------------------------------------------+
|                DatumLeaseManager                 |
|                                                  |
|  EvictTaskForHeartbeatLess.run                   |
|    |                                             |
|    v                                             |
|  allConnectIds = datumCache.getAllConnectIds()   |
|    |                                             |
|    | for each connectId                          |
|    v                                             |
|  connectIdRenewTimestampMap.get(connectId)       |
|    |                                             |
|    | null (no heartbeat) and ownPubSize > 0      |
|    v                                             |
|  evict(connectId)                                |
|    |                                             |
+----+---------------------------------------------+
     |
     | receive(ClientDisconnectEvent)
     v
+--------------------------------------------------+
|              DisconnectEventHandler              |
|                                                  |
|  if WORKING:  EVENT_QUEUE (DelayQueue)           |
|  otherwise:   noWorkQueue                        |
+--------------------------------------------------+
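2.3.5.2 Forwarding in DataChangeEventCenter

The logic then reaches DataChangeEventCenter, which again only forwards. The code below is its onChange(Publisher, String) overload. It hashes the publisher's dataInfoId to pick one DataChangeEventQueue. Note that unPub above passes a single ClientChangeEvent. The eviction path therefore goes through the overload that accepts a ClientChangeEvent (not shown here), not this one.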


public class DataChangeEventCenter {
    /**
     * queues of DataChangeEvent
     */
    private DataChangeEventQueue[] dataChangeEventQueues;

    /**
     * receive changed publisher, then wrap it into the DataChangeEvent and put it into dataChangeEventQueue
     *
     * @param publisher
     * @param dataCenter
     */
    public void onChange(Publisher publisher, String dataCenter) {
        int idx = hash(publisher.getDataInfoId());
        Datum datum = new Datum(publisher, dataCenter);
        if (publisher instanceof UnPublisher) {
            datum.setContainsUnPub(true);
        }
        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));
        } else {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB_TEMP, datum));
        }
    }
}
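This overload routes every Publisher to one of several queues by hashing its dataInfoId. Changes to the same dataInfoId are therefore serialized in one queue, while different dataInfoIds proceed in parallel. The body of hash() does not appear in the excerpt. The sketch assumes a non-negative modulo of String.hashCode(). PartitionedQueues is a hypothetical name, and the queues hold plain strings instead of DataChangeEvents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of routing changes by dataInfoId to a fixed set of queues.
class PartitionedQueues {
    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    PartitionedQueues(int n) {
        for (int i = 0; i < n; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
    }

    // Assumed hash: floorMod keeps the index non-negative even for negative hashCodes.
    int index(String dataInfoId) {
        return Math.floorMod(dataInfoId.hashCode(), queues.size());
    }

    void onChange(String dataInfoId) {
        queues.get(index(dataInfoId)).add(dataInfoId);
    }

    int size(int idx) {
        return queues.get(idx).size();
    }
}
```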
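2.3.5.3 Processing in DataChangeEventQueue

The actual work is done by DataChangeEventQueue, which processes the events carrying the data to be evicted.


After an event is taken from the queue, it is handled according to its scope (DataChangeScopeEnum):


For DataChangeScopeEnum.DATUM, check dataChangeEvent.getSourceType():
    for DataSourceTypeEnum.PUB_TEMP, call addTempChangeData, which adds a ChangeData to CHANGE_QUEUE;
    otherwise, call handleDatum;
For DataChangeScopeEnum.CLIENT, call handleClientOff((ClientChangeEvent) event). The ClientChangeEvent produced by eviction takes this branch;
For DataChangeScopeEnum.SNAPSHOT, call handleSnapshot((DatumSnapshotEvent) event).

For details, see the earlier article in this series, "[Learning Design from Source Code] Ant Financial SOFARegistry: Asynchronous Processing on the Message Bus".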
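The branching listed above can be sketched as a switch over the event scope. The enum constants mirror the names in the list. The dispatcher is hypothetical: it only records which handler would be called, rather than doing the real work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how a DataChangeEventQueue-style consumer dispatches by scope.
class ScopeDispatcher {
    enum Scope { DATUM, CLIENT, SNAPSHOT }

    enum SourceType { PUB, PUB_TEMP }

    final List<String> trace = new ArrayList<>();

    void dispatch(Scope scope, SourceType sourceType) {
        switch (scope) {
            case DATUM:
                if (sourceType == SourceType.PUB_TEMP) {
                    trace.add("addTempChangeData"); // temporary publishers go to CHANGE_QUEUE
                } else {
                    trace.add("handleDatum");
                }
                break;
            case CLIENT:
                trace.add("handleClientOff"); // the eviction path ends here
                break;
            case SNAPSHOT:
                trace.add("handleSnapshot");
                break;
            default:
                throw new IllegalArgumentException("unknown scope " + scope);
        }
    }
}
```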


+--------------------------------------------------+
|                DatumLeaseManager                 |
|                                                  |
|  EvictTaskForHeartbeatLess.run                   |
|    |                                             |
|    v                                             |
|  allConnectIds = datumCache.getAllConnectIds()   |
|    |                                             |
|    | for each connectId                          |
|    v                                             |
|  connectIdRenewTimestampMap.get(connectId)       |
|    |                                             |
|    | null (no heartbeat) and ownPubSize > 0      |
|    v                                             |
|  evict(connectId)                                |
|    |                                             |
+----+---------------------------------------------+
     |
     | receive(ClientDisconnectEvent)
     v
+--------------------------------------------------+
|              DisconnectEventHandler              |
|                                                  |
|  if WORKING:  EVENT_QUEUE (DelayQueue)           |
|  otherwise:   noWorkQueue                        |
|    |                                             |
|    | take() -> unPub(connectId)                  |
+----+---------------------------------------------+
     |
     | onChange(ClientChangeEvent)
     v
+--------------------------------------------------+
|              DataChangeEventCenter               |
|                                                  |
|  dataChangeEventQueues[]                         |
+----+---------------------------------------------+
     |
     | add DataChangeEvent
     v
+--------------------------------------------------+
|               DataChangeEventQueue               |
|                                                  |
|  eventQueue                                      |
|    CLIENT scope:  handleClientOff                |
|    DATUM scope:   addTempChangeData/handleDatum  |
+--------------------------------------------------+

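Copyright notice: This is an original article by the blogger, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.
Article link: https://www.cnblogs.com/rossiXYZ/p/14284967.html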
