mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Revert race on interest graph since it could cause dropped interest propogation, fix test instead
This commit is contained in:
@@ -550,6 +550,7 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
// race conditions. We should make sure that we process only one.
|
||||
sid := string(sub.sid)
|
||||
if c.subs[sid] == nil {
|
||||
c.subs[sid] = sub
|
||||
if c.srv != nil {
|
||||
err = c.srv.sl.Insert(sub)
|
||||
if err != nil {
|
||||
@@ -570,11 +571,6 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
c.srv.broadcastSubscribe(sub)
|
||||
}
|
||||
|
||||
// We add it to local client map here to avoid race with new routers and sendLocalSubsToRoute().
|
||||
c.mu.Lock()
|
||||
c.subs[sid] = sub
|
||||
c.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -36,7 +36,6 @@ type route struct {
|
||||
url *url.URL
|
||||
authRequired bool
|
||||
tlsRequired bool
|
||||
didSubs bool
|
||||
}
|
||||
|
||||
type connectInfo struct {
|
||||
@@ -249,7 +248,6 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
|
||||
s.mu.Unlock()
|
||||
|
||||
route.mu.Lock()
|
||||
route.route.didSubs = true
|
||||
defer route.mu.Unlock()
|
||||
route.bw.Write(b.Bytes())
|
||||
route.bw.Flush()
|
||||
@@ -472,7 +470,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
|
||||
return !exists, sendInfo
|
||||
}
|
||||
|
||||
func (s *Server) broadcastToRoutes(proto string) {
|
||||
func (s *Server) broadcastInterestToRoutes(proto string) {
|
||||
var arg []byte
|
||||
if atomic.LoadInt32(&trace) == 1 {
|
||||
arg = []byte(proto[:len(proto)-LEN_CR_LF])
|
||||
@@ -481,10 +479,8 @@ func (s *Server) broadcastToRoutes(proto string) {
|
||||
for _, route := range s.routes {
|
||||
// FIXME(dlc) - Make same logic as deliverMsg
|
||||
route.mu.Lock()
|
||||
if route.route.didSubs {
|
||||
route.bw.WriteString(proto)
|
||||
route.bw.Flush()
|
||||
}
|
||||
route.bw.WriteString(proto)
|
||||
route.bw.Flush()
|
||||
route.mu.Unlock()
|
||||
route.traceOutOp("", arg)
|
||||
}
|
||||
@@ -499,7 +495,7 @@ func (s *Server) broadcastSubscribe(sub *subscription) {
|
||||
}
|
||||
rsid := routeSid(sub)
|
||||
proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid)
|
||||
s.broadcastToRoutes(proto)
|
||||
s.broadcastInterestToRoutes(proto)
|
||||
}
|
||||
|
||||
// broadcastUnSubscribe will forward a client unsubscribe
|
||||
@@ -515,7 +511,7 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
|
||||
maxStr = fmt.Sprintf(" %d", sub.max)
|
||||
}
|
||||
proto := fmt.Sprintf(unsubProto, rsid, maxStr)
|
||||
s.broadcastToRoutes(proto)
|
||||
s.broadcastInterestToRoutes(proto)
|
||||
}
|
||||
|
||||
func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
|
||||
@@ -159,9 +159,12 @@ func TestSendRouteSubAndUnsub(t *testing.T) {
|
||||
defer rc.Close()
|
||||
|
||||
expectAuthRequired(t, rc)
|
||||
routeSend, _ := setupRouteEx(t, rc, opts, "ROUTER:xyz")
|
||||
routeSend, routeExpect := setupRouteEx(t, rc, opts, "ROUTER:xyz")
|
||||
routeSend("INFO {\"server_id\":\"ROUTER:xyz\"}\r\n")
|
||||
|
||||
routeSend("PING\r\n")
|
||||
routeExpect(pongRe)
|
||||
|
||||
// Send SUB via client connection
|
||||
send("SUB foo 22\r\n")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user