From c437157c1f84a5ba08becc12e647b87d9ef3ce7e Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Aug 2023 17:22:43 +0100 Subject: [PATCH] Recover in consumer assignment when asset already existed Signed-off-by: Neil Twigg --- server/consumer.go | 3 ++- server/jetstream_cluster.go | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 2ee3896a..304fb326 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -214,7 +214,8 @@ var ( // Calculate accurate replicas for the consumer config with the parent stream config. func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int { if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas { - if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy { + if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 { + // Matches old-school ephemerals only, where the replica count is 0. return 1 } return strCfg.Replicas diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 12b44b33..1734f096 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4001,7 +4001,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state var didCreate, isConfigUpdate, needsLocalResponse bool if o == nil { // Add in the consumer if needed. - if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil { + if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, wasExisting); err == nil { didCreate = true } } else { @@ -5996,11 +5996,11 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su // Need to remap any consumers. for _, ca := range osa.consumers { // Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy. - replicas := ca.Config.replicas(cfg) - if ca.Config.Durable != _EMPTY_ || replicas > 1 || cfg.Retention != LimitsPolicy { + numPeers := len(ca.Group.Peers) + if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy { cca := ca.copyGroup() // Adjust preferred as needed. - if replicas == 1 && len(rg.Peers) > 1 { + if numPeers == 1 && len(rg.Peers) > 1 { cca.Group.Preferred = ca.Group.Peers[0] } else { cca.Group.Preferred = _EMPTY_