NATS源码学习系列文章基于gnatsd1.0.0 。该版本于2017年7月13 日发布(Release v1.0.0 ),在此之前v0.9.6是2016年12月 16日发布的,中间隔了半年。算是一个比较完备的版本,但是这个版本还没有增加集群支持。为什么选择这个版本呢? 因为一来这个版本比较稳定,同时也包含了集群管理和Stream 落地相关的逻辑,相对完善。
目录结构
抛开第一级目录的其他文件,现在开始聚焦到server这个目录来:
Copy server cz$ find . -name "*\.go" |grep -v test | xargs wc -l | sort -d -k 1
12 ./pse/pse_solaris.go
13 ./pse/pse_rumprun.go
16 ./service.go
23 ./pse/pse_darwin.go
36 ./errors.go
50 ./monitor_sort_opts.go
55 ./ciphersuites_1.5.go
56 ./util.go
64 ./ciphersuites_1.8.go
72 ./pse/pse_freebsd.go
90 ./signal_windows.go
93 ./const.go
94 ./service_windows.go
115 ./pse/pse_linux.go
139 ./signal.go
163 ./log.go
190 ./auth.go
268 ./pse/pse_windows.go
527 ./monitor.go
640 ./sublist.go
648 ./reload.go
738 ./parser.go
762 ./route.go
895 ./opts.go
1047 ./server.go
1410 ./client.go
8216 total
抛开test文件,总共只有26个文件,8K代码。所以gnatsd核心还是比较简单的,休闲之余就可以将其代码通读一遍,跟着我们的文章走也很快。
创建Server对象
在server.go里面有:
Copy 100 // New will setup a new server struct after parsing the options.
101 func New(opts *Options) *Server {
...
123 s := &Server{
124 configFile: opts.ConfigFile,
125 info: info,
126 sl: NewSublist(),
127 opts: opts,
128 done: make(chan bool, 1),
129 start: now,
130 configTime: now,
131 }
...
157 return s
}
创建Server的时候,用选项opts和配置文件opts.ConfigFile初始化一个Server对象,Server为:
Copy 47 // Server is our main struct.
48 type Server struct {
49 gcid uint64
50 grid uint64
51 stats
52 mu sync.Mutex
53 info Info
54 infoJSON []byte
55 sl *Sublist
56 configFile string
57 optsMu sync.RWMutex
58 opts *Options
59 running bool
60 shutdown bool
61 listener net.Listener
62 clients map[uint64]*client
63 routes map[uint64]*client
64 remotes map[string]*client
65 users map[string]*User
66 totalClients uint64
67 done chan bool
68 start time.Time
69 http net.Listener
70 httpHandler http.Handler
71 profiler net.Listener
72 httpReqStats map[string]uint64
73 routeListener net.Listener
74 routeInfo Info
75 routeInfoJSON []byte
76 rcQuit chan bool
77 grMu sync.Mutex
78 grTmpClients map[uint64]*client
79 grRunning bool
80 grWG sync.WaitGroup // to wait on various go routines
81 cproto int64 // number of clients supporting async INFO
82 configTime time.Time // last time config was loaded
83 logging struct {
84 sync.RWMutex
85 logger Logger
86 trace int32
87 debug int32
88 }
89 }
这里感性认识一下就好了,后面说到具体逻辑的时候回用到响应的成员。
运行Server
在main函数中,我们看到是通过:server.Run(s)
来启动Server的,他实际上在 services.go中:
Copy 6 // Run starts the NATS server. This wrapper function allows Windows to add a
7 // hook for running NATS as a service.
8 func Run(server *Server) error {
9 server.Start()
10 return nil
11 }
可以看到实际上是调用了server.Start函数,这里通过编译选项控制了下平台:
Copy 1 // +build !windows
因为在windows上需要对windows service做兼容,所以有个单独的文件”service_windows.go” 来实现相关逻辑,该跨平台操作主要通过 “golang.org/x/sys/windows/svc"来实现对"Windows service"的支持:
Copy 36 func (w *winServiceWrapper) Execute(args []string, changes <-chan svc.ChangeRequest,
37 status chan<- svc.Status) (bool, uint32) {
38
39 status <- svc.Status{State: svc.StartPending}
40 go w.server.Start()
不管哪个平台,其实最终都是Server对象的Start函数。
Server.Start()
先来看代码:
Copy 237 func (s *Server) Start() {
241 // Avoid RACE between Start() and Shutdown()
242 s.mu.Lock()
243 s.running = true
244 s.mu.Unlock()
245
246 s.grMu.Lock()
247 s.grRunning = true
248 s.grMu.Unlock()
....
259
260 // Start monitoring if needed
261 if err := s.StartMonitoring(); err != nil {
...
265
266 // The Routing routine needs to wait for the client listen
267 // port to be opened and potential ephemeral port selected.
268 clientListenReady := make(chan struct{})
270 // Start up routing as well if needed.
271 if opts.Cluster.Port != 0 {
272 s.startGoRoutine(func() {
273 s.StartRouting(clientListenReady)
274 })
275 }
276
277 // Pprof http endpoint for the profiler.
278 if opts.ProfPort != 0 {
279 s.StartProfiler()
280 }
281
282 // Wait for clients.
283 s.AcceptLoop(clientListenReady)
284 }
最开始的地方通过mutex控制,设置服务状态的标记位。
然后启动Monitor监控以及接受其他服务消息的Router服务,需要的话启动Profile。
这里看最后一步s.AcceptLoop
,这里想象普通的网络程序,这里开启了一个Loop来接受客户端的TCP链接。
AcceptLoop
来看代码:
Copy 370 // AcceptLoop is exported for easier testing.
371 func (s *Server) AcceptLoop(clr chan struct{}) {
...
430 for s.isRunning() {
431 conn, err := l.Accept()
432 if err != nil {
433 if ne, ok := err.(net.Error); ok && ne.Temporary() {
434 s.Debugf("Temporary Client Accept Error(%v), sleeping %dms",
435 ne, tmpDelay/time.Millisecond)
436 time.Sleep(tmpDelay)
437 tmpDelay *= 2
438 if tmpDelay > ACCEPT_MAX_SLEEP {
439 tmpDelay = ACCEPT_MAX_SLEEP
440 }
441 } else if s.isRunning() {
442 s.Noticef("Accept error: %v", err)
443 }
444 continue
445 }
446 tmpDelay = ACCEPT_MIN_SLEEP
447 s.startGoRoutine(func() {
448 s.createClient(conn)
449 s.grWG.Done()
450 })
451 }
452 s.Noticef("Server Exiting..")
453 s.done <- true
454 }
这里传入的clr,最终当循环退出时,会传递一个消息到channel中,通知启动Server.Start()的调用者,服务结束了。
而这里的:for s.isRunning() {
形成了真正的AcceptLoop等待客户端过来创建TCP链接。
每当Accept一条心链接后,开启一个goroutine用这个链接创建一个Client对象。
ReadLoop
创建client的代码是这样的:
Copy 642 func (s *Server) createClient(conn net.Conn) *client {
...
646 c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: int64(opts.MaxPayload), start: time.Now()}
...
658
659 // Initialize
660 c.initClient()
664 // Send our information.
665 c.sendInfo(info)
...
743 // Spin up the read loop.
744 s.startGoRoutine(func() { c.readLoop() })
...
}
首先创建一个client对象并将链接conn传个client。然后对client进行初始化,并向客户端发送INFO(想想在NATS 开源学习——0X00:协议 中介绍的协议) 。接着开启一个routinue执行client的readLoop。相关代码,等我们分析client的时候再展开,实际上就是从客户端读消息然后处理消息。
这块逻辑在1.4.x版本中有些不同,在新版本中还开启了一个writeLoop,用来flush缓存中的数据到客户端,这样做就可以对消息进行读写分离,并提高写的 效率。在1.0.0中还是读写在同一个goroutine里。
goroutine管理
看到上面的代码中,启动一个goroutine都是通过s.startGoRoutine
整个函数的:
Copy 987 func (s *Server) startGoRoutine(f func()) {
988 s.grMu.Lock()
989 if s.grRunning {
990 s.grWG.Add(1)
991 go f()
992 }
993 s.grMu.Unlock()
994 }
这里显示用一个锁来控制对s.grWG的修改,然后给waitgroup s.grWG做增1操作. 然后在调用时有:
Copy 447 s.startGoRoutine(func() {
448 s.createClient(conn)
449 s.grWG.Done()
450 })
也就是client链接断开是做waitgroup的Done操作。
在Server的ShutDown里面有
Copy 366 // Wait for go routines to be done.
367 s.grWG.Wait()
等待所有链接断开并回收groutine。
时序
现在我们再回过头来看整个时序关系。
就大概了解了:
服务Server先开一个AcceptLoop用来接收客户端TCP链接。
接收到一个客户端的链接后,启动一个ReadLoop来接收客户端发送过来的消息。
整个readloop负责收消息然后处理消息,直到退出。
服务Server通过WaitGroup来管理所有的客户端链接状况。
总结
这一篇,从Server入手,看一个服务进程是怎么起来的,然后分几个goroutine,分别是做什么的,主控服务怎么对服务每个客户端的goroutine进行管理的。 可以说是了解了服务的整体结构吧。