0X08:Router转发

NATS源码学习系列文章基于gnatsd1.0.0。该版本于2017年7月13 日发布(Release v1.0.0),在此之前v0.9.6是2016年12月 16日发布的,中间隔了半年。算是一个比较完备的版本,但是这个版本还没有增加集群支持。为什么选择这个版本呢? 因为一来这个版本比较稳定,同时也包含了集群管理和Stream 落地相关的逻辑,相对完善。

每篇的引用里面都有说NATS支持集群,那么是怎么支持的呢?我们来看配置文档

里面有:

# Cluster definition
cluster {

  listen: localhost:4244 # host/port for inbound route connections

  # Authorization for route connections
  authorization {
    user: route_user
    # ./util/mkpasswd -p T0pS3cr3tT00!
        password: $2a$11$xH8dkGrty1cBNtZjhPeWJewu/YPbSU.rXJWmS6SFilOBXzmZoMk9m
    timeout: 0.5
  }

  # Routes are actively solicited and connected to from this server.
  # Other servers can connect to us if they supply the correct credentials
  # in their routes definitions from above.
  routes = [
    nats-route://user1:pass1@127.0.0.1:4245
    nats-route://user2:pass2@127.0.0.1:4246
  ]
}

这里看到,首先gnatsd需要新监听一个IP:Port : listen 字段。然后还指定了 routes。这里的routes是这个gnatsd可以连的服务器,而listen是这个gnatsd可以被 其他服务器连的地址。因此就产生了这样的结构:

图中是个环行结构,其实可以自由组合,本质就是一个gnatsd连到另一个gnatsd上。这样,即使client_1连在gnatsd_3上面,也可以将消息发布到 连在gnatsd_1上的client_2上面,只要他们订阅的消息主题是一致的。

作为服务器端

因为gnatsd在这个时候既要接受其他gnatsd的链接,也要链接其他gnatsd,所以我们先来看作为服务器时,如何处理链接。

服务器回起一个接受gnatsd来链接的goroutine:

601 func (s *Server) routeAcceptLoop(ch chan struct{}) {
...
658     for s.isRunning() {
659         conn, err := l.Accept()
...
675         s.startGoRoutine(func() {
676             s.createRoute(conn, nil)
677             s.grWG.Done()
678         })
679     }    

这里创建Route的实现为:

312 func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
...
r := &route{didSolicit: didSolicit}
...
c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r}
...
336     // Initialize
337     c.initClient()    
...
409     // Spin up the read loop.
410     s.startGoRoutine(func() { c.readLoop() })    ...
424     // Send our info to the other side.
425     c.sendInfo(infoJSON)    

这里可以看到和处理客户端连接是一样的。也是先创建一个client,然后初始化,然后起client的readLoop goroutine,最后发送INFO给客户端。

作为客户端

作为客户端时,首先要连接其他Router:

 57 // Lock should be held entering here.
 58 func (c *client) sendConnect(tlsRequired bool) {
 ...
 78     c.sendProto([]byte(fmt.Sprintf(ConProto, b)), true)
 79 }

这里直接组合一个CONNECT协议,进行发送。

我们在NATS 开源学习——0X04:协议解析有说到在gnatsd解析INFO的实际上是调用了"server/router.go"里面的:

 81 // Process the info message if we are a route.
 82 func (c *client) processRouteInfo(info *Info) {

这里会记录其他Router的信息,用于后面的转发。

转发消息

订阅

在gnatsd处理客户端的SUB的消息里面有:

 750 func (c *client) processSub(argo []byte) (err error) {
 751     c.traceInOp("SUB", argo)
 ...
 812     if shouldForward {
 813         c.srv.broadcastSubscribe(sub)
 814     }

这里如果有连接到其他router,就要进行srv.broadcastSubscribe,其会调用broadcastInterestToRoutes在server/router.go里面:

556 func (s *Server) broadcastInterestToRoutes(proto string) {

561     protoAsBytes := []byte(proto)
562     s.mu.Lock()
563     for _, route := range s.routes {

566         route.sendProto(protoAsBytes, true)

569     }
571 }

这里就是向各个Router发送了SUB。其他Router收到这个SUB后,也会在自己的SubList里面构建相关的subscription。

发布

在gnatsd处理客户端的PUB的消息里面有:

1012 // processMsg is called to process an inbound msg from a client.
1013 func (c *client) processMsg(msg []byte) {
...
1146     for _, sub := range r.psubs {
1147         // Check if this is a send to a ROUTER, make sure we only send it
1148         // once. The other side will handle the appropriate re-processing
1149         // and fan-out. Also enforce 1-Hop semantics, so no routing to another.
1150         if sub.client.typ == ROUTER {    ...
1176         mh := c.msgHeader(msgh[:si], sub)
1177         c.deliverMsg(sub, mh, msg)

这里如果是要转发给router,过滤下不重复发送。也就是消息会直接转发给其他连接到自己的router 客户端。

转发

上面转发给其他router时,实际上是MSG协议,到目前我们还没有分析过对MSG协议的处理,我们来看server/client.go里面:

 623 func (c *client) processMsgArgs(arg []byte) error {
 624     if c.trace {
 625         c.traceInOp("MSG", arg)
 626     }

是用来处理MSG协议,其解析和其他的一样。这里面的逻辑其实和processPub是一样的,就是解析参数,然后做opt赋值,结束。然后等着他后面处理 MSG协议里面的payload。也是调用处理PUB一样的processMsg函数。这就会到上面的发布里面的代码了。就是给自己的 r.psubs里面注册的客户端 进行”c.deliverMsg(sub, mh, msg)“ MSG发送。

总结

这里来捋一下。首先client_2注册SUB时,gnatsd_1会调用broadcastInterestToRoutes给gnatsd_3发SUB,此时gnatsd_3的SubList里面就记录了客户端gantsd_1 订阅了这个主题。当client1发送订阅SUB时,gnatsd_3会给gnatsd_1发送MSG,而gnatsd_1会找出自己SubList里面订阅了这个消息的客户端,并给他发送MSG,也就是 会给client_2发送gnatsd_3传过来的MSG,既client_1发送的PUB。

Last updated

Was this helpful?