diff --git a/go.mod b/go.mod index 906df0b3..156ee860 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/klauspost/compress v1.16.5 github.com/minio/highwayhash v1.0.2 github.com/nats-io/jwt/v2 v2.4.1 - github.com/nats-io/nats.go v1.24.0 + github.com/nats-io/nats.go v1.26.0 github.com/nats-io/nkeys v0.4.4 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.5.1 diff --git a/go.sum b/go.sum index a0ed6701..6988df07 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats.go v1.24.0 h1:CRiD8L5GOQu/DcfkmgBcTTIQORMwizF+rPk6T0RaHVQ= -github.com/nats-io/nats.go v1.24.0/go.mod h1:dVQF+BK3SzUZpwyzHedXsvH3EO38aVKuOPkkHlv5hXA= +github.com/nats-io/nats.go v1.26.0 h1:fWJTYPnZ8DzxIaqIHOAMfColuznchnd5Ab5dbJpgPIE= +github.com/nats-io/nats.go v1.26.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/server/accounts.go b/server/accounts.go index 6ec0d8c7..5d1ba450 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1372,7 +1372,7 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool } sl.RequestStart = time.Unix(0, si.ts-int64(reqRTT)).UTC() sl.ServiceLatency = serviceRTT - respRTT - sl.TotalLatency = sl.Requestor.RTT + serviceRTT + sl.TotalLatency = reqRTT + serviceRTT if respRTT > 0 { sl.SystemLatency = time.Since(ts) sl.TotalLatency += sl.SystemLatency @@ -3813,10 +3813,11 @@ func (ur *URLAccResolver) Fetch(name string) (string, error) { return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), err) } else if resp == nil { return _EMPTY_, fmt.Errorf("could not fetch <%q>: no response", redactURLString(url)) - } else if resp.StatusCode != http.StatusOK { - return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), resp.Status) } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), resp.Status) + } body, err := io.ReadAll(resp.Body) if err != nil { return _EMPTY_, err diff --git a/server/client.go b/server/client.go index e851cce6..17c4e148 100644 --- a/server/client.go +++ b/server/client.go @@ -2138,7 +2138,9 @@ func (c *client) authViolation() { ErrAuthentication.Error(), c.opts.Username) } else { - c.Errorf(ErrAuthentication.Error()) + if c.srv != nil { + c.Errorf(ErrAuthentication.Error()) + } } if c.isMqtt() { c.mqttEnqueueConnAck(mqttConnAckRCNotAuthorized, false) diff --git a/server/consumer.go b/server/consumer.go index 4df9c271..5e658c42 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1482,9 +1482,10 @@ func (o *consumer) deleteNotActive() { defer ticker.Stop() for range ticker.C { js.mu.RLock() - ca := js.consumerAssignment(acc, stream, name) + nca := js.consumerAssignment(acc, stream, name) js.mu.RUnlock() - if ca != nil { + // Make sure this is not a new consumer with the same name. + if nca != nil && nca == ca { s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) meta.ForwardProposal(removeEntry) } else { diff --git a/server/ipqueue.go b/server/ipqueue.go index 8519043b..b26a749e 100644 --- a/server/ipqueue.go +++ b/server/ipqueue.go @@ -104,6 +104,9 @@ func (q *ipQueue[T]) push(e T) int { // emptied the queue. So the caller should never assume that pop() will // return a slice of 1 or more, it could return `nil`. func (q *ipQueue[T]) pop() []T { + if q == nil { + return nil + } var elts []T q.Lock() if q.pos == 0 { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 58d87fa8..c891455d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3108,7 +3108,9 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { accStreams = make(map[string]*streamAssignment) } else if osa := accStreams[stream]; osa != nil && osa != sa { // Copy over private existing state from former SA. - sa.Group.node = osa.Group.node + if sa.Group != nil { + sa.Group.node = osa.Group.node + } sa.consumers = osa.consumers sa.responded = osa.responded sa.err = osa.err @@ -3199,7 +3201,9 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { } // Copy over private existing state from former SA. - sa.Group.node = osa.Group.node + if sa.Group != nil { + sa.Group.node = osa.Group.node + } sa.consumers = osa.consumers sa.err = osa.err @@ -3217,7 +3221,9 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { sa.responded = false } else { // Make sure to clean up any old node in case this stream moves back here. - sa.Group.node = nil + if sa.Group != nil { + sa.Group.node = nil + } } js.mu.Unlock() @@ -3790,7 +3796,9 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { } else if oca := sa.consumers[ca.Name]; oca != nil { wasExisting = true // Copy over private existing state from former SA. - ca.Group.node = oca.Group.node + if ca.Group != nil { + ca.Group.node = oca.Group.node + } ca.responded = oca.responded ca.err = oca.err } @@ -3905,8 +3913,12 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) { var needDelete bool if accStreams := cc.streams[ca.Client.serviceAccount()]; accStreams != nil { if sa := accStreams[ca.Stream]; sa != nil && sa.consumers != nil && sa.consumers[ca.Name] != nil { - needDelete = true - delete(sa.consumers, ca.Name) + oca := sa.consumers[ca.Name] + // Make sure this removal is for what we have, otherwise ignore. + if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name { + needDelete = true + delete(sa.consumers, ca.Name) + } } } js.mu.Unlock() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 9979c9b6..da34a7a3 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -27,6 +27,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "testing" "time" @@ -4398,3 +4399,99 @@ func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) { si.State.FirstSeq, si.State.LastSeq) } } + +func TestJetStreamClusterConsumerCleanupWithSameName(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + // Client based API + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "UPDATES", + Subjects: []string{"DEVICE.*"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Create a consumer that will be an R1 that we will auto-recreate but using the same name. + // We want to make sure that the system does not continually try to cleanup the new one from the old one. + + // Track the sequence for restart etc. + var seq atomic.Uint64 + + msgCB := func(msg *nats.Msg) { + msg.AckSync() + meta, err := msg.Metadata() + require_NoError(t, err) + seq.Store(meta.Sequence.Stream) + } + + waitOnSeqDelivered := func(expected uint64) { + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { + received := seq.Load() + if received == expected { + return nil + } + return fmt.Errorf("Seq is %d, want %d", received, expected) + }) + } + + doSub := func() { + _, err = js.Subscribe( + "DEVICE.22", + msgCB, + nats.ConsumerName("dlc"), + nats.SkipConsumerLookup(), + nats.StartSequence(seq.Load()+1), + nats.MaxAckPending(1), // One at a time. + nats.ManualAck(), + nats.ConsumerReplicas(1), + nats.ConsumerMemoryStorage(), + nats.MaxDeliver(1), + nats.InactiveThreshold(time.Second), + nats.IdleHeartbeat(250*time.Millisecond), + ) + require_NoError(t, err) + } + + // Track any errors for consumer not active so we can recreate the consumer. + errCh := make(chan error, 10) + nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { + if errors.Is(err, nats.ErrConsumerNotActive) { + s.Unsubscribe() + errCh <- err + doSub() + } + }) + + doSub() + + sendStreamMsg(t, nc, "DEVICE.22", "update-1") + sendStreamMsg(t, nc, "DEVICE.22", "update-2") + sendStreamMsg(t, nc, "DEVICE.22", "update-3") + waitOnSeqDelivered(3) + + // Shutdown the consumer's leader. + s := c.consumerLeader(globalAccountName, "UPDATES", "dlc") + s.Shutdown() + c.waitOnStreamLeader(globalAccountName, "UPDATES") + + // In case our client connection was to the same server. + nc, _ = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + sendStreamMsg(t, nc, "DEVICE.22", "update-4") + sendStreamMsg(t, nc, "DEVICE.22", "update-5") + sendStreamMsg(t, nc, "DEVICE.22", "update-6") + + // Wait for the consumer not active error. + <-errCh + // Now restart server with the old consumer. + c.restartServer(s) + // Wait on all messages delivered. + waitOnSeqDelivered(6) + // Make sure no other errors showed up + require_True(t, len(errCh) == 0) +} diff --git a/server/leafnode.go b/server/leafnode.go index 3ca6384b..5a60c433 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1626,7 +1626,7 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c } // If we have a specified JetStream domain we will want to add a mapping to // allow access cross domain for each non-system account. - if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && opts.JetStream { + if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc { for src, dest := range generateJSMappingTable(opts.JetStreamDomain) { if err := acc.AddMapping(src, dest); err != nil { c.Debugf("Error adding JetStream domain mapping: %s", err.Error()) diff --git a/server/route.go b/server/route.go index f4123d2a..a1921a74 100644 --- a/server/route.go +++ b/server/route.go @@ -2625,12 +2625,12 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error c.closeConnection(WrongGateway) return ErrWrongGateway } - var perms *RoutePermissions - //TODO this check indicates srv may be nil. see srv usage below - if srv != nil { - perms = srv.getOpts().Cluster.Permissions + + if srv == nil { + return ErrServerNotRunning } + perms := srv.getOpts().Cluster.Permissions clusterName := srv.ClusterName() // If we have a cluster name set, make sure it matches ours. diff --git a/server/server.go b/server/server.go index 942fbbd7..a0eead46 100644 --- a/server/server.go +++ b/server/server.go @@ -3908,7 +3908,10 @@ func (s *Server) lameDuckMode() { numClients := int64(len(s.clients)) batch := 1 // Sleep interval between each client connection close. - si := dur / numClients + var si int64 + if numClients != 0 { + si = dur / numClients + } if si < 1 { // Should not happen (except in test with very small LD duration), but // if there are too many clients, batch the number of close and