From c0636d117f0e9d283fbc0074ca1ba4ab5f65e6b0 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Aug 2023 09:30:20 +0100 Subject: [PATCH] Tweak consumer replica scaling, add unit test for orphaned consumer subjects Signed-off-by: Neil Twigg --- server/consumer.go | 5 ++-- server/jetstream_cluster.go | 6 ++--- server/jetstream_cluster_3_test.go | 41 ++++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index b229b9fd..2ee3896a 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -213,14 +213,13 @@ var ( // Calculate accurate replicas for the consumer config with the parent stream config. func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int { - if consCfg.Replicas == 0 { + if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas { if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy { return 1 } return strCfg.Replicas - } else { - return consCfg.Replicas } + return consCfg.Replicas } // Consumer is a jetstream consumer. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 6af4fa42..12b44b33 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5996,11 +5996,11 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su // Need to remap any consumers. for _, ca := range osa.consumers { // Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy. - numPeers := len(ca.Group.Peers) - if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy { + replicas := ca.Config.replicas(cfg) + if ca.Config.Durable != _EMPTY_ || replicas > 1 || cfg.Retention != LimitsPolicy { cca := ca.copyGroup() // Adjust preferred as needed. - if numPeers == 1 && len(rg.Peers) > 1 { + if replicas == 1 && len(rg.Peers) > 1 { cca.Group.Preferred = ca.Group.Peers[0] } else { cca.Group.Preferred = _EMPTY_ diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index fbf46524..eaa412cc 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4985,3 +4985,44 @@ func TestJetStreamClusterStreamFailTrackingSnapshots(t *testing.T) { t.Fatalf("Expected no errors, got %d", len(errCh)) } } + +func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.>", "bar.>"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "consumer_foo", + Durable: "consumer_foo", + FilterSubject: "foo.something", + }) + require_NoError(t, err) + + for _, replicas := range []int{3, 1, 3} { + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"bar.>"}, + Replicas: replicas, + }) + require_NoError(t, err) + c.waitOnAllCurrent() + } + + c.waitOnStreamLeader("$G", "TEST") + c.waitOnConsumerLeader("$G", "TEST", "consumer_foo") + + info, err := js.ConsumerInfo("TEST", "consumer_foo") + require_NoError(t, err) + require_True(t, info.Cluster != nil) + require_NotEqual(t, info.Cluster.Leader, "") + require_Equal(t, len(info.Cluster.Replicas), 2) +}