diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 32dd6859..25b7dd15 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6111,7 +6111,8 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // We need to set the ephemeral here before replicating. if !isDurableConsumer(cfg) { // We chose to have ephemerals be R=1 unless stream is interest or workqueue. - if sa.Config.Retention == LimitsPolicy { + // Consumer can override. + if sa.Config.Retention == LimitsPolicy && cfg.Replicas <= 1 { rg.Peers = []string{rg.Preferred} rg.Name = groupNameForConsumer(rg.Peers, rg.Storage) } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index e5c65d5f..d238e3af 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -611,6 +611,7 @@ func TestJetStreamClusterUserGivenConsNameWithLeaderChange(t *testing.T) { Name: consName, FilterSubject: "foo", InactiveThreshold: time.Hour, + Replicas: 3, }, } subj := fmt.Sprintf(JSApiConsumerCreateExT, "TEST", consName, "foo") @@ -651,6 +652,7 @@ func TestJetStreamClusterUserGivenConsNameWithLeaderChange(t *testing.T) { time.Sleep(250 * time.Millisecond) // Wait for new leader + c.waitOnStreamLeader(globalAccountName, "TEST") c.waitOnConsumerLeader(globalAccountName, "TEST", consName) // Make sure we can still consume.