From 9bd11580e39a4a61946a45e1bb1558084b63f03d Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 10 Oct 2022 13:00:05 -0600 Subject: [PATCH] [FIXED] JetStream: User-defined ephemeral Name not used in cluster mode If the user sends a CONSUMER.CREATE request with a configuration that specifies the name that the user wants for the ephemeral consumer, this would not work on cluster mode, that is, the server would still pick a name instead of using the provided one. Signed-off-by: Ivan Kozlovic --- server/jetstream_cluster.go | 29 +++++++++----- server/jetstream_cluster_3_test.go | 62 ++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 4f7fe5b8..97e8f2ff 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6076,9 +6076,14 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec var ca *consumerAssignment var oname string - // See if we have an existing one already under same durable name. - if isDurableConsumer(cfg) { - oname = cfg.Durable + // See if we have an existing one already under same durable name or + // if name was set by the user. + if isDurableConsumer(cfg) || cfg.Name != _EMPTY_ { + if cfg.Name != _EMPTY_ { + oname = cfg.Name + } else { + oname = cfg.Durable + } if ca = sa.consumers[oname]; ca != nil && !ca.deleted { // Do quick sanity check on new cfg to prevent here if possible. if err := acc.checkNewConsumerConfig(ca.Config, cfg); err != nil { @@ -6110,15 +6115,19 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec rg.Peers = []string{rg.Preferred} rg.Name = groupNameForConsumer(rg.Peers, rg.Storage) } - // Make sure name is unique. - for { - oname = createConsumerName() - if sa.consumers != nil { - if sa.consumers[oname] != nil { - continue + if cfg.Name != _EMPTY_ { + oname = cfg.Name + } else { + // Make sure name is unique. + for { + oname = createConsumerName() + if sa.consumers != nil { + if sa.consumers[oname] != nil { + continue + } } + break } - break } } if len(rg.Peers) > 1 { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 334aa9bb..b0c7c306 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -519,3 +519,65 @@ func TestJetStreamClusterNegativeReplicas(t *testing.T) { t.Run("Standalone", func(t *testing.T) { testBadReplicas(t, s, "TEST1") }) t.Run("Clustered", func(t *testing.T) { testBadReplicas(t, c.randomServer(), "TEST2") }) } + +func TestJetStreamClusterUserSelectedConsName(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + test := func(t *testing.T, s *Server, stream string, replicas int, cons string) { + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: stream, + Replicas: replicas, + }) + require_NoError(t, err) + + cc := &CreateConsumerRequest{ + Stream: stream, + Config: ConsumerConfig{ + Name: cons, + FilterSubject: stream, + InactiveThreshold: 10 * time.Second, + }, + } + subj := fmt.Sprintf(JSApiConsumerCreateExT, stream, cons, stream) + req, err := json.Marshal(cc) + require_NoError(t, err) + + reply, err := nc.Request(subj, req, 2*time.Second) + require_NoError(t, err) + + var cresp JSApiConsumerCreateResponse + json.Unmarshal(reply.Data, &cresp) + if cresp.Error != nil { + t.Fatalf("Unexpected error: %v", cresp.Error) + } + require_Equal(t, cresp.Name, cons) + require_Equal(t, cresp.Config.Name, cons) + + // Resend the add request but before change something that the server + // should reject since the consumer already exist and we don't support + // the update of the consumer that way. + cc.Config.DeliverPolicy = DeliverNew + req, err = json.Marshal(cc) + require_NoError(t, err) + reply, err = nc.Request(subj, req, 2*time.Second) + require_NoError(t, err) + + cresp = JSApiConsumerCreateResponse{} + json.Unmarshal(reply.Data, &cresp) + require_Error(t, cresp.Error, NewJSConsumerCreateError(errors.New("deliver policy can not be updated"))) + } + + t.Run("Standalone", func(t *testing.T) { test(t, s, "TEST", 1, "cons") }) + t.Run("Clustered R1", func(t *testing.T) { test(t, c.randomServer(), "TEST2", 1, "cons2") }) + t.Run("Clustered R3", func(t *testing.T) { test(t, c.randomServer(), "TEST3", 3, "cons3") }) +}