0X06:消息存储结构
NATS源码学习系列文章基于gnatsd1.0.0。该版本于2017年7月13 日发布(Release v1.0.0),在此之前v0.9.6是2016年12月 16日发布的,中间隔了半年。算是一个比较完备的版本,但是这个版本还没有增加集群支持。为什么选择这个版本呢? 因为一来这个版本比较稳定,同时也包含了集群管理和Stream 落地相关的逻辑,相对完善。
在前面的NATS 开源学习——0X05:订阅消息中讲到订阅消息会在类型为Sublist 的svr.sl中插入一个subscription, 而在取消订阅的时候 则会从svr.sl中删除一个subscription。
那么server里面的的sl是什么呢?
看字面意思像是一个列表,在来看Sublist的定义,在server/sublist.go里面:
实际上这个 Sublist 是一个按照"."做为分割的消息订阅树。NATS规定,消息主题是可以用"."做分割的,并可以通过通配符进行订阅,这里就通过树形结构来 组织这些主题,并把订阅内容记录在叶子节点中。
订阅消息树
这里可以将订阅消息树看作成是一个单链表,链表结构为 level,在server/sublist.go中:
真正表示一个订阅主题是这里的subscription,他定义在server/client.go中:
相关成员的含义已经在注释中标出。实际上subscription的存储结构是这样的:
最上面是Sublist里面的 rootlevel。每个Level包含了一个node的Map,key为这个node表示的subscription的主题,以及另外两个表示带有通配符 "*":pwc节点和">":fwc节点。
比如图中包含了四个主题,分别是"TopicOne"、"TopicTwo"、 "TopicThree”, 因为不含通配符,所以这几个Node就都在RootLevel的nodes 字典内。
而在Node里面保存了每个具体的订阅subscription,这里对于"TopicOne"那就是主题为"TopicOne"的subscription,存储在node的psubs这个成员中。 每当有clinet订阅"TopicOne"的时候就会在“TopicOne"的Node的psubs数组中增加一个subscription,里面记录了是哪个clinet订阅的。
对于"TopicTwo",假设要订阅“TopicTwo.abc”这样的主题,会先生成“TopicTwo.abc” 这样的node,并存在“TopicTwo”的节点Level字典nodes中。这样就可以 从Root链接到"TopicTwo"再链接到"TopicTwo.abc"。然后将订阅的内容subscription放入“TopicTwo.abc” node的psubs数组中。
而当订阅带有通配符""或者">"时,比如这里订阅"TopicThree.>",则是创建一个node:"TopicThree.>"将其放入"TopicThree"节点Level的fwc节点中表示 ">"的通配符。而订阅如 "TopicThree..abc" 则先创建一个node:"TopicThree."将其放入"TopicThree"节点Level的pwc节点中表示 ""的通配符,然后再在个节点Level的nodes中增加一个"TopicThree..abc" 节点,表示具体的主题节点,然后将订阅的内容subscription放入“TopicThree..abc” node的psubs数组中。
通过这样构建的树形结构,就可以将订阅主题,按照"."进行分割后,依次找到最后的叶子节点,并将订阅的subscription插入到叶子节点的psubs数组中。
插入订阅
我们来看插入逻辑:
首先对主题做"." 的分割:
然后对每个层级进行检索,按照上面的算法,该创建节点创建节点,需要连接节点Level,创建节点Level。
最后将订阅的subscription插入到qsubs数组中, 135 if sub.queue == nil { 136 n.psubs = append(n.psubs, sub) 137 } else { 138 // This is a queue subscription 139 if i := findQSliceForSub(sub, n.qsubs); i >= 0 { 140 n.qsubs[i] = append(n.qsubs[i], sub) 141 } else { 142 n.qsubs = append(n.qsubs, []*subscription{sub}) 143 } 144 }
这里有个队列订阅,如果是的话,放入psubs数组,否则放入qsubs二维数组,这里用个二维数组主要是为后面的pub发布消息寻找降低 运算复杂度,否则这里就需要一个字典来实现了。
通过上面的步骤就完成了一课订阅树的创建和增加了。
删除订阅
来看SubList的删除函数,首先还是分割主题,按照路径找到相关的node:
然后将要处理的节点标记出来:
然后将这个订阅subscription从相应的node上删除:
回头再处理上面记录的路径level:
这里pruneNode就是插入node的时候的反操作,如果是pwc/fwc直接赋nil,否则从list里面删除。
删除部分的代码基本是插入操作的反操作,根据上面代码就可以知道大概意思,不用再赘述。
总结
订阅消息树实际上是一种查找树,或者是路径树。通过逐级查找,找到最终的叶子节点,然后将订阅的内容存入到叶子节点中。这样当有消息发布的时候, 只要按照主题找到这些叶子节点中记录的订阅客户端,就可以给相应的客户端发送消息了。
Last updated
Was this helpful?