0X03:Client服务
NATS源码学习系列文章基于gnatsd1.0.0。该版本于2017年7月13 日发布(Release v1.0.0),在此之前v0.9.6是2016年12月 16日发布的,中间隔了半年。算是一个比较完备的版本,但是这个版本还没有增加集群支持。为什么选择这个版本呢? 因为一来这个版本比较稳定,同时也包含了集群管理和Stream 落地相关的逻辑,相对完善。
在前面的文章NATS 开源学习——0X02:Server构造中说到,Server在接受到一个客户端的TCP 连接后,创建一个Client对象。Client对象是承载单个客户端连接逻辑的对象。Client会开启一个readloop,读消息,然后处理消息
readloop
readloop的逻辑大概是:
257 func (c *client) readLoop() {
...
276 for {
277 n, err := nc.Read(b)
...
289
290 if err := c.parse(b[:n]); err != nil {
...
298 }
}
...
这里,首先从连接中读取TCP流中的数据,然后调用client.parse 函数对读取到的内容做解析。这里解析其实也是包含了处理逻辑。
client.parse函数的定义在"server/parse.go" 文件中,在后面协议解析部分的文章我们在详细说明。这里只要知道,parse里面会对 这个读取到的buffer做解析并做相关处理。
这里可能会问到,如果一次Read没有读取完整的消息会怎么办呢?这时看下client的定义:
87 type client struct {
...
111 parseState
...
118 }
17 type parseState struct {
18 state int
19 as int
20 drop int
21 pa pubArg
22 argBuf []byte
23 msgBuf []byte
24 scratch [MAX_CONTROL_LINE_SIZE]byte
25 }
这里client继承了协议解析状态机状态"parseState"。因此可以将这里的readloop想象成一个对流处理的处理器:

逐个字节的处理,每读入一个自己,根据当前的协议状态判断下一步如何处理,继续读取下一个字节解析还是已经完成读入一个完整的message,进行 消息的处理。
Flush
在gnatsd的1.4.x中 除了上面的readLoop之外 是还有writeLoop的,用来将要发送给客户端的数据写到网络IO,就是读IO有个groutine,写IO也有个groutine。 但是在我们看的这个版本的gnatsd中,服务一个客户端连接的只有一个groutine(参考NATS 开源学习——0X02:Server构造中的时序图)

把读消息,处理消息,写消息都融合在了readLoop里面。在上面的readloop里面有:
307 for cp := range c.pcd {
308 // Flush those in the set
310 if cp.nc != nil {
...
316 cp.nc.SetWriteDeadline(time.Now().Add(opts.WriteDeadline))
317 err := cp.bw.Flush()
318 cp.nc.SetWriteDeadline(time.Time{})
319 if err != nil {
...
324 } else {
325 // Update outbound last activity.
326 cp.last = last
327 // Check if we should tune the buffer.
328 sz := cp.bw.Available()
329 // Check for expansion opportunity.
330 if wfc > 2 && sz <= maxBufSize/2 {
331 cp.bw = bufio.NewWriterSize(cp.nc, sz*2)
332 }
333 // Check for shrinking opportunity.
334 if wfc == 0 && sz >= minBufSize*2 {
335 cp.bw = bufio.NewWriterSize(cp.nc, sz/2)
336 }
337 }
338 }
339 cp.mu.Unlock()
340 delete(c.pcd, cp)
341 }
在处理发布消息的时候,就会调用 client.deliverMsg将其他的client挂在这个c.pcd里面:
87 type client struct {
...
104 pcd map[*client]struct{}
...
}
然后在每次loop里面,会将要处理的订阅消息发送给这里挂的其他订阅了的客户端。
关于订阅处理,会在后面的NATS 开源学习——0X06:发布消息中介绍。这里介绍的是client是如何处理消息的接受、处理和发送。
总结
通过readloop,Client完成了消息的读取、解析、处理和发送。一个client就是一个goroutine。多个client之间互不影响,无需异步IO操作,逻辑容易理解。 这也就是groutine带来的高效和高性能吧。
Last updated
Was this helpful?