From 4ac45ff6f34db6643d1f44add606f50978bcc2d0 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Mon, 5 Jun 2023 12:48:18 -0700
Subject: [PATCH] When consumers were R1 and the same name was reused, server
 restarts could try to clean up the old ones and affect the new ones. These
 changes allow consumer names to be reused reliably across server restarts.

Signed-off-by: Derek Collison
---
 go.mod                             |  2 +-
 go.sum                             |  4 +-
 server/consumer.go                 |  5 +-
 server/jetstream_cluster.go        |  8 ++-
 server/jetstream_cluster_3_test.go | 97 ++++++++++++++++++++++++++++++
 5 files changed, 109 insertions(+), 7 deletions(-)

diff --git a/go.mod b/go.mod
index 906df0b3..156ee860 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ require (
 	github.com/klauspost/compress v1.16.5
 	github.com/minio/highwayhash v1.0.2
 	github.com/nats-io/jwt/v2 v2.4.1
-	github.com/nats-io/nats.go v1.24.0
+	github.com/nats-io/nats.go v1.26.0
 	github.com/nats-io/nkeys v0.4.4
 	github.com/nats-io/nuid v1.0.1
 	go.uber.org/automaxprocs v1.5.1
diff --git a/go.sum b/go.sum
index a0ed6701..6988df07 100644
--- a/go.sum
+++ b/go.sum
@@ -15,8 +15,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
 github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
 github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
 github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
-github.com/nats-io/nats.go v1.24.0 h1:CRiD8L5GOQu/DcfkmgBcTTIQORMwizF+rPk6T0RaHVQ=
-github.com/nats-io/nats.go v1.24.0/go.mod h1:dVQF+BK3SzUZpwyzHedXsvH3EO38aVKuOPkkHlv5hXA=
+github.com/nats-io/nats.go v1.26.0 h1:fWJTYPnZ8DzxIaqIHOAMfColuznchnd5Ab5dbJpgPIE=
+github.com/nats-io/nats.go v1.26.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
 github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
 github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
 github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
diff --git a/server/consumer.go b/server/consumer.go
index e5165896..61589e96 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -1417,9 +1417,10 @@ func (o *consumer) deleteNotActive() {
 		defer ticker.Stop()
 		for range ticker.C {
 			js.mu.RLock()
-			ca := js.consumerAssignment(acc, stream, name)
+			nca := js.consumerAssignment(acc, stream, name)
 			js.mu.RUnlock()
-			if ca != nil {
+			// Make sure this is not a new consumer with the same name.
+			if nca != nil && nca == ca {
 				s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
 				meta.ForwardProposal(removeEntry)
 			} else {
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 7ebec4ba..9cfb3cc0 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -3902,8 +3902,12 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) {
 	var needDelete bool
 	if accStreams := cc.streams[ca.Client.serviceAccount()]; accStreams != nil {
 		if sa := accStreams[ca.Stream]; sa != nil && sa.consumers != nil && sa.consumers[ca.Name] != nil {
-			needDelete = true
-			delete(sa.consumers, ca.Name)
+			oca := sa.consumers[ca.Name]
+			// Make sure this removal is for what we have, otherwise ignore.
+			if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name {
+				needDelete = true
+				delete(sa.consumers, ca.Name)
+			}
 		}
 	}
 	js.mu.Unlock()
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 3e4cd530..3922309c 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -27,6 +27,7 @@ import (
 	"reflect"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -4398,3 +4399,99 @@ func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) {
 			si.State.FirstSeq, si.State.LastSeq)
 	}
 }
+
+func TestJetStreamClusterConsumerCleanupWithSameName(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3F", 3)
+	defer c.shutdown()
+
+	// Client based API
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "UPDATES",
+		Subjects: []string{"DEVICE.*"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	// Create a consumer that will be an R1 that we will auto-recreate, but using the same name.
+	// We want to make sure the system does not continually try to clean up the new one because of the old one.
+
+	// Track the sequence for restart etc.
+	var seq atomic.Uint64
+
+	msgCB := func(msg *nats.Msg) {
+		msg.AckSync()
+		meta, err := msg.Metadata()
+		require_NoError(t, err)
+		seq.Store(meta.Sequence.Stream)
+	}
+
+	waitOnSeqDelivered := func(expected uint64) {
+		checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
+			received := seq.Load()
+			if received == expected {
+				return nil
+			}
+			return fmt.Errorf("Seq is %d, want %d", received, expected)
+		})
+	}
+
+	doSub := func() {
+		_, err = js.Subscribe(
+			"DEVICE.22",
+			msgCB,
+			nats.ConsumerName("dlc"),
+			nats.SkipConsumerLookup(),
+			nats.StartSequence(seq.Load()+1),
+			nats.MaxAckPending(1), // One at a time.
+			nats.ManualAck(),
+			nats.ConsumerReplicas(1),
+			nats.ConsumerMemoryStorage(),
+			nats.MaxDeliver(1),
+			nats.InactiveThreshold(time.Second),
+			nats.IdleHeartbeat(250*time.Millisecond),
+		)
+		require_NoError(t, err)
+	}
+
+	// Track any consumer-not-active errors so we can recreate the consumer.
+	errCh := make(chan error, 10)
+	nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
+		if errors.Is(err, nats.ErrConsumerNotActive) {
+			s.Unsubscribe()
+			errCh <- err
+			doSub()
+		}
+	})
+
+	doSub()
+
+	sendStreamMsg(t, nc, "DEVICE.22", "update-1")
+	sendStreamMsg(t, nc, "DEVICE.22", "update-2")
+	sendStreamMsg(t, nc, "DEVICE.22", "update-3")
+	waitOnSeqDelivered(3)
+
+	// Shutdown the consumer's leader.
+	s := c.consumerLeader(globalAccountName, "UPDATES", "dlc")
+	s.Shutdown()
+	c.waitOnStreamLeader(globalAccountName, "UPDATES")
+
+	// In case our client connection was to the same server.
+	nc, _ = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	sendStreamMsg(t, nc, "DEVICE.22", "update-4")
+	sendStreamMsg(t, nc, "DEVICE.22", "update-5")
+	sendStreamMsg(t, nc, "DEVICE.22", "update-6")
+
+	// Wait for the consumer-not-active error.
+	<-errCh
+	// Now restart the server with the old consumer.
+	c.restartServer(s)
+	// Wait for all messages to be delivered.
+	waitOnSeqDelivered(6)
+	// Make sure no other errors showed up.
+	require_True(t, len(errCh) == 0)
+}
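
---

Reviewer note (not part of the patch): the fix has two halves. In deleteNotActive,
the retry loop re-reads the consumer assignment into nca and keeps retrying only
while nca is the exact assignment (pointer identity) it originally set out to
remove; a consumer re-created under the same name yields a different assignment,
so the loop stops. In processConsumerRemoval, a removal deletes the stored
assignment only when its Raft group name matches the one currently held. Below is
a minimal, self-contained sketch of both guards; the types here are illustrative
stand-ins for the server's internal structures, and only the Name and Group field
names mirror the diff above.

package main

import "fmt"

// Illustrative stand-ins for the server's internal types.
type raftGroup struct{ Name string }

type consumerAssignment struct {
	Name  string
	Group *raftGroup
}

// consumers maps consumer name -> current assignment, like sa.consumers above.
type consumers map[string]*consumerAssignment

// removeIfCurrent mirrors the guard added to processConsumerRemoval: only
// delete the entry when the removal's Raft group matches the one we currently
// hold, so a stale removal cannot take out a re-created consumer that reused
// the same name.
func (cs consumers) removeIfCurrent(ca *consumerAssignment) bool {
	oca := cs[ca.Name]
	if oca == nil {
		return false
	}
	if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name {
		delete(cs, ca.Name)
		return true
	}
	return false
}

// shouldRetryCleanup mirrors the deleteNotActive change: keep retrying only
// while the re-read assignment (nca) is the same assignment, by pointer
// identity, that cleanup was scheduled for. A new consumer with the same name
// yields a different *consumerAssignment, which stops the loop.
func shouldRetryCleanup(ca, nca *consumerAssignment) bool {
	return nca != nil && nca == ca
}

func main() {
	cs := consumers{}

	// An R1 consumer "dlc" goes inactive, then is re-created with the same
	// name but a new Raft group before cleanup finishes.
	oldCA := &consumerAssignment{Name: "dlc", Group: &raftGroup{Name: "C-old"}}
	newCA := &consumerAssignment{Name: "dlc", Group: &raftGroup{Name: "C-new"}}
	cs["dlc"] = newCA

	fmt.Println(cs.removeIfCurrent(oldCA))            // false: stale removal ignored
	fmt.Println(shouldRetryCleanup(oldCA, cs["dlc"])) // false: different assignment, stop retrying
	fmt.Println(cs.removeIfCurrent(newCA))            // true: removal of the current assignment works
}

The group-name check matters on the replicated path, where the removal arrives
as a separate proposal and may refer to an assignment that has since been
replaced; the pointer-identity check suffices in deleteNotActive because the
server swaps in a new *consumerAssignment when a consumer is re-created.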