From 4f333416bb6bad2350c4b9fc57f4da2e5f3652f7 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 15 Apr 2016 15:46:29 -0700 Subject: [PATCH] Revert race on interest graph since it could cause dropped interest propogation, fix test instead --- server/client.go | 6 +----- server/route.go | 14 +++++--------- test/routes_test.go | 5 ++++- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/server/client.go b/server/client.go index f5764174..8957d561 100644 --- a/server/client.go +++ b/server/client.go @@ -550,6 +550,7 @@ func (c *client) processSub(argo []byte) (err error) { // race conditions. We should make sure that we process only one. sid := string(sub.sid) if c.subs[sid] == nil { + c.subs[sid] = sub if c.srv != nil { err = c.srv.sl.Insert(sub) if err != nil { @@ -570,11 +571,6 @@ func (c *client) processSub(argo []byte) (err error) { c.srv.broadcastSubscribe(sub) } - // We add it to local client map here to avoid race with new routers and sendLocalSubsToRoute(). - c.mu.Lock() - c.subs[sid] = sub - c.mu.Unlock() - return nil } diff --git a/server/route.go b/server/route.go index 52279f02..c2a34467 100644 --- a/server/route.go +++ b/server/route.go @@ -36,7 +36,6 @@ type route struct { url *url.URL authRequired bool tlsRequired bool - didSubs bool } type connectInfo struct { @@ -249,7 +248,6 @@ func (s *Server) sendLocalSubsToRoute(route *client) { s.mu.Unlock() route.mu.Lock() - route.route.didSubs = true defer route.mu.Unlock() route.bw.Write(b.Bytes()) route.bw.Flush() @@ -472,7 +470,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { return !exists, sendInfo } -func (s *Server) broadcastToRoutes(proto string) { +func (s *Server) broadcastInterestToRoutes(proto string) { var arg []byte if atomic.LoadInt32(&trace) == 1 { arg = []byte(proto[:len(proto)-LEN_CR_LF]) @@ -481,10 +479,8 @@ func (s *Server) broadcastToRoutes(proto string) { for _, route := range s.routes { // FIXME(dlc) - Make same logic as deliverMsg route.mu.Lock() - if route.route.didSubs { - route.bw.WriteString(proto) - route.bw.Flush() - } + route.bw.WriteString(proto) + route.bw.Flush() route.mu.Unlock() route.traceOutOp("", arg) } @@ -499,7 +495,7 @@ func (s *Server) broadcastSubscribe(sub *subscription) { } rsid := routeSid(sub) proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid) - s.broadcastToRoutes(proto) + s.broadcastInterestToRoutes(proto) } // broadcastUnSubscribe will forward a client unsubscribe @@ -515,7 +511,7 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { maxStr = fmt.Sprintf(" %d", sub.max) } proto := fmt.Sprintf(unsubProto, rsid, maxStr) - s.broadcastToRoutes(proto) + s.broadcastInterestToRoutes(proto) } func (s *Server) routeAcceptLoop(ch chan struct{}) { diff --git a/test/routes_test.go b/test/routes_test.go index 3020562b..d6901a26 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -159,9 +159,12 @@ func TestSendRouteSubAndUnsub(t *testing.T) { defer rc.Close() expectAuthRequired(t, rc) - routeSend, _ := setupRouteEx(t, rc, opts, "ROUTER:xyz") + routeSend, routeExpect := setupRouteEx(t, rc, opts, "ROUTER:xyz") routeSend("INFO {\"server_id\":\"ROUTER:xyz\"}\r\n") + routeSend("PING\r\n") + routeExpect(pongRe) + // Send SUB via client connection send("SUB foo 22\r\n")