diff --git a/go.mod b/go.mod index 906df0b3..156ee860 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/klauspost/compress v1.16.5 github.com/minio/highwayhash v1.0.2 github.com/nats-io/jwt/v2 v2.4.1 - github.com/nats-io/nats.go v1.24.0 + github.com/nats-io/nats.go v1.26.0 github.com/nats-io/nkeys v0.4.4 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.5.1 diff --git a/go.sum b/go.sum index a0ed6701..6988df07 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats.go v1.24.0 h1:CRiD8L5GOQu/DcfkmgBcTTIQORMwizF+rPk6T0RaHVQ= -github.com/nats-io/nats.go v1.24.0/go.mod h1:dVQF+BK3SzUZpwyzHedXsvH3EO38aVKuOPkkHlv5hXA= +github.com/nats-io/nats.go v1.26.0 h1:fWJTYPnZ8DzxIaqIHOAMfColuznchnd5Ab5dbJpgPIE= +github.com/nats-io/nats.go v1.26.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/server/consumer.go b/server/consumer.go index e5165896..61589e96 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1417,9 +1417,10 @@ func (o *consumer) deleteNotActive() { defer ticker.Stop() for range ticker.C { js.mu.RLock() - ca := js.consumerAssignment(acc, stream, name) + nca := js.consumerAssignment(acc, stream, name) js.mu.RUnlock() - if ca != nil { + // Make sure this is not a new consumer with the same name. 
+ if nca != nil && nca == ca { s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) meta.ForwardProposal(removeEntry) } else { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 7ebec4ba..9cfb3cc0 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3902,8 +3902,12 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) { var needDelete bool if accStreams := cc.streams[ca.Client.serviceAccount()]; accStreams != nil { if sa := accStreams[ca.Stream]; sa != nil && sa.consumers != nil && sa.consumers[ca.Name] != nil { - needDelete = true - delete(sa.consumers, ca.Name) + oca := sa.consumers[ca.Name] + // Make sure this removal is for what we have, otherwise ignore. + if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name { + needDelete = true + delete(sa.consumers, ca.Name) + } } } js.mu.Unlock() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 3e4cd530..3922309c 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -27,6 +27,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "testing" "time" @@ -4398,3 +4399,99 @@ func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) { si.State.FirstSeq, si.State.LastSeq) } } + +func TestJetStreamClusterConsumerCleanupWithSameName(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + // Client based API + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "UPDATES", + Subjects: []string{"DEVICE.*"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Create a consumer that will be an R1 that we will auto-recreate but using the same name. + // We want to make sure that the system does not continually try to cleanup the new one from the old one. + + // Track the sequence for restart etc. 
+ var seq atomic.Uint64 + + msgCB := func(msg *nats.Msg) { + msg.AckSync() + meta, err := msg.Metadata() + require_NoError(t, err) + seq.Store(meta.Sequence.Stream) + } + + waitOnSeqDelivered := func(expected uint64) { + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { + received := seq.Load() + if received == expected { + return nil + } + return fmt.Errorf("Seq is %d, want %d", received, expected) + }) + } + + doSub := func() { + _, err = js.Subscribe( + "DEVICE.22", + msgCB, + nats.ConsumerName("dlc"), + nats.SkipConsumerLookup(), + nats.StartSequence(seq.Load()+1), + nats.MaxAckPending(1), // One at a time. + nats.ManualAck(), + nats.ConsumerReplicas(1), + nats.ConsumerMemoryStorage(), + nats.MaxDeliver(1), + nats.InactiveThreshold(time.Second), + nats.IdleHeartbeat(250*time.Millisecond), + ) + require_NoError(t, err) + } + + // Track any errors for consumer not active so we can recreate the consumer. + errCh := make(chan error, 10) + nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { + if errors.Is(err, nats.ErrConsumerNotActive) { + s.Unsubscribe() + errCh <- err + doSub() + } + }) + + doSub() + + sendStreamMsg(t, nc, "DEVICE.22", "update-1") + sendStreamMsg(t, nc, "DEVICE.22", "update-2") + sendStreamMsg(t, nc, "DEVICE.22", "update-3") + waitOnSeqDelivered(3) + + // Shutdown the consumer's leader. + s := c.consumerLeader(globalAccountName, "UPDATES", "dlc") + s.Shutdown() + c.waitOnStreamLeader(globalAccountName, "UPDATES") + + // In case our client connection was to the same server. + nc, _ = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + sendStreamMsg(t, nc, "DEVICE.22", "update-4") + sendStreamMsg(t, nc, "DEVICE.22", "update-5") + sendStreamMsg(t, nc, "DEVICE.22", "update-6") + + // Wait for the consumer not active error. + <-errCh + // Now restart server with the old consumer. + c.restartServer(s) + // Wait on all messages delivered. 
+	waitOnSeqDelivered(6)
+	// Make sure no other errors showed up.
+	require_True(t, len(errCh) == 0)
+}