0X07:发布消息
NATS源码学习系列文章基于gnatsd1.0.0。该版本于2017年7月13 日发布(Release v1.0.0),在此之前v0.9.6是2016年12月 16日发布的,中间隔了半年。算是一个比较完备的版本,但是这个版本还没有增加集群支持。为什么选择这个版本呢? 因为一来这个版本比较稳定,同时也包含了集群管理和Stream 落地相关的逻辑,相对完善。
当其他客户端订阅完消息后,就可以进行消息发布了。看过前面的NATS 开源学习——0X05:订阅消息就会知道怎么样去解析一个订阅消息了, 发布消息也是一样。
协议解析
一条PUB协议长的类似下面这样:
PUB <subject> [reply-to] <#bytes>\r\n[payload]\r\n和其他协议一样,首先解析出PUB,然后将后面的部分存入到argbuf中。
不过稍微注意点的同学可能会发现,这里有两个换行"\r\n",那么按照之前的解析,应该只能到<#bytes>这里,而后面的[payload]则没有解析。
是的,在解析PUB协议的时候,这里会将状态置为MSG_PAYLOAD,然后再接着往后读取,知道下一个换行。并将内容存储在c.msgBuf中:
189 case MSG_PAYLOAD:
190 if c.msgBuf != nil {
...
200 c.msgBuf = c.msgBuf[:start+toCopy]
201 copy(c.msgBuf[start:], buf[i:i+toCopy])
202 // Update our index
203 i = (i + toCopy) - 1
204 } else {
205 // Fall back to append if needed.
206 c.msgBuf = append(c.msgBuf, b)
207 }...
同样在解析完上面的参数也就是PUB_ARG状态是会调用c.processPub,而下面的消息读完后会调用c.processMsg处理消息。
处理参数
参数处理在server/client.go里面的:
函数中,这里这个版本写的有问题,其实应该和SUB一样调用一下SplitArg,但是这里是把这个函数的代码重写了:
split完后,因为有可选参数,所以也是做个兼容处理。
得到参数后,就对参数进行合法校验,因为这里已经有了发布的主题和发布内容的长度:
超长的非法的主题都返回失败。
处理消息
处理完参数后,开始处理发布的消息内容,代码在server/client.go的:
这里首先会禁止发送"_SYS.>" 的主题的消息,然后做权限检查:
检查逻辑和SUB里面的类似。
然后会优先去到订阅cache列表中,查看是否有缓存,如果没有则去svr.sl里面按照主题进行匹配:
如果找到的话,同时也更新到cache中。然后准备对这些搜寻到的客户端进行消息转发
消息转发
消息转发是通过协议MSG进行的,MSG消息大概是这样:
所以转发前先多消息进行组合:
这里组合一个消息头,也就是第一个换行"\r\n"前面的内容。
然后依次对每个客户端进行消息发送:
这里 deliverMsg为:
这里的 bw是:
也就是bufio.Writer进行写数据。
看到这里,可能有同学会联想到前面的时序图,那就是gnatsd在处理客户端是,读入消息,然后处理,然后向多个客户端进行写,万一某个客户端,在Write 的时候塞住了,那么对这个客户端后续的消息处理就可能会延迟。
是的,在这个版本中,处理逻辑确实是这样。
总结
发布消息的过程,其实就是接受客户端发布的消息内容,然后根据主题找到订阅了这个主题的所有客户端,根据需要将消息写往这些客户端。 在这个版本(v1.0.0)写操作是直接调用的io的flush,而在v1.4.x里面,这里做了一个缓存,有另外一个专门的goroutine进行flush操作。
Last updated
Was this helpful?