0X05:订阅消息

NATS源码学习系列文章基于gnatsd1.0.0。该版本于2017年7月13 日发布(Release v1.0.0),在此之前v0.9.6是2016年12月 16日发布的,中间隔了半年。算是一个比较完备的版本,但是这个版本还没有增加集群支持。为什么选择这个版本呢? 因为一来这个版本比较稳定,同时也包含了集群管理和Stream 落地相关的逻辑,相对完善。

NATS总共有10条协议:

NATS 开源学习——0X04:协议解析中已经介绍了其中的6个,但是这些还不是最主要的逻辑,属于边角料。既然是MQ,最重要的当然是SUB/PUB了,这里我们先来看 订阅协议。

SUB协议解析

一条SUB协议大概是这样的:

首先是SUB关键字,然后跟在后面是订阅的主题以及一个可选订阅组,再后面是客户端生成的订阅ID,最后换行结束。

在读取消息的时候,根据NATS 开源学习——0X04:协议解析的介绍,先解析出SUB关键字,然后开始处理参数。这次参数不再是JSON格式,所以需要 根据自定义的协议进行解析:

调用splitArg进行解析,其实就是将上面的格式,按照空格进行分割,得到上面说的几个部分。相关代码可以再splitArg的定义中找到,不在列出, 这里其实是可以用strings.Split进行替换的,没必要自己去实现一个,只是需要注意下'\r','\n'。

同时这里因为订阅组是可选的内容,所以

如果没有则设置为nil。

订阅权限

在客户端连接的时候会先注册其订阅权限,processConnect里面有

这里会调用:

然后会注册这个客户端到订阅的列表中:

插入过程会先构建一个subscription对象:

其中permission的定义为:

实际上就是一个保持了订阅主题和发布主题的列表。

然后在订阅处理中检查权限,判断客户端是否可以进行订阅操作:

”c.canSubscribe“就是检查上面的permissions的允许列表。

订阅处理

在解析了参数后会创建一个subscription订阅对象:

然后对router转发做一个过滤,比如两个router转发了同一个客户端的请求,此时订阅ID是相同的:

只对第一个进行处理,并将其存入client里面的保存这个客户端订阅的字典中:

接着会将这个subscription插入一个订阅到列表中:

关于插入的逻辑和内部存储的数据结构,在后面的NATS 开源学习——0X06:消息存储结构中会详细说明

UNSUB取消订阅

而如果要取消订阅则使用UNSUB协议:

这里 sid为之前订阅的时传入的主题ID,而max_msgs为可选的参数,表示退订前最大可接受的消息条数。

和前面解析SUB时一样,这里也是先解析出UNSUB命令字,然后对其后面的参数进行解析,这里也不是JSON,而是空格分割的两个参数:

因为 max是个可选的参数,所以根据分割参数后的数目来进行决定赋值。

然后根据这里的sid找到当时订阅产生的subscription,并执行c.unsubscribe操作:

而unsubscribe的逻辑主要是 :

删除SUB时插入到client.subs字典里面的subscription,并从SubList中删除这个subscription。 关于删除的逻辑和内部存储的数据结构,在后面的NATS 开源学习——0X06:消息存储结构中会详细说明

总结

订阅消息的过程,首先是解析订阅协议,并产生一个subscription,让然后将其记录并存入到订阅列表中,而取消订阅则是相反的操作,分别从 client的记录和订阅列表中删除这个subscription。在订阅之前还会进行权限检查,以支持权限系统。

Last updated

Was this helpful?