From 1d6c58074f40dcaa169b94590564363dba265456 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 22 Jul 2019 17:01:03 -0700 Subject: [PATCH 1/2] Fix for #1065 (leaked subscribers from dq subs across routes) Signed-off-by: Derek Collison --- server/accounts.go | 1 + server/client.go | 9 +- server/route.go | 150 +++++++++++++++--------------- server/server.go | 1 + test/new_routes_test.go | 198 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 277 insertions(+), 82 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 86a39d97..574b13c7 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -50,6 +50,7 @@ type Account struct { nrleafs int32 clients map[*client]*client rm map[string]int32 + lqws map[string]int32 lleafs []*client imports importMap exports exportMap diff --git a/server/client.go b/server/client.go index 358fe34c..a575a275 100644 --- a/server/client.go +++ b/server/client.go @@ -954,13 +954,13 @@ func (c *client) flushOutbound() bool { } if sce { atomic.AddInt64(&srv.slowConsumers, 1) - c.clearConnection(SlowConsumerWriteDeadline) c.Noticef("Slow Consumer Detected: WriteDeadline of %v exceeded with %d chunks of %d total bytes.", c.out.wdl, len(cnb), attempted) + c.clearConnection(SlowConsumerWriteDeadline) } } else { - c.clearConnection(WriteError) c.Debugf("Error flushing: %v", err) + c.clearConnection(WriteError) } return true } @@ -1343,9 +1343,9 @@ func (c *client) queueOutbound(data []byte) bool { // Check for slow consumer via pending bytes limit. // ok to return here, client is going away. if c.out.pb > c.out.mp { - c.clearConnection(SlowConsumerPendingBytes) atomic.AddInt64(&c.srv.slowConsumers, 1) c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp) + c.clearConnection(SlowConsumerPendingBytes) return referenced } @@ -2913,7 +2913,8 @@ func (c *client) closeConnection(reason ClosedState) { // and reference existing one. 
var subs []*subscription if kind == CLIENT || kind == LEAF { - subs = make([]*subscription, 0, len(c.subs)) + var _subs [32]*subscription + subs = _subs[:0] for _, sub := range c.subs { // Auto-unsubscribe subscriptions must be unsubscribed forcibly. sub.max = 0 diff --git a/server/route.go b/server/route.go index 512e5629..6eaadbd4 100644 --- a/server/route.go +++ b/server/route.go @@ -712,11 +712,8 @@ func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) { c.in.subs++ args := splitArg(arg) - var ( - accountName string - subject []byte - queue []byte - ) + var queue []byte + switch len(args) { case 2: case 3: @@ -724,9 +721,7 @@ func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) { default: return "", nil, nil, fmt.Errorf("parse error: '%s'", arg) } - subject = args[1] - accountName = string(args[0]) - return accountName, subject, queue, nil + return string(args[0]), args[1], queue, nil } // Indicates no more interest in the given account/subject for the remote side. @@ -944,10 +939,7 @@ func (s *Server) sendSubsToRoute(route *client) { } a.mu.RUnlock() - closed = route.sendRouteSubProtos(subs, false, func(sub *subscription) bool { - return route.canImport(string(sub.subject)) - }) - + closed = route.sendRouteSubProtos(subs, false, route.importFilter) if closed { route.mu.Unlock() return @@ -1305,71 +1297,72 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { return !exists, sendInfo } +// Import filter check. +func (c *client) importFilter(sub *subscription) bool { + return c.canImport(string(sub.subject)) +} + // updateRouteSubscriptionMap will make sure to update the route map for the subscription. Will // also forward to all routes if needed. func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, delta int32) { if acc == nil || sub == nil { return } - acc.mu.RLock() - rm := acc.rm - acc.mu.RUnlock() - - // This is non-nil when we know we are in cluster mode. 
- if rm == nil { - return - } // We only store state on local subs for transmission across all other routes. if sub.client == nil || (sub.client.kind != CLIENT && sub.client.kind != SYSTEM && sub.client.kind != LEAF) { return } - // Create the fast key which will use the subject or 'subjectqueue' for queue subscribers. - var ( - _rkey [1024]byte - key []byte - update bool - ) - if sub.queue != nil { - // Just make the key subject spc group, e.g. 'foo bar' - key = _rkey[:0] - key = append(key, sub.subject...) - key = append(key, byte(' ')) - key = append(key, sub.queue...) - // We always update for a queue subscriber since we need to send our relative weight. - update = true - } else { - key = sub.subject - } - // Copy to hold outside acc lock. var n int32 var ok bool + // Create the fast key which will use the subject or 'subjectqueue' for queue subscribers. + key := keyFromSub(sub) + isq := len(sub.queue) > 0 + + // Decide whether we need to send an update out to all the routes. + update := isq + acc.mu.Lock() - if n, ok = rm[string(key)]; ok { + + // This is non-nil when we know we are in cluster mode. + rm, lqws := acc.rm, acc.lqws + if rm == nil { + acc.mu.Unlock() + return + } + + // This is where we do update to account. For queues we need to take + // special care that this order of updates is same as what is sent out + // over routes. + if n, ok = rm[key]; ok { n += delta if n <= 0 { - delete(rm, string(key)) + delete(rm, key) + if isq { + delete(lqws, key) + } update = true // Update for deleting (N->0) } else { - rm[string(key)] = n + rm[key] = n } } else if delta > 0 { n = delta - rm[string(key)] = delta + rm[key] = delta update = true // Adding a new entry for normal sub means update (0->1) } + acc.mu.Unlock() if !update { return } - // We need to send out this update. - // If we are sending a queue sub, copy and place in the queue weight. - if sub.queue != nil { + // If we are sending a queue sub, make a copy and place in the queue weight. 
+ // FIXME(dlc) - We can be smarter here and avoid copying and acquiring the lock. + if isq { sub.client.mu.Lock() nsub := *sub sub.client.mu.Unlock() @@ -1377,45 +1370,46 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del sub = &nsub } - // Note that queue unsubs where entry.n > 0 are still - // subscribes with a smaller weight. - if n > 0 { - s.broadcastSubscribe(sub) - } else { - s.broadcastUnSubscribe(sub) - } -} + // We need to send out this update. Gather routes + var _routes [32]*client + routes := _routes[:0] -// broadcastSubscribe will forward a client subscription -// to all active routes as needed. -func (s *Server) broadcastSubscribe(sub *subscription) { - trace := atomic.LoadInt32(&s.logging.trace) == 1 s.mu.Lock() - subs := []*subscription{sub} for _, route := range s.routes { + routes = append(routes, route) + } + trace := atomic.LoadInt32(&s.logging.trace) == 1 + s.mu.Unlock() + + // If we are a queue subscriber we need to make sure our updates are serialized from + // poyential multiple connections. We want to make sure that the order above is preserved + // here but not necessarily all updates need to be sent. We need to block and recheck the + // n count with the lock held through sending here. We will suppress duplicate sends of same qw. + if isq { + acc.mu.Lock() + defer acc.mu.Unlock() + n = rm[key] + sub.qw = n + // Check the last sent weight here. If same, then someone beat us to it and we can + // just return. Otherwise record it, but only while n > 0: at n == 0 the key was already deleted from lqws above and must not be re-added (it would never be removed again). + if ls, ok := lqws[key]; ok && ls == n { + return + } else if n > 0 { + lqws[key] = n + } + } + + // Snapshot into array + subs := []*subscription{sub} + + // Deliver to all routes. + for _, route := range routes { route.mu.Lock() - route.sendRouteSubProtos(subs, trace, func(sub *subscription) bool { - return route.canImport(string(sub.subject)) - }) + // Note that queue unsubs where n > 0 are still + // subscribes with a smaller weight.
+ route.sendRouteSubOrUnSubProtos(subs, n > 0, trace, route.importFilter) route.mu.Unlock() } - s.mu.Unlock() -} - -// broadcastUnSubscribe will forward a client unsubscribe -// action to all active routes. -func (s *Server) broadcastUnSubscribe(sub *subscription) { - trace := atomic.LoadInt32(&s.logging.trace) == 1 - s.mu.Lock() - subs := []*subscription{sub} - for _, route := range s.routes { - route.mu.Lock() - route.sendRouteUnSubProtos(subs, trace, func(sub *subscription) bool { - return route.canImport(string(sub.subject)) - }) - route.mu.Unlock() - } - s.mu.Unlock() } func (s *Server) routeAcceptLoop(ch chan struct{}) { diff --git a/server/server.go b/server/server.go index 90b6b518..e45450de 100644 --- a/server/server.go +++ b/server/server.go @@ -785,6 +785,7 @@ func (s *Server) registerAccount(acc *Account) { // TODO(dlc)- Double check that we need this for GWs. if acc.rm == nil && s.opts != nil && s.shouldTrackSubscriptions() { acc.rm = make(map[string]int32) + acc.lqws = make(map[string]int32) } acc.srv = s acc.mu.Unlock() diff --git a/test/new_routes_test.go b/test/new_routes_test.go index 218b05c4..8231ab48 100644 --- a/test/new_routes_test.go +++ b/test/new_routes_test.go @@ -14,17 +14,21 @@ package test import ( + "context" "encoding/json" "fmt" "net" "net/url" "runtime" + "strconv" + "sync" "testing" "time" "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" + "github.com/nats-io/nuid" ) func runNewRouteServer(t *testing.T) (*server.Server, *server.Options) { @@ -1651,3 +1655,197 @@ func TestLargeClusterMem(t *testing.T) { s.Shutdown() } } + +func TestClusterLeaksSubscriptions(t *testing.T) { + srvA, srvB, optsA, optsB := runServers(t) + defer srvA.Shutdown() + defer srvB.Shutdown() + + checkClusterFormed(t, srvA, srvB) + + urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port) + urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port) + + numResponses := 100 + repliers := 
make([]*nats.Conn, 0, numResponses) + + // Create 100 repliers + for i := 0; i < 50; i++ { + nc1, _ := nats.Connect(urlA) + nc2, _ := nats.Connect(urlB) + repliers = append(repliers, nc1, nc2) + nc1.Subscribe("test.reply", func(m *nats.Msg) { + m.Respond([]byte("{\"sender\": 22 }")) + }) + nc2.Subscribe("test.reply", func(m *nats.Msg) { + m.Respond([]byte("{\"sender\": 33 }")) + }) + nc1.Flush() + nc2.Flush() + } + + servers := fmt.Sprintf("%s, %s", urlA, urlB) + req := sizedBytes(8 * 1024) + + // Now run a requestor in a loop, creating and tearing down each time to + // simulate running a modified nats-req. + doReq := func() { + msgs := make(chan *nats.Msg, 1) + inbox := nats.NewInbox() + grp := nuid.Next() + // Create 8 queue Subscribers for responses. + for i := 0; i < 8; i++ { + nc, _ := nats.Connect(servers) + nc.ChanQueueSubscribe(inbox, grp, msgs) + nc.Flush() + defer nc.Close() + } + nc, _ := nats.Connect(servers) + nc.PublishRequest("test.reply", inbox, req) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + var received int + for { + select { + case <-msgs: + received++ + if received >= numResponses { + return + } + case <-ctx.Done(): + return + } + } + } + + var wg sync.WaitGroup + + doRequests := func(n int) { + for i := 0; i < n; i++ { + doReq() + } + wg.Done() + } + + concurrent := 10 + wg.Add(concurrent) + for i := 0; i < concurrent; i++ { + go doRequests(10) + } + wg.Wait() + + // Close responders too, should have zero(0) subs attached to routes. + for _, nc := range repliers { + nc.Close() + } + + // Make sure no clients remain. This is to make sure the test is correct and that + // we have closed all the client connections. 
+ checkFor(t, time.Second, 10*time.Millisecond, func() error { + v1, _ := srvA.Varz(nil) + v2, _ := srvB.Varz(nil) + if v1.Connections != 0 || v2.Connections != 0 { + return fmt.Errorf("We have lingering client connections %d:%d", v1.Connections, v2.Connections) + } + return nil + }) + + loadRoutez := func() (*server.Routez, *server.Routez) { + v1, err := srvA.Routez(&server.RoutezOptions{Subscriptions: true}) + if err != nil { + t.Fatalf("Error getting Routez: %v", err) + } + v2, err := srvB.Routez(&server.RoutezOptions{Subscriptions: true}) + if err != nil { + t.Fatalf("Error getting Routez: %v", err) + } + return v1, v2 + } + + checkFor(t, time.Second, 10*time.Millisecond, func() error { + r1, r2 := loadRoutez() + if r1.Routes[0].NumSubs != 0 { + return fmt.Errorf("Leaked %d subs: %+v", r1.Routes[0].NumSubs, r1.Routes[0].Subs) + } + if r2.Routes[0].NumSubs != 0 { + return fmt.Errorf("Leaked %d subs: %+v", r2.Routes[0].NumSubs, r2.Routes[0].Subs) + } + return nil + }) +} + +// Make sure we have the correct remote state when dealing with queue subscribers +// across many client connections. 
+func TestQueueSubWeightOrderMultipleConnections(t *testing.T) { + s, opts := runNewRouteServer(t) + defer s.Shutdown() + + // Create 100 connections to s + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + clients := make([]*nats.Conn, 0, 100) + for i := 0; i < 100; i++ { + nc, err := nats.Connect(url, nats.NoReconnect()) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + defer nc.Close() + clients = append(clients, nc) + } + + rc := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port) + defer rc.Close() + + routeID := "RTEST_NEW:22" + routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID) + + info := checkInfoMsg(t, rc) + + info.ID = routeID + b, err := json.Marshal(info) + if err != nil { + t.Fatalf("Could not marshal test route info: %v", err) + } + routeSend(fmt.Sprintf("INFO %s\r\n", b)) + + start := make(chan bool) + for _, nc := range clients { + go func(nc *nats.Conn) { + <-start + // Now create 100 identical queue subscribers on each connection. + for i := 0; i < 100; i++ { + if _, err := nc.QueueSubscribeSync("foo", "bar"); err != nil { + return + } + } + nc.Flush() + }(nc) + } + close(start) + + // We did have this where we wanted to get every update, but now with optimizations + // we just want to make sure we always are increasing and that a previous update to + // a lesser queue weight is never delivered for this test. + max_expected := 10000 + for qw := 0; qw < max_expected; { + buf := routeExpect(rsubRe) + matches := rsubRe.FindAllSubmatch(buf, -1) + for _, m := range matches { + if len(m) != 5 { + t.Fatalf("Expected a weight for the queue group") + } + nqw, err := strconv.Atoi(string(m[4])) + if err != nil { + t.Fatalf("Got an error converting queue weight: %v", err) + } + // Make sure the new value only increases, ok to skip since we will + // optimize this now, but needs to always be increasing. 
+ if nqw <= qw { + t.Fatalf("Was expecting increasing queue weight after %d, got %d", qw, nqw) + } + qw = nqw + } + } +} From df29be11ed0e9bb88d13140d9575caf83ed9d1d8 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 22 Jul 2019 18:37:40 -0700 Subject: [PATCH 2/2] Changes based on PR comments Signed-off-by: Derek Collison --- server/route.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/server/route.go b/server/route.go index 6eaadbd4..fdff9bc4 100644 --- a/server/route.go +++ b/server/route.go @@ -1318,13 +1318,6 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del var n int32 var ok bool - // Create the fast key which will use the subject or 'subjectqueue' for queue subscribers. - key := keyFromSub(sub) - isq := len(sub.queue) > 0 - - // Decide whether we need to send an update out to all the routes. - update := isq - acc.mu.Lock() // This is non-nil when we know we are in cluster mode. @@ -1334,6 +1327,13 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del return } + // Create the fast key which will use the subject or 'subjectqueue' for queue subscribers. + key := keyFromSub(sub) + isq := len(sub.queue) > 0 + + // Decide whether we need to send an update out to all the routes. + update := isq + // This is where we do update to account. For queues we need to take // special care that this order of updates is same as what is sent out // over routes. @@ -1382,7 +1382,7 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del s.mu.Unlock() // If we are a queue subscriber we need to make sure our updates are serialized from - // poyential multiple connections. We want to make sure that the order above is preserved + // potential multiple connections. We want to make sure that the order above is preserved // here but not necessarily all updates need to be sent. 
We need to block and recheck the // n count with the lock held through sending here. We will suppress duplicate sends of same qw. if isq {