ZkClient源码阅读与疑问总结
- 在client端使用NIO意义/优点, 相比于传统的BIO是啥?
- https://blog.csdn.net/cenwenchu79/article/details/4586939
- 尤其是在client需要连接海量的服务端时, 例如爬虫, 或者海量物理机集群的管控服务等, 参见异步HttpClient使用Netty作为SocketChannel的Provider
- client重连机制是怎样的?
- client是如何判定连接断开的?
- 几个重要参数是怎样的?
- 已经建立的连接, 超时时间是多久, 则认为是连接断开了?
- 未建立的连接, 建连超过多久则认为无法建立连接, 从而新选择其他zk host?
- 后羿的重连机制?
- curator的重连机制?
- shuffle机制!!!
org.apache.zookeeper.ClientCnxn.SendThread#onConnected
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
为啥在连接建立的最后, 要queue一个None的Event?
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));
priming-packet 启动packet
WatchedEvent 里两个重要的属性:
Watcher.Event.EventType
None (-1),
NodeCreated (1),
NodeDeleted (2),
NodeDataChanged (3),
NodeChildrenChanged (4);KeeperState
case -1: return KeeperState.Unknown;
case 0: return KeeperState.Disconnected;
case 1: return KeeperState.NoSyncConnected;
case 3: return KeeperState.SyncConnected;
case 4: return KeeperState.AuthFailed;
case 5: return KeeperState.ConnectedReadOnly;
case 6: return KeeperState.SaslAuthenticated;
case -112: return KeeperState.Expired;
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
Auth机制&流程
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
selectionKey关键:
- 往selector里注册事件, sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
- 幂等的. 即可以重复注册多次.
- 持续生效的: 即加入注册一次 sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
- 后续任意时刻网卡里有数据可读, selector.selectedKeys() 都会返回 SelectionKey, 不会select一次之后就关掉该interest了
- 任意时刻网卡写队列可写, selector.selectedKeys() 都会返回 SelectionKey, 不会write一次之后即关掉了
- 所以当没有数据要写时, 需要去掉OP_WRITE的interest, 否则会在SendThread里死循环.
- 其他:
- Selector, Channel, SelectionKey 三者对象是啥关系?
- 三者的线程安全情况?
// TODO: 待实验验证
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
三个重要的QUEUE:
- outgoingQueue(LinkedList):
- 生产者: 客户线程; 注册/执行操作, 会封装成packet, 放到outgoingQueue中.
- 消费者: SendThread; 将outgoingQueue中packet序列化, 执行网络IO, 发送给zookeeperServer
- 其他: 生产者可以通过 packet.finished+packet.wait(); 阻塞等待服务端返回结果
- pendingQueue(LinkedList):
- 生产者: SendThread; 当将packet已经完全发送给服务端之后, 会把packet从outgoingQueue中移除, 放到pendingQueue中
- 消费者: SendThread; 当收到服务端的reply, 根据(xid+严格顺序)匹配到对应的packet之后, 会把该packet从pendingQueue中移除, 同时:
- waitingEvents(LinkedBlockingQueue):
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
SendThread 机制:
为某个路径注册watcher过程
— UserThread发起: 生成packet对象
- UserThread: zk.exists(path,new Watcher() {xxx});
- UserThread: 将{path, watcher对象} 封装成packet放到 outgoingQueue
- UserThread: 调用selector.wakeup(); 引发SendThread解除阻塞
- UserThread: 陷入阻塞, 循环等待packet.finished完成
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
— SendThread注册Selector:
- SendThread: 之前阻塞在selector.select(waitTimeOut); 由于上一步, UserThread调用了selector.wakeup(), 导致解除阻塞
- SendThread: selector.selectedKeys(); 由于服务端还没有返回数据, 即客户端缓冲区无数据, 所以为空.
- SendThread: 接下来查看outgoingQueue非空, 因此在Selector里注册 SelectionKey.OP_WRITE 事件 sockKey.interestOps(i | SelectionKey.OP_WRITE);
— SendThread执行I/O Write: enrich packet.request
- SendThread: 重新进入循环, selector.selectedKeys(); 由于此时操作系统写缓冲区readyForWrite(即写缓冲区未溢出), 因此(k.readyOps() & SelectionKey.OP_WRITE == SelectionKey.OP_WRITE)
- SendThread: 将{path, watcher对象}序列化, 序列化的关键信息:
- xid: 如果packet里没有, 会在这个时候从ClientCnxn里拿出来设置给packet
- 这里xid为per client的, 全局递增的ID
- 与sessionId是不同的.
- path:
- watch: true/false, 让服务端知道是否需要watch, 即该路径有变化时, 是否需要通知对应的xid
- 包字节长度 int, 占用4个字节: this.bb.putInt(this.bb.capacity() - 4);
- xid: 如果packet里没有, 会在这个时候从ClientCnxn里拿出来设置给packet
- SendThread: SocketChannel.write(packet.bb); 执行网络IO
- SendThread: 写包完成(具体大包拆解过程参加下边), 将packet放入到pendingQueue中
— 服务端执行完成, 返回包给客户端, 数据已经写入客户端网卡缓冲区
— SendThread执行I/O Read: enrich packet.response
- SendThread: 重新进入循环, selector.selectedKeys(); 由于此时客户端网卡读缓冲区有数据, 因此(k.readyOps() & SelectionKey.OP_READ == SelectionKey.OP_READ)
- SendThread: readResponse()
- 反序列化ReplyHeader
- ReplyHeader重要字段:
- xid
- zxid
- 从pendingQueue中取出第一个packet
- 判断packet.xid 是否等于 replyHead.xid
- ReplyHeader重要字段:
- 如果replyHeader匹配上packet, 则将反序列化responseBody; 设置为 packet.response; 至此response读取处理完毕
- 整个packet完整了, 既包含了request信息, 又包含了reponse信息
- 具体如何处理大的包的读取(拆包问题), 后边详细介绍.
- 反序列化ReplyHeader
- SendThread: 往Map<String, Set
>里注册watcher, key是clientPath - p.watchRegistration.register(p.replyHeader.getErr());
- watches.put(clientPath, watchers);
- SendThread: 由于packet的callback为null, 因此
- p.finished = true;
- p.notifyAll();
- 终于到这里了, UserThread收到notifyAll的消息, 从block中唤醒.
- SendThread: IO Read流程结束. 重新新一轮的select流程
— watcher怎么漏了? 具体怎么watch的呢? 参见下边 Watcher流程 章节
(具体是怎么找到写时候对应的packet呢? 根据啥作为index? )
- 根据严格的 FIFO顺序+xid
- 即第一个发出去的packet, 必然会第一个收到reply. 这里有点不可思议: 如果网络包延时, 导致到达客户端的时候乱序怎么办?
- 同时根据xid进行匹配校验.
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
SendThread 如何处理大包写问题: (拆包问题)
具体代码在这里: org.apache.zookeeper.ClientCnxnSocketNIO#doIO
大致流程:
首先从outgoingQueue中获取到packet (注意此时不会把packet从outgoingQuque中删除)
序列化packet, 放到packet.bb字段中, 会在开头4个字节表明包内容长度
socketChannel.write(packet.bb)
判断bb.hasRemaining(); 如果为true, 则说明单次写入没写完.
重新往Selector里注册写需求: sockKey.interestOps(i | SelectionKey.OP_WRITE);
再次进入sendThread循环
由于上一步packet仍然在outgoingQueue中没被删除, 因此获取到了刚才的packet
由于已经序列化过, 即 packet.bb字段非空, 因此不需要重新序列化.
继续在之前的bb index处往后执行网络IO: socketChannel.write(packet.bb)
判断bb.hasRemaining(); 如果为true, 则循环 6~10 步骤
bb.hasRemaining() 为false; 则说明该包写完成.
将该包从outgoingQueue中删除
将该包放到pendingQueue中
如果outgoingQueue为空, 则不再注册SelectionKey.OP_WRITE 事件
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
SendThread 如何处理大包读问题: (黏包问题)
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
ByteBuffer & SocketChannel 的合并用法:
ByteBuffer bb = xxx; // 假设1024B
SocketChannel.write(bb); // 这里如果单次只写出了1B, 那么是否会移动bb的cursor?
bb.hasRemaining(); // 这里是否为true?
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
心跳机制:
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
Zk的watch机制是如何工作的?
ZKWatchManager 里三个重要的watcherMap:
dataWatches
existWatches: 所有校验exists的watcher都放在这里, key: clientPath
childWatches:
defaultWatcher: new ZooKeeper()时在构造方法里传入的. 可以用来订阅zk状态变化, 也可以订阅zk node变化
作用分别是啥?
如何处理路径级联问题?
例如在/root路径上注册了事件, 同时/root/dir 节点是存在的, 后边实际增加了 /root/dir/a 节点; 那么/root节点上注册watcher会收到事件么?
- 所有的watcher都是在EventThread线程里, processEvent()方法里, 串行执行的.
通过LinkedBlockingQueue.take()一个一个串行执行.
所以需要避免在注册的watcher.process方法里执行过重的操作. 否则会导致整个watcher机制延时.
Watcher流程
- SendThread: 服务端node有变化, 发送消息到客户端, 客户端网卡中数据readyForRead, 因此select()返回
- SendThread: 反序列化消息头; 根据消息头replyHeader.xid == -1 判定是watchEvent, 而不是普通packet的response.
- SendThread: 反序列化消息体; WatcherEvent 结构, 重要字段:
- Path: 即发生变化的node路径
- KeeperState: zk状态变化情况
- EventType: zk node变化情况, 例如node数据更新, noe
- SendThread: materialize() 找到该event对应的watcher
- 从watchRegistration里, 根据clientPath, map里获取到对应的watchers
- Remove掉路径对应的这个watcher
- SendThread: 把WatcherSetEventPair{event, watchers} 组合加到EventThread的waitingEvents queue中.
所以当某个路径上的事件进入到watcher.process之后, 该watcher其实是已经被remove掉了. 如果有持续watch的需求, 还需要重新注册.
EventThread处理流程:
6. EventThread: 无限循环, 从waitingEvents中take一个Event
7. EventThread: 获取对应的WatcherSetEventPair
1. 从WatcherSetEventPair中获取到watchers,
2. 循环遍历执行, watcher.process(event)
// todo: EventThread processEvent里另外一个分支, Packet相关是怎么发生, 怎么处理的?
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
watcher有以下三个特性:
1、一次性:watch事件是一次性触发的,如果想再次监控数据,必须重新设置监控
2、客户端串行执行:客户端Watcher回调过程是一个串行同步的过程,这为我们保证了顺序。
3、轻量的:WatchedEvent是zk整个Watcher通知机制的最小通知单元,只有三部分组成:通知状态、时间类型、节点路径。也就是说,具体发生了什么变化,是需要客户端自己去查询的。
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
3、可以用exists方法:exists(path,true)来添加watch事件,当下次操作path会出发watch事件
例如:
zk.exists(path,true);//给path添加watch事件
zk.writeData(path,"newValue");//触发watch事件 EventType->nodeDataChenage
zk.exists(path,true);//给path添加watch事件
zk.create(path,"newValue",Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//触发watch事件 EventType->nodeCreated
zk.exists(path,true);//给path添加watch事件
zk.delete(path,-1);//触发watch事件 EventType->nodeDeleted
4、getChildren(path,neeWatch) //给path添加watch事件,监控子节点变化
如果path新增或者删除节点会触发watch事件 EventType->nodechildrenChanged
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
如果客户端启动多个zk, new ZooKeeper(), 对象结构是怎样的? 哪些是线程共用的? 哪些是线程私有的? 哪些是线程安全的?
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
Etcd 与 ZK 区别?
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —