0X07:发布消息

NATS源码学习系列文章基于gnatsd1.0.0。该版本于2017年7月13 日发布(Release v1.0.0),在此之前v0.9.6是2016年12月 16日发布的,中间隔了半年。算是一个比较完备的版本,但是这个版本还没有增加集群支持。为什么选择这个版本呢? 因为一来这个版本比较稳定,同时也包含了集群管理和Stream 落地相关的逻辑,相对完善。

当其他客户端订阅完消息后,就可以进行消息发布了。看过前面的NATS 开源学习——0X05:订阅消息就会知道怎么样去解析一个订阅消息了, 发布消息也是一样。

协议解析

一条PUB协议长的类似下面这样:

PUB <subject> [reply-to] <#bytes>\r\n[payload]\r\n

和其他协议一样,首先解析出PUB,然后将后面的部分存入到argbuf中。

不过稍微注意点的同学可能会发现,这里有两个换行"\r\n",那么按照之前的解析,应该只能到<#bytes>这里,而后面的[payload]则没有解析。

是的,在解析PUB协议的时候,这里会将状态置为MSG_PAYLOAD,然后再接着往后读取,知道下一个换行。并将内容存储在c.msgBuf中:

189         case MSG_PAYLOAD:
190             if c.msgBuf != nil {
...
200                     c.msgBuf = c.msgBuf[:start+toCopy]
201                     copy(c.msgBuf[start:], buf[i:i+toCopy])
202                     // Update our index
203                     i = (i + toCopy) - 1
204                 } else {
205                     // Fall back to append if needed.
206                     c.msgBuf = append(c.msgBuf, b)
207                 }

...

同样在解析完上面的参数也就是PUB_ARG状态是会调用c.processPub,而下面的消息读完后会调用c.processMsg处理消息。

处理参数

参数处理在server/client.go里面的:

函数中,这里这个版本写的有问题,其实应该和SUB一样调用一下SplitArg,但是这里是把这个函数的代码重写了:

split完后,因为有可选参数,所以也是做个兼容处理。

得到参数后,就对参数进行合法校验,因为这里已经有了发布的主题和发布内容的长度:

超长的非法的主题都返回失败。

处理消息

处理完参数后,开始处理发布的消息内容,代码在server/client.go的:

这里首先会禁止发送"_SYS.>" 的主题的消息,然后做权限检查:

检查逻辑和SUB里面的类似。

然后会优先去到订阅cache列表中,查看是否有缓存,如果没有则去svr.sl里面按照主题进行匹配:

如果找到的话,同时也更新到cache中。然后准备对这些搜寻到的客户端进行消息转发

消息转发

消息转发是通过协议MSG进行的,MSG消息大概是这样:

所以转发前先多消息进行组合:

这里组合一个消息头,也就是第一个换行"\r\n"前面的内容。

然后依次对每个客户端进行消息发送:

这里 deliverMsg为:

这里的 bw是:

也就是bufio.Writer进行写数据。

看到这里,可能有同学会联想到前面的时序图,那就是gnatsd在处理客户端是,读入消息,然后处理,然后向多个客户端进行写,万一某个客户端,在Write 的时候塞住了,那么对这个客户端后续的消息处理就可能会延迟。

是的,在这个版本中,处理逻辑确实是这样。

总结

发布消息的过程,其实就是接受客户端发布的消息内容,然后根据主题找到订阅了这个主题的所有客户端,根据需要将消息写往这些客户端。 在这个版本(v1.0.0)写操作是直接调用的io的flush,而在v1.4.x里面,这里做了一个缓存,有另外一个专门的goroutine进行flush操作。

Last updated

Was this helpful?