ZkClient源码阅读与疑问总结

  1. 在client端使用NIO意义/优点, 相比于传统的BIO是啥?
    1. https://blog.csdn.net/cenwenchu79/article/details/4586939
    2. 尤其是在client需要连接海量的服务端时, 例如爬虫, 或者海量物理机集群的管控服务等, 参见异步HttpClient使用Netty作为SocketChannel的Provider
  2. client重连机制是怎样的?
    1. client是如何判定连接断开的?
    2. 几个重要参数是怎样的?
      1. 已经建立的连接, 超时时间是多久, 则认为是连接断开了?
      2. 未建立的连接, 建连超过多久则认为无法建立连接, 从而新选择其他zk host?
    3. 后羿的重连机制?
    4. curator的重连机制?
    5. shuffle机制!!!

org.apache.zookeeper.ClientCnxn.SendThread#onConnected

readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();

为啥在连接建立的最后, 要queue一个None的Event?
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));

priming-packet 启动packet

WatchedEvent 里两个重要的属性:

  1. Watcher.Event.EventType
    None (-1),
    NodeCreated (1),
    NodeDeleted (2),
    NodeDataChanged (3),
    NodeChildrenChanged (4);

  2. KeeperState
    case -1: return KeeperState.Unknown;
    case 0: return KeeperState.Disconnected;
    case 1: return KeeperState.NoSyncConnected;
    case 3: return KeeperState.SyncConnected;
    case 4: return KeeperState.AuthFailed;
    case 5: return KeeperState.ConnectedReadOnly;
    case 6: return KeeperState.SaslAuthenticated;
    case -112: return KeeperState.Expired;

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
Auth机制&流程

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
selectionKey关键:

  1. 往selector里注册事件, sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    1. 幂等的. 即可以重复注册多次.
    2. 持续生效的: 即加入注册一次 sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
      1. 后续任意时刻网卡里有数据可读, selector.selectedKeys() 都会返回 SelectionKey, 不会select一次之后就关掉该interest了
      2. 任意时刻网卡写队列可写, selector.selectedKeys() 都会返回 SelectionKey, 不会write一次之后即关掉了
    3. 所以当没有数据要写时, 需要去掉OP_WRITE的interest, 否则会在SendThread里死循环.
  2. 其他:
    1. Selector, Channel, SelectionKey 三者对象是啥关系?
    2. 三者的线程安全情况?

// TODO: 待实验验证

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
三个重要的QUEUE:

  1. outgoingQueue(LinkedList):
    1. 生产者: 客户线程; 注册/执行操作, 会封装成packet, 放到outgoingQueue中.
    2. 消费者: SendThread; 将outgoingQueue中packet序列化, 执行网络IO, 发送给zookeeperServer
    3. 其他: 生产者可以通过 packet.finished+packet.wait(); 阻塞等待服务端返回结果
  2. pendingQueue(LinkedList):
    1. 生产者: SendThread; 当将packet已经完全发送给服务端之后, 会把packet从outgoingQueue中移除, 放到pendingQueue中
    2. 消费者: SendThread; 当收到服务端的reply, 根据(xid+严格顺序)匹配到对应的packet之后, 会把该packet从pendingQueue中移除, 同时:
  3. waitingEvents(LinkedBlockingQueue):

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
SendThread 机制:

为某个路径注册watcher过程
— UserThread发起: 生成packet对象

  1. UserThread: zk.exists(path,new Watcher() {xxx});
  2. UserThread: 将{path, watcher对象} 封装成packet放到 outgoingQueue
  3. UserThread: 调用selector.wakeup(); 引发SendThread解除阻塞
  4. UserThread: 陷入阻塞, 循环等待packet.finished完成
    synchronized (packet) {
    while (!packet.finished) {
    packet.wait();
    }
    }

— SendThread注册Selector:

  1. SendThread: 之前阻塞在selector.select(waitTimeOut); 由于上一步, UserThread调用了selector.wakeup(), 导致解除阻塞
  2. SendThread: selector.selectedKeys(); 由于服务端还没有返回数据, 即客户端缓冲区无数据, 所以为空.
  3. SendThread: 接下来查看outgoingQueue非空, 因此在Selector里注册 SelectionKey.OP_WRITE 事件 sockKey.interestOps(i | SelectionKey.OP_WRITE);

— SendThread执行I/O Write: enrich packet.request

  1. SendThread: 重新进入循环, selector.selectedKeys(); 由于此时操作系统写缓冲区readyForWrite(即写缓冲区未溢出), 因此(k.readyOps() & SelectionKey.OP_WRITE == SelectionKey.OP_WRITE)
  2. SendThread: 将{path, watcher对象}序列化, 序列化的关键信息:
    1. xid: 如果packet里没有, 会在这个时候从ClientCnxn里拿出来设置给packet
      1. 这里xid为per client的, 全局递增的ID
      2. 与sessionId是不同的.
    2. path:
    3. watch: true/false, 让服务端知道是否需要watch, 即该路径有变化时, 是否需要通知对应的xid
    4. 包字节长度 int, 占用4个字节: this.bb.putInt(this.bb.capacity() - 4);
  3. SendThread: SocketChannel.write(packet.bb); 执行网络IO
  4. SendThread: 写包完成(具体大包拆解过程参加下边), 将packet放入到pendingQueue中

— 服务端执行完成, 返回包给客户端, 数据已经写入客户端网卡缓冲区

— SendThread执行I/O Read: enrich packet.response

  1. SendThread: 重新进入循环, selector.selectedKeys(); 由于此时客户端网卡读缓冲区有数据, 因此(k.readyOps() & SelectionKey.OP_READ == SelectionKey.OP_READ)
  2. SendThread: readResponse()
    1. 反序列化ReplyHeader
      1. ReplyHeader重要字段:
        1. xid
        2. zxid
      2. 从pendingQueue中取出第一个packet
      3. 判断packet.xid 是否等于 replyHead.xid
    2. 如果replyHeader匹配上packet, 则将反序列化responseBody; 设置为 packet.response; 至此response读取处理完毕
    3. 整个packet完整了, 既包含了request信息, 又包含了reponse信息
    4. 具体如何处理大的包的读取(拆包问题), 后边详细介绍.
  3. SendThread: 往Map<String, Set>里注册watcher, key是clientPath
    1. p.watchRegistration.register(p.replyHeader.getErr());
    2. watches.put(clientPath, watchers);
  4. SendThread: 由于packet的callback为null, 因此
    1. p.finished = true;
    2. p.notifyAll();
    3. 终于到这里了, UserThread收到notifyAll的消息, 从block中唤醒.
  5. SendThread: IO Read流程结束. 重新新一轮的select流程

— watcher怎么漏了? 具体怎么watch的呢? 参见下边 Watcher流程 章节

(具体是怎么找到写时候对应的packet呢? 根据啥作为index? )

  • 根据严格的 FIFO顺序+xid
  • 即第一个发出去的packet, 必然会第一个收到reply. 这里有点不可思议: 如果网络包延时, 导致到达客户端的时候乱序怎么办?
  • 同时根据xid进行匹配校验.

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
SendThread 如何处理大包写问题: (拆包问题)
具体代码在这里: org.apache.zookeeper.ClientCnxnSocketNIO#doIO
大致流程:

  1. 首先从outgoingQueue中获取到packet (注意此时不会把packet从outgoingQuque中删除)

  2. 序列化packet, 放到packet.bb字段中, 会在开头4个字节表明包内容长度

  3. socketChannel.write(packet.bb)

  4. 判断bb.hasRemaining(); 如果为true, 则说明单次写入没写完.

  5. 重新往Selector里注册写需求: sockKey.interestOps(i | SelectionKey.OP_WRITE);

  6. 再次进入sendThread循环

  7. 由于上一步packet仍然在outgoingQueue中没被删除, 因此获取到了刚才的packet

  8. 由于已经序列化过, 即 packet.bb字段非空, 因此不需要重新序列化.

  9. 继续在之前的bb index处往后执行网络IO: socketChannel.write(packet.bb)

  10. 判断bb.hasRemaining(); 如果为true, 则循环 6~10 步骤

  11. bb.hasRemaining() 为false; 则说明该包写完成.

  12. 将该包从outgoingQueue中删除

  13. 将该包放到pendingQueue中

  14. 如果outgoingQueue为空, 则不再注册SelectionKey.OP_WRITE 事件

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
SendThread 如何处理大包读问题: (黏包问题)

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
ByteBuffer & SocketChannel 的合并用法:
ByteBuffer bb = xxx; // 假设1024B
SocketChannel.write(bb); // 这里如果单次只写出了1B, 那么是否会移动bb的cursor?
bb.hasRemaining(); // 这里是否为true?

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
心跳机制:

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Zk的watch机制是如何工作的?

ZKWatchManager 里三个重要的watcherMap:
dataWatches
existWatches: 所有校验exists的watcher都放在这里, key: clientPath
childWatches:
defaultWatcher: new ZooKeeper()时在构造方法里传入的. 可以用来订阅zk状态变化, 也可以订阅zk node变化

作用分别是啥?
如何处理路径级联问题?
例如在/root路径上注册了事件, 同时/root/dir 节点是存在的, 后边实际增加了 /root/dir/a 节点; 那么/root节点上注册watcher会收到事件么?

  1. 所有的watcher都是在EventThread线程里, processEvent()方法里, 串行执行的.
    通过LinkedBlockingQueue.take()一个一个串行执行.
    所以需要避免在注册的watcher.process方法里执行过重的操作. 否则会导致整个watcher机制延时.

Watcher流程

  1. SendThread: 服务端node有变化, 发送消息到客户端, 客户端网卡中数据readyForRead, 因此select()返回
  2. SendThread: 反序列化消息头; 根据消息头replyHeader.xid == -1 判定是watchEvent, 而不是普通packet的response.
  3. SendThread: 反序列化消息体; WatcherEvent 结构, 重要字段:
    1. Path: 即发生变化的node路径
    2. KeeperState: zk状态变化情况
    3. EventType: zk node变化情况, 例如node数据更新, noe
  4. SendThread: materialize() 找到该event对应的watcher
    1. 从watchRegistration里, 根据clientPath, map里获取到对应的watchers
    2. Remove掉路径对应的这个watcher
  5. SendThread: 把WatcherSetEventPair{event, watchers} 组合加到EventThread的waitingEvents queue中.
    所以当某个路径上的事件进入到watcher.process之后, 该watcher其实是已经被remove掉了. 如果有持续watch的需求, 还需要重新注册.

EventThread处理流程:
6. EventThread: 无限循环, 从waitingEvents中take一个Event
7. EventThread: 获取对应的WatcherSetEventPair
1. 从WatcherSetEventPair中获取到watchers,
2. 循环遍历执行, watcher.process(event)

// todo: EventThread processEvent里另外一个分支, Packet相关是怎么发生, 怎么处理的?

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
watcher有以下三个特性:

1、一次性:watch事件是一次性触发的,如果想再次监控数据,必须重新设置监控

2、客户端串行执行:客户端Watcher回调过程是一个串行同步的过程,这为我们保证了顺序。

3、轻量的:WatchedEvent是zk整个Watcher通知机制的最小通知单元,只有三部分组成:通知状态、时间类型、节点路径。也就是说,具体发生了什么变化,是需要客户端自己去查询的。

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
3、可以用exists方法:exists(path,true)来添加watch事件,当下次操作path会出发watch事件

例如:

    zk.exists(path,true);//给path添加watch事件

    zk.writeData(path,"newValue");//触发watch事件 EventType->nodeDataChenage



    zk.exists(path,true);//给path添加watch事件

    zk.create(path,"newValue",Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//触发watch事件 EventType->nodeCreated



    zk.exists(path,true);//给path添加watch事件

    zk.delete(path,-1);//触发watch事件 EventType->nodeDeleted

4、getChildren(path,neeWatch) //给path添加watch事件,监控子节点变化

    如果path新增或者删除节点会触发watch事件 EventType->nodechildrenChanged

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
如果客户端启动多个zk, new ZooKeeper(), 对象结构是怎样的? 哪些是线程共用的? 哪些是线程私有的? 哪些是线程安全的?

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Etcd 与 ZK 区别?

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —