Zookeeper-Watch 机制

1. Watch 机制

在日常生活中也有很多订阅发布的场景。比如我们喜欢观看某一个剧集,视频网站会有一个订阅按钮,用户可以订阅自己喜欢的电视剧,当有新的剧集发布时,网站会通知该用户第一时间观看。或者我们在网站上看到一件心仪的商品,但是当前没有库存,网站会提供到货通知的功能,我们开启这个商品的到货通知功能后,商品补货的时候会通知我们,之后就可以进行购买了。ZooKeeper 中的 Watch 机制很像这些日常的应用场景,其中的客户端就是用户,而服务端的数据节点就好像是我们订阅的商品或剧集。

现在我们可以从技术实现的角度分析一下上边提到的这些场景,无论是订阅一集电视剧还是订购一件商品。都有几个核心节点,即用户端注册服务、服务端处理请求、客户端收到回调后执行相应的操作。

2. Zookeeper-Watch机制

ZooKeeper 中的 Watch 机制实现了分布式的通知功能,Zookeeper允许客户端向服务端注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,就会向客户端发送一个事件通知来实现分布式的通知功能。

我们知道Zookeeper是通过节点储存数据的,因此当ZNode节点数据发生变化的时候就能触发。过程如下:

技术分享图片

ZooKeeper 的客户端也可以通过 Watch 机制来订阅当服务器上某一节点的数据或状态发生变化时收到相应的通知,我们可以通过向 ZooKeeper 客户端的构造方法中传递 Watcher 参数的方式实现:

1
new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

上面代码的意思是定义了一个了 ZooKeeper 客户端对象实例,并传入三个参数:

  • connectString 服务端地址

  • sessionTimeout:超时时间

  • Watcher:监控事件

这个 Watcher 将作为整个 ZooKeeper 会话期间的上下文 ,一直被保存在客户端 ZKWatchManagerdefaultWatcher 中。

除此之外,ZooKeeper 客户端也可以通过 getDatagetChildrenexists三个接口来向 ZooKeeper 服务器注册 Watcher,从而方便地在不同的情况下添加 Watch 事件:

getData(String path, Watcher watcher, Stat stat)

3. 状态和事件

image.png

上图中列出了客户端在不同会话状态下,相应的在服务器节点所能支持的事件类型。例如在客户端连接服务端的时候,可以对数据节点的创建、删除、数据变更、子节点的更新等操作进行监控。

4. Watch 机制的底层原理

由于 Watch 机制涉及了客户端和服务端的多个函数和操作节点,单单按照程序执行流程分析跳跃性对整体实现机制的理解难度大。为了更好地阐述 Watch 机制,我们另辟蹊径,从设计模式角度出发来分析其底层实现:

image

学习 Watch 机制的时候,它给我的第一印象是,其结构很像设计模式中的”观察者模式“,一个对象或者数据节点可能会被多个客户端监控,当对应事件被触发时,会通知这些对象或客户端。我们可以将 Watch 机制理解为是分布式环境下的观察者模式。所以接下来我们就以观察者模式的角度点来看看 ZooKeeper 底层 Watch 是如何实现的。

image

通常我们在实现观察者模式时,最核心或者说关键的代码就是创建一个列表来存放观察者。 而在 ZooKeeper 中则是在客户端和服务器端分别实现两个存放观察者列表,即:ZKWatchManagerWatchManager。其核心操作就是围绕着这两个展开的。

4.1 客户端 Watch 注册实现过程

客户端在发送一个 Watch 监控事件的会话请求时,ZooKeeper 客户端主要做了两个工作:

  • 标记该会话是一个带有 Watch 事件的请求

  • 将 Watch 事件存储到 ZKWatchManager

  • getData

我们以 getData 接口为例。当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,之后通过 DataWatchRegistration 类来保存 watcher 事件和节点的对应关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Return the data and the stat of the node of the given path.
* <p>
* If the watch is non-null and the call is successful (no exception is
* thrown), a watch will be left on the node with the given path. The watch
* will be triggered by a successful operation that sets data on the node, or
* deletes the node.
* <p>
* A KeeperException with error code KeeperException.NoNode will be thrown
* if no node with the given path exists.
*
* @param path the given path
* @param watcher explicit watcher
* @param stat the stat of the node
* @return the data of the node
* @throws KeeperException If the server signals an error with a non-zero error code
* @throws InterruptedException If the server transaction is interrupted.
* @throws IllegalArgumentException if an invalid path is specified
*/
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
... ...
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}

RequestHeader h = new RequestHeader();

GetDataRequest request = new GetDataRequest();
... ...
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
... ...
return response.getData();
}
  • cnxn.submitRequest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public ReplyHeader submitRequest(
RequestHeader h,
Record request,
Record response,
WatchRegistration watchRegistration) throws InterruptedException {
return submitRequest(h, request, response, watchRegistration, null);
}

public ReplyHeader submitRequest(
RequestHeader h,
Record request,
Record response,
WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(
h,
r,
request,
response,
null,
null,
null,
null,
watchRegistration,
watchDeregistration);
... ...
return r;
}
  • queuePacket

之后客户端向服务器发送请求时,是将请求封装成一个 Packet 对象,并添加到一个等待发送队列 outgoingQueue 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public Packet queuePacket(
RequestHeader h,
ReplyHeader r,
Record request,
Record response,
AsyncCallback cb,
String clientPath,
String serverPath,
Object ctx,
WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;

// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;

// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (outgoingQueue) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}

outgoingQueue 是一个LinkedBlockingDeque。正如注释所说:队列存储的是需要被发送的packets。

/**These are the packets that need to be sent.*/ private final LinkedBlockingDeque outgoingQueue = new LinkedBlockingDeque<>();

最后,ZooKeeper 客户端就会向服务器端发送这个请求,完成请求发送后。调用负责处理服务器响应的 SendThread 线程类中的 readResponse 方法接收服务端的回调,并在最后执行 finishPacket()方法将 Watch 注册到 ZKWatchManager 中:

sendThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
 /**
* This class services the outgoing request queue and generates the heart
* beats. It also spawns the ReadThread.
*/
class SendThread extends ZooKeeperThread {
... ...

void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();

replyHdr.deserialize(bbia, "header");
switch (replyHdr.getXid()) {
case PING_XID:
... ...
return;
case AUTHPACKET_XID:
... ...
return;
case NOTIFICATION_XID:
... ...
return;
default:
break;
}
... ...
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
+ " with err " + replyHdr.getErr()
+ " expected Xid " + packet.requestHeader.getXid()
+ " for a packet with details: " + packet);
}

packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}

LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
} finally {
finishPacket(packet);
}
}
... ...
}
  • finishPacket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void finishPacket(Packet p) {

int err = p.replyHeader.getErr();

if (p.watchRegistration != null) {

p.watchRegistration.register(err);

}
...
}

public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized (watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}

总结:

客户端当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,之后通过 DataWatchRegistration 类来保存 watcher 事件和节点的对应关系,然后将请求封装成一个 Packet 对象,并添加到一个等待发送队列 outgoingQueue 中。

最后,ZooKeeper 客户端就会向服务器端发送这个请求,完成请求发送后。调用负责处理服务器响应的 SendThread 线程类中的 readResponse 方法接收服务端的回调,并在最后执行 finishPacket方法将 Watch 注册到 ZKWatchManager 中。

4.2 服务端 Watch 注册实现过程

Zookeeper 服务端处理 Watch 事件基本有 2 个过程:

  • 解析收到的请求是否带有 Watch 注册事件
  • 将对应的 Watch 事件存储到 WatchManager

当 ZooKeeper 服务器接收到一个客户端请求后,首先会对请求进行解析,判断该请求是否包含 Watch 事件。这在 ZooKeeper 底层是通过 FinalRequestProcessor 类中的 processRequest 函数实现的。当 getDataRequest.getWatch() 值为 True 时,表明该请求需要进行 Watch 监控注册。并通过 zks.getZKDatabase().getData 函数将 Watch 事件注册到服务端的 WatchManager 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void processRequest(Request request) {
... ...
case OpCode.multiRead: {
lastOp = "MLTR";
MultiOperationRecord multiReadRecord = request.readRequestRecord(MultiOperationRecord::new);
rsp = new MultiResponse();
OpResult subResult;
for (Op readOp : multiReadRecord) {
... ...
rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
GetDataResponse gdr = (GetDataResponse) rec;
... ...
}
break;
}
}
  • handleGetDataRequest
1
2
3
4
5
6
7
8
9
10
11
12
13
private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
GetDataRequest getDataRequest = (GetDataRequest) request;
String path = getDataRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
Stat stat = new Stat();
byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
return new GetDataResponse(b, stat);
}

  • zks.getZKDatabase().getData
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new NoNodeException();
}
byte[] data;
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
dataWatches.addWatch(path, watcher);
}
data = n.data;
}
updateReadStat(path, data == null ? 0 : data.length);
return data;
}

通过 dataWatches.addWatch(path, watcher);将 Watch 事件注册到服务端的 WatchManager 中

4.3 服务端 Watch 事件的触发过程

在客户端和服务端都对 watch 注册完成后, ZooKeeper 会在特定场景汇总中触发Watch 事件。

setData 接口即"节点数据内容发生变更"事件为例。

  • DataTree->setData
1
2
3
4
5
6
7
8
9
10
11
12
13
public Stat setData(String path, byte data[], ...){

Stat s = new Stat();

DataNode n = nodes.get(path);

...

dataWatches.triggerWatch(path, EventType.NodeDataChanged);

return s;

}

在 setData 方法内部执行完对节点数据的变更后,会调用 WatchManager.triggerWatch 方法触发数据变更事件。

  • triggerWatch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers = new HashSet<>();
PathParentIterator pathParentIterator = getPathParentIterator(path);
synchronized (this) {
for (String localPath : pathParentIterator.asIterable()) {
Set<Watcher> thisWatchers = watchTable.get(localPath);
if (thisWatchers == null || thisWatchers.isEmpty()) {
continue;
}
Iterator<Watcher> iterator = thisWatchers.iterator();
while (iterator.hasNext()) {
Watcher watcher = iterator.next();
WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
... ...
watchers.add(watcher);//添加到定义的 Wathcers 集合中
}

if (thisWatchers.isEmpty()) {
watchTable.remove(localPath);
}
}
}
if (watchers.isEmpty()) {
... ...
return null;
}

for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);//调用 process 方法
}

return watchers;

}

triggerWatch 函数内部首先封装了一个具有会话状态事件类型数据节点 3 种属性的 WatchedEvent 对象。之后查询该节点注册的 Watch 事件,如果为空说明该节点没有注册过 Watch 事件。如果存在 Watch 事件则添加到定义的 Wathcers 集合中,并在 WatchManager 管理中删除。最后,通过调用 process 方法向客户端发送通知。

4.4 客户端回调的处理过程

服务器端 Watch 事件的触发过程后,客户端接收到通知后就会进行相应处理。

客户端使用 SendThread.readResponse() 方法来统一处理服务端的相应。首先反序列化服务器发送请求头信息 replyHdr.deserialize(bbia, "header"),并判断相属性字段 xid 的值为 -1,表示该请求响应为通知类型。在处理通知类型时,首先将己收到的字节流反序列化转换成 WatcherEvent 对象。接着判断客户端是否配置了 chrootPath 属性,如果为 True 说明客户端配置了 chrootPath 属性。需要对接收到的节点路径进行 chrootPath 处理。最后调用 eventThread.queueEvent( )方法将接收到的事件交给 EventThread 线程进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();

replyHdr.deserialize(bbia, "header");//反序列化服务器发送请求头信息
switch (replyHdr.getXid()) {
case PING_XID:
... ...
return;
case NOTIFICATION_XID://-1:通知类型
LOG.debug("Got notification session id: 0x{}",Long.toHexString(sessionId));
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");

// convert from a server path to a client path
if (chrootPath != null) {//判断客户端是否配置了 chrootPath 属性
String serverPath = event.getPath();
String clientPath = stripChroot(serverPath);
event.setPath(clientPath);
}

WatchedEvent we = new WatchedEvent(event);
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
eventThread.queueEvent(we);//将接收到的事件交给 EventThread 线程进行处理
return;
default:
break;
}
  • queueEvent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void queueEvent(WatchedEvent event) {
queueEvent(event, null);
}

private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// materialize the watchers based on the event
watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());
} else {
watchers = new HashSet<>(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
  • materialize
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public Set<Watcher> materialize(
Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath
) {
final Set<Watcher> result = new HashSet<>();

switch (type) {
case None:
... ...
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
addPersistentWatches(clientPath, type, result);
break;
case NodeChildrenChanged:
... ...
case NodeDeleted:
... ...
default:
... ...
throw new RuntimeException(errorMsg);
}

return result;
}

我们会发现此处的Type就是前面的状态:NoneNodeDataChangedNodeCreatedNodeChildrenChangedNodeDeleted

EventThread.queueEvent() 方法内部的执行逻辑。其主要工作分为 2 点:

  • 第 1 步按照通知的事件类型,从 ZKWatchManager 中查询注册过的客户端 Watch 信息。客户端在查询到对应的 Watch 信息后,会将其从 ZKWatchManager 的管理中删除。因此这里多注意,客户端的 Watcher 机制是一次性的,触发后就会被删除

  • 第 2 步获取到对应的 Watcher 信息后,将查询到的 Watcher 存储到 waitingEvents 队列中,调用 EventThread 类中的 run 方法会循环取出在 waitingEvents 队列中等待的 Watcher 事件进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);//执行实现了 Watcher 接口的 process方法
}
if (wasKilled) {
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}

LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}
  • processEvent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void processEvent(Object event) {
... ...
if (event instanceof WatcherSetEventPair) {
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);//执行方法
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else if (event instanceof LocalCallback) {
... ...
}
}

processEvent(event) 方法来最终执行实现了 Watcher 接口的 process()方法。

5. 总结

客户端当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,之后通过 DataWatchRegistration 类来保存 watcher 事件和节点的对应关系,然后将请求封装成一个 Packet 对象,并添加到一个等待发送队列 outgoingQueue 中。

最后,ZooKeeper 客户端就会向服务器端发送这个请求,完成请求发送后。调用负责处理服务器响应的 SendThread 线程类中的 readResponse 方法接收服务端的回调,并在最后执行 finishPacket方法将 Watch 注册到 ZKWatchManager 中。


当 ZooKeeper 服务器接收到一个客户端请求后,首先会对请求进行解析,判断该请求是否包含 Watch 事件。这在 ZooKeeper 底层是通过 FinalRequestProcessor 类中的 processRequest 函数实现的。当 getDataRequest.getWatch() 值为 True 时,表明该请求需要进行 Watch 监控注册。并通过 zks.getZKDatabase().getData 函数将 Watch 事件注册到服务端的 WatchManager 中。


在特定的场合会触发事件,因此调用 WatchManager.triggerWatch 方法触发事件。 triggerWatch 函数内部首先封装了一个具有会话状态事件类型数据节点 3 种属性的 WatchedEvent 对象。之后查询该节点注册的 Watch 事件,如果为空说明该节点没有注册过 Watch 事件。如果存在 Watch 事件则添加到定义的 Wathcers 集合中,并在 WatchManager 管理中删除。最后,通过调用 process 方法向客户端发送通知。


服务器端 Watch 事件的触发过程后,客户端接收到通知后就会进行相应处理。

客户端使用 SendThread.readResponse() 方法来统一处理服务端的相应。首先反序列化服务器发送请求头信息 replyHdr.deserialize(bbia, "header"),并判断相属性字段 xid 的值为 -1,表示该请求响应为通知类型。在处理通知类型时,首先将己收到的字节流反序列化转换成 WatcherEvent 对象。接着判断客户端是否配置了 chrootPath 属性,如果为 True 说明客户端配置了 chrootPath 属性。需要对接收到的节点路径进行 chrootPath 处理。最后调用 eventThread.queueEvent( )方法将接收到的事件交给 EventThread 线程进行处理。其主要工作分为 2 点:

  • 第 1 步按照通知的事件类型,从 ZKWatchManager 中查询注册过的客户端 Watch 信息。客户端在查询到对应的 Watch 信息后,会将其从 ZKWatchManager 的管理中删除。因此这里多注意,客户端的 Watcher 机制是一次性的,触发后就会被删除

  • 第 2 步获取到对应的 Watcher 信息后,将查询到的 Watcher 存储到 waitingEvents 队列中,调用 EventThread 类中的 run 方法会循环取出在 waitingEvents 队列中等待的 Watcher 事件进行处理。

6.使用场景

订阅发布场景实现

现在我们已经知道 Watch 事件在 ZooKeeper 中的完整处理过程,接下来我们通过一个实际应用来加深我们对 ZooKeeper 中 Watch 机制的理解。

提到 ZooKeeper 的应用场景,你可能第一时间会想到最为典型的发布订阅功能。发布订阅功能可以看作是一个一对多的关系,即一个服务或数据的发布者可以被多个不同的消费者调用。一般一个发布订阅模式的数据交互可以分为消费者主动请求生产者信息的拉取模式,和生产者数据变更时主动推送给消费者的推送模式。ZooKeeper 采用了两种模式结合的方式实现订阅发布功能。下面我们来分析一个具体案例:

在系统开发的过程中会用到各种各样的配置信息,如数据库配置项、第三方接口、服务地址等,这些配置操作在我们开发过程中很容易完成,但是放到一个大规模的集群中配置起来就比较麻烦了。通常这种集群中,我们可以用配置管理功能自动完成服务器配置信息的维护,利用ZooKeeper 的发布订阅功能就能解决这个问题。

我们可以把诸如数据库配置项这样的信息存储在 ZooKeeper 数据节点中。如图中的 /confs/data_item1。服务器集群客户端对该节点添加 Watch 事件监控,当集群中的服务启动时,会读取该节点数据获取数据配置信息。而当该节点数据发生变化时,ZooKeeper 服务器会发送 Watch 事件给各个客户端,集群中的客户端在接收到该通知后,重新读取节点的数据库配置信息。

image

我们使用 Watch 机制实现了一个分布式环境下的配置管理功能,通过对 ZooKeeper 服务器节点添加数据变更事件,实现当数据库配置项信息变更后,集群中的各个客户端能接收到该变更事件的通知,并获取最新的配置信息。要注意一点是,我们提到 Watch 具有一次性,所以当我们获得服务器通知后要再次添加 Watch 事件。

7. 🍪

当服务端某一节点发生数据变更操作时,所有曾经设置了该节点监控事件的客户端都会收到服务器的通知吗?

否定的,通过本课时对 ZooKeeper 内部实现机制的解析可以知道,Watch 事件的触发机制取决于会话的连接状态和客户端注册事件的类型,所以当客户端会话状态或数据节点发生改变时,都会触发对应的 Watch 事件。

8.Read more

:lollipop::发布订阅模式:如何使用 Watch 机制实现分布式通知


博客说明

文章所涉及的资料来自互联网整理和个人总结,意在于个人学习和经验汇总,不用于任何的商业用途。如有侵权,请联系本人删除。谢谢!


Zookeeper-Watch 机制
https://nanchengjiumeng123.top/2022/09/19/framework/zookeeper/2022-09-19_Zookeeper-Watch 机制/
作者
Yang Xin
发布于
2022年9月19日
许可协议