0X06:消息存储结构
NATS源码学习系列文章基于gnatsd1.0.0。该版本于2017年7月13 日发布(Release v1.0.0),在此之前v0.9.6是2016年12月 16日发布的,中间隔了半年。算是一个比较完备的版本,但是这个版本还没有增加集群支持。为什么选择这个版本呢? 因为一来这个版本比较稳定,同时也包含了集群管理和Stream 落地相关的逻辑,相对完善。
在前面的NATS 开源学习——0X05:订阅消息中讲到订阅消息会在类型为Sublist 的svr.sl中插入一个subscription, 而在取消订阅的时候 则会从svr.sl中删除一个subscription。
那么server里面的的sl是什么呢?
47 // Server is our main struct.
48 type Server struct {
55 sl *Sublist
看字面意思像是一个列表,在来看Sublist的定义,在server/sublist.go里面:
40 // A Sublist stores and efficiently retrieves subscriptions.
41 type Sublist struct {
42 sync.RWMutex
43 genid uint64
44 matches uint64
45 cacheHits uint64
46 inserts uint64
47 removes uint64
48 cache map[string]*SublistResult
49 root *level
50 count uint32
51 }
实际上这个 Sublist 是一个按照"."做为分割的消息订阅树。NATS规定,消息主题是可以用"."做分割的,并可以通过通配符进行订阅,这里就通过树形结构来 组织这些主题,并把订阅内容记录在叶子节点中。
订阅消息树
这里可以将订阅消息树看作成是一个单链表,链表结构为 level,在server/sublist.go中:
53 // A node contains subscriptions and a pointer to the next level.
54 type node struct {
55 next *level
56 psubs []*subscription // 当前节点包含的主题
57 qsubs [][]*subscription // 当前节点包含的主题队列
58 }
62 type level struct {
63 nodes map[string]*node // 当前分级上所有的节点
64 pwc, fwc *node // pwc: *通配符 fwc: >通配符
65 }
真正表示一个订阅主题是这里的subscription,他定义在server/client.go中:
161 type subscription struct {
162 client *client // 表示哪个客户端
163 subject []byte // 订阅的主题
164 queue []byte // 所在队列
165 sid []byte // 订阅的ID
166 nm int64
167 max int64
168 }
相关成员的含义已经在注释中标出。实际上subscription的存储结构是这样的:

最上面是Sublist里面的 rootlevel。每个Level包含了一个node的Map,key为这个node表示的subscription的主题,以及另外两个表示带有通配符 "*":pwc节点和">":fwc节点。
比如图中包含了四个主题,分别是"TopicOne"、"TopicTwo"、 "TopicThree”, 因为不含通配符,所以这几个Node就都在RootLevel的nodes 字典内。
而在Node里面保存了每个具体的订阅subscription,这里对于"TopicOne"那就是主题为"TopicOne"的subscription,存储在node的psubs这个成员中。 每当有clinet订阅"TopicOne"的时候就会在“TopicOne"的Node的psubs数组中增加一个subscription,里面记录了是哪个clinet订阅的。
对于"TopicTwo",假设要订阅“TopicTwo.abc”这样的主题,会先生成“TopicTwo.abc” 这样的node,并存在“TopicTwo”的节点Level字典nodes中。这样就可以 从Root链接到"TopicTwo"再链接到"TopicTwo.abc"。然后将订阅的内容subscription放入“TopicTwo.abc” node的psubs数组中。
而当订阅带有通配符""或者">"时,比如这里订阅"TopicThree.>",则是创建一个node:"TopicThree.>"将其放入"TopicThree"节点Level的fwc节点中表示 ">"的通配符。而订阅如 "TopicThree..abc" 则先创建一个node:"TopicThree."将其放入"TopicThree"节点Level的pwc节点中表示 ""的通配符,然后再在个节点Level的nodes中增加一个"TopicThree..abc" 节点,表示具体的主题节点,然后将订阅的内容subscription放入“TopicThree..abc” node的psubs数组中。
通过这样构建的树形结构,就可以将订阅主题,按照"."进行分割后,依次找到最后的叶子节点,并将订阅的subscription插入到叶子节点的psubs数组中。
插入订阅
我们来看插入逻辑:
83 // Insert adds a subscription into the sublist
84 func (s *Sublist) Insert(sub *subscription) error {
85 // copy the subject since we hold this and this might be part of a large byte slice.
首先对主题做"." 的分割:
86 subject := string(sub.subject)
87 tsa := [32]string{}
88 tokens := tsa[:0]
89 start := 0
90 for i := 0; i < len(subject); i++ {
91 if subject[i] == btsep {
92 tokens = append(tokens, subject[start:i])
93 start = i + 1
94 }
95 }
96 tokens = append(tokens, subject[start:])
然后对每个层级进行检索,按照上面的算法,该创建节点创建节点,需要连接节点Level,创建节点Level。
104 for _, t := range tokens {
105 if len(t) == 0 || sfwc {
106 s.Unlock()
107 return ErrInvalidSubject
108 }
109
110 switch t[0] {
111 case pwc:
112 n = l.pwc
113 case fwc:
114 n = l.fwc
115 sfwc = true
116 default:
117 n = l.nodes[t]
118 }
119 if n == nil {
120 n = newNode()
121 switch t[0] {
122 case pwc:
123 l.pwc = n
124 case fwc:
125 l.fwc = n
126 default:
127 l.nodes[t] = n
128 }
129 }
130 if n.next == nil {
131 n.next = newLevel()
132 }
133 l = n.next
134 }
最后将订阅的subscription插入到qsubs数组中, 135 if sub.queue == nil { 136 n.psubs = append(n.psubs, sub) 137 } else { 138 // This is a queue subscription 139 if i := findQSliceForSub(sub, n.qsubs); i >= 0 { 140 n.qsubs[i] = append(n.qsubs[i], sub) 141 } else { 142 n.qsubs = append(n.qsubs, []*subscription{sub}) 143 } 144 }
这里有个队列订阅,如果是的话,放入psubs数组,否则放入qsubs二维数组,这里用个二维数组主要是为后面的pub发布消息寻找降低 运算复杂度,否则这里就需要一个字典来实现了。
通过上面的步骤就完成了一课订阅树的创建和增加了。
删除订阅
来看SubList的删除函数,首先还是分割主题,按照路径找到相关的node:
312 func (s *Sublist) Remove(sub *subscription) error {
313 subject := string(sub.subject)
314 tsa := [32]string{}
315 tokens := tsa[:0]
316 start := 0
317 for i := 0; i < len(subject); i++ {
318 if subject[i] == btsep {
319 tokens = append(tokens, subject[start:i])
320 start = i + 1
321 }
322 }
323 tokens = append(tokens, subject[start:])
然后将要处理的节点标记出来:
352 if n != nil {
353 levels = append(levels, lnt{l, n, t})
354 l = n.next
355 } else {
356 l = nil
357 }
然后将这个订阅subscription从相应的node上删除:
359 if !s.removeFromNode(n, sub) {
360 return ErrNotFound
361 }
回头再处理上面记录的路径level:
366 for i := len(levels) - 1; i >= 0; i-- {
367 l, n, t := levels[i].l, levels[i].n, levels[i].t
368 if n.isEmpty() {
369 l.pruneNode(n, t)
370 }
371 }
这里pruneNode就是插入node的时候的反操作,如果是pwc/fwc直接赋nil,否则从list里面删除。
删除部分的代码基本是插入操作的反操作,根据上面代码就可以知道大概意思,不用再赘述。
总结
订阅消息树实际上是一种查找树,或者是路径树。通过逐级查找,找到最终的叶子节点,然后将订阅的内容存入到叶子节点中。这样当有消息发布的时候, 只要按照主题找到这些叶子节点中记录的订阅客户端,就可以给相应的客户端发送消息了。
Last updated
Was this helpful?