From 27a8b96ee3bea5e63e855cb8265d196f61c568bb Mon Sep 17 00:00:00 2001 From: Artem Seleznev Date: Fri, 2 Jun 2023 13:19:22 +0300 Subject: [PATCH 1/5] different panic fixes Signed-off-by: Artem Seleznev --- server/accounts.go | 7 ++++--- server/client.go | 4 +++- server/ipqueue.go | 3 +++ server/jetstream_cluster.go | 16 ++++++++++++---- server/leafnode.go | 2 +- server/server.go | 5 ++++- server/stream.go | 8 ++++++-- 7 files changed, 33 insertions(+), 12 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 8cd21e12..09a6b96b 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1429,7 +1429,7 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool } sl.RequestStart = time.Unix(0, si.ts-int64(reqRTT)).UTC() sl.ServiceLatency = serviceRTT - respRTT - sl.TotalLatency = sl.Requestor.RTT + serviceRTT + sl.TotalLatency = reqRTT + serviceRTT if respRTT > 0 { sl.SystemLatency = time.Since(ts) sl.TotalLatency += sl.SystemLatency @@ -3784,10 +3784,11 @@ func (ur *URLAccResolver) Fetch(name string) (string, error) { return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), err) } else if resp == nil { return _EMPTY_, fmt.Errorf("could not fetch <%q>: no response", redactURLString(url)) - } else if resp.StatusCode != http.StatusOK { - return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), resp.Status) } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), resp.Status) + } body, err := io.ReadAll(resp.Body) if err != nil { return _EMPTY_, err diff --git a/server/client.go b/server/client.go index 48337db6..6586a5db 100644 --- a/server/client.go +++ b/server/client.go @@ -1989,7 +1989,9 @@ func (c *client) authViolation() { ErrAuthentication.Error(), c.opts.Username) } else { - c.Errorf(ErrAuthentication.Error()) + if c.srv != nil { + c.Errorf(ErrAuthentication.Error()) + } } if c.isMqtt() { 
c.mqttEnqueueConnAck(mqttConnAckRCNotAuthorized, false) diff --git a/server/ipqueue.go b/server/ipqueue.go index 8519043b..b26a749e 100644 --- a/server/ipqueue.go +++ b/server/ipqueue.go @@ -104,6 +104,9 @@ func (q *ipQueue[T]) push(e T) int { // emptied the queue. So the caller should never assume that pop() will // return a slice of 1 or more, it could return `nil`. func (q *ipQueue[T]) pop() []T { + if q == nil { + return nil + } var elts []T q.Lock() if q.pos == 0 { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 035f8658..4fdbcfc8 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3107,7 +3107,9 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { accStreams = make(map[string]*streamAssignment) } else if osa := accStreams[stream]; osa != nil && osa != sa { // Copy over private existing state from former SA. - sa.Group.node = osa.Group.node + if sa.Group != nil { + sa.Group.node = osa.Group.node + } sa.consumers = osa.consumers sa.responded = osa.responded sa.err = osa.err @@ -3198,7 +3200,9 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { } // Copy over private existing state from former SA. - sa.Group.node = osa.Group.node + if sa.Group != nil { + sa.Group.node = osa.Group.node + } sa.consumers = osa.consumers sa.err = osa.err @@ -3216,7 +3220,9 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { sa.responded = false } else { // Make sure to clean up any old node in case this stream moves back here. - sa.Group.node = nil + if sa.Group != nil { + sa.Group.node = nil + } } js.mu.Unlock() @@ -3784,7 +3790,9 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { } else if oca := sa.consumers[ca.Name]; oca != nil { wasExisting = true // Copy over private existing state from former SA. 
- ca.Group.node = oca.Group.node + if ca.Group != nil { + ca.Group.node = oca.Group.node + } ca.responded = oca.responded ca.err = oca.err } diff --git a/server/leafnode.go b/server/leafnode.go index 2c727adf..05d94db2 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1422,7 +1422,7 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c } // If we have a specified JetStream domain we will want to add a mapping to // allow access cross domain for each non-system account. - if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && opts.JetStream { + if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc { for src, dest := range generateJSMappingTable(opts.JetStreamDomain) { if err := acc.AddMapping(src, dest); err != nil { c.Debugf("Error adding JetStream domain mapping: %s", err.Error()) diff --git a/server/server.go b/server/server.go index e9bc025d..ec623990 100644 --- a/server/server.go +++ b/server/server.go @@ -3560,7 +3560,10 @@ func (s *Server) lameDuckMode() { numClients := int64(len(s.clients)) batch := 1 // Sleep interval between each client connection close. - si := dur / numClients + var si int64 + if numClients != 0 { + si = dur / numClients + } if si < 1 { // Should not happen (except in test with very small LD duration), but // if there are too many clients, batch the number of close and diff --git a/server/stream.go b/server/stream.go index 9a3355d7..6789796f 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1788,8 +1788,12 @@ func gatherSourceMirrorSubjects(subjects []string, cfg *StreamConfig, acc *Accou // Return the subjects for a stream source. 
func (a *Account) streamSourceSubjects(ss *StreamSource, seen map[string]bool) (subjects []string, hasExt bool) { - if ss != nil && ss.External != nil { - return nil, true + if ss == nil { + return nil, false + } else { + if ss.External != nil { + return nil, true + } } s, js, _ := a.getJetStreamFromAccount() From f71c49511bb6b02fdbd25fcd0b0ddc1a3794449a Mon Sep 17 00:00:00 2001 From: Nikita Mochalov Date: Mon, 5 Jun 2023 15:27:45 +0300 Subject: [PATCH 2/5] Fix client panic on absent server field --- server/route.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/server/route.go b/server/route.go index 9f9dba7e..379bd90c 100644 --- a/server/route.go +++ b/server/route.go @@ -17,6 +17,7 @@ import ( "bytes" "crypto/tls" "encoding/json" + "errors" "fmt" "math/rand" "net" @@ -1949,12 +1950,12 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error c.closeConnection(WrongGateway) return ErrWrongGateway } - var perms *RoutePermissions - //TODO this check indicates srv may be nil. see srv usage below - if srv != nil { - perms = srv.getOpts().Cluster.Permissions + + if srv == nil { + return errors.New("server is not set") } + perms := srv.getOpts().Cluster.Permissions clusterName := srv.ClusterName() // If we have a cluster name set, make sure it matches ours. 
From 4c181bc99ac49247c5c1bee12f28b026d3c5a942 Mon Sep 17 00:00:00 2001 From: Nikita Mochalov Date: Mon, 5 Jun 2023 22:41:09 +0300 Subject: [PATCH 3/5] Use sentinel error --- server/route.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/route.go b/server/route.go index 379bd90c..90da6b1e 100644 --- a/server/route.go +++ b/server/route.go @@ -17,7 +17,6 @@ import ( "bytes" "crypto/tls" "encoding/json" - "errors" "fmt" "math/rand" "net" @@ -1952,7 +1951,7 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error } if srv == nil { - return errors.New("server is not set") + return ErrServerNotRunning } perms := srv.getOpts().Cluster.Permissions From 5141b87dff34858b66f03bfa9c417a5dbe1e1753 Mon Sep 17 00:00:00 2001 From: Nikita Mochalov Date: Mon, 5 Jun 2023 22:42:28 +0300 Subject: [PATCH 4/5] Refactor code --- server/stream.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/stream.go b/server/stream.go index 6789796f..e9b58891 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1790,10 +1790,8 @@ func gatherSourceMirrorSubjects(subjects []string, cfg *StreamConfig, acc *Accou func (a *Account) streamSourceSubjects(ss *StreamSource, seen map[string]bool) (subjects []string, hasExt bool) { if ss == nil { return nil, false - } else { - if ss.External != nil { - return nil, true - } + } else if ss.External != nil { + return nil, true } s, js, _ := a.getJetStreamFromAccount() From 4ac45ff6f34db6643d1f44add606f50978bcc2d0 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 5 Jun 2023 12:48:18 -0700 Subject: [PATCH 5/5] When consumers were R1 and the same name was reused, server restarts could try to clean up old ones and affect the new ones. These changes allow consumer name reuse more effectively during server restarts. 
Signed-off-by: Derek Collison --- go.mod | 2 +- go.sum | 4 +- server/consumer.go | 5 +- server/jetstream_cluster.go | 8 ++- server/jetstream_cluster_3_test.go | 97 ++++++++++++++++++++++++++++++ 5 files changed, 109 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 906df0b3..156ee860 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/klauspost/compress v1.16.5 github.com/minio/highwayhash v1.0.2 github.com/nats-io/jwt/v2 v2.4.1 - github.com/nats-io/nats.go v1.24.0 + github.com/nats-io/nats.go v1.26.0 github.com/nats-io/nkeys v0.4.4 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.5.1 diff --git a/go.sum b/go.sum index a0ed6701..6988df07 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats.go v1.24.0 h1:CRiD8L5GOQu/DcfkmgBcTTIQORMwizF+rPk6T0RaHVQ= -github.com/nats-io/nats.go v1.24.0/go.mod h1:dVQF+BK3SzUZpwyzHedXsvH3EO38aVKuOPkkHlv5hXA= +github.com/nats-io/nats.go v1.26.0 h1:fWJTYPnZ8DzxIaqIHOAMfColuznchnd5Ab5dbJpgPIE= +github.com/nats-io/nats.go v1.26.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/server/consumer.go b/server/consumer.go index e5165896..61589e96 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1417,9 +1417,10 @@ func (o *consumer) deleteNotActive() { defer ticker.Stop() for range ticker.C { js.mu.RLock() - ca := js.consumerAssignment(acc, stream, name) + nca := 
js.consumerAssignment(acc, stream, name) js.mu.RUnlock() - if ca != nil { + // Make sure this is not a new consumer with the same name. + if nca != nil && nca == ca { s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) meta.ForwardProposal(removeEntry) } else { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 7ebec4ba..9cfb3cc0 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3902,8 +3902,12 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) { var needDelete bool if accStreams := cc.streams[ca.Client.serviceAccount()]; accStreams != nil { if sa := accStreams[ca.Stream]; sa != nil && sa.consumers != nil && sa.consumers[ca.Name] != nil { - needDelete = true - delete(sa.consumers, ca.Name) + oca := sa.consumers[ca.Name] + // Make sure this removal is for what we have, otherwise ignore. + if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name { + needDelete = true + delete(sa.consumers, ca.Name) + } } } js.mu.Unlock() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 3e4cd530..3922309c 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -27,6 +27,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "testing" "time" @@ -4398,3 +4399,99 @@ func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) { si.State.FirstSeq, si.State.LastSeq) } } + +func TestJetStreamClusterConsumerCleanupWithSameName(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + // Client based API + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "UPDATES", + Subjects: []string{"DEVICE.*"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Create a consumer that will be an R1 that we will auto-recreate but using the same name. 
+ // We want to make sure that the system does not continually try to cleanup the new one from the old one. + + // Track the sequence for restart etc. + var seq atomic.Uint64 + + msgCB := func(msg *nats.Msg) { + msg.AckSync() + meta, err := msg.Metadata() + require_NoError(t, err) + seq.Store(meta.Sequence.Stream) + } + + waitOnSeqDelivered := func(expected uint64) { + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { + received := seq.Load() + if received == expected { + return nil + } + return fmt.Errorf("Seq is %d, want %d", received, expected) + }) + } + + doSub := func() { + _, err = js.Subscribe( + "DEVICE.22", + msgCB, + nats.ConsumerName("dlc"), + nats.SkipConsumerLookup(), + nats.StartSequence(seq.Load()+1), + nats.MaxAckPending(1), // One at a time. + nats.ManualAck(), + nats.ConsumerReplicas(1), + nats.ConsumerMemoryStorage(), + nats.MaxDeliver(1), + nats.InactiveThreshold(time.Second), + nats.IdleHeartbeat(250*time.Millisecond), + ) + require_NoError(t, err) + } + + // Track any errors for consumer not active so we can recreate the consumer. + errCh := make(chan error, 10) + nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { + if errors.Is(err, nats.ErrConsumerNotActive) { + s.Unsubscribe() + errCh <- err + doSub() + } + }) + + doSub() + + sendStreamMsg(t, nc, "DEVICE.22", "update-1") + sendStreamMsg(t, nc, "DEVICE.22", "update-2") + sendStreamMsg(t, nc, "DEVICE.22", "update-3") + waitOnSeqDelivered(3) + + // Shutdown the consumer's leader. + s := c.consumerLeader(globalAccountName, "UPDATES", "dlc") + s.Shutdown() + c.waitOnStreamLeader(globalAccountName, "UPDATES") + + // In case our client connection was to the same server. + nc, _ = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + sendStreamMsg(t, nc, "DEVICE.22", "update-4") + sendStreamMsg(t, nc, "DEVICE.22", "update-5") + sendStreamMsg(t, nc, "DEVICE.22", "update-6") + + // Wait for the consumer not active error. 
+ <-errCh + // Now restart server with the old consumer. + c.restartServer(s) + // Wait on all messages delivered. + waitOnSeqDelivered(6) + // Make sure no other errors showed up + require_True(t, len(errCh) == 0) +}