Tweak consumer replica scaling, add unit test for orphaned consumer subjects

Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
Neil Twigg
2023-08-17 09:30:20 +01:00
parent 1e87c3d820
commit c0636d117f
3 changed files with 46 additions and 6 deletions

View File

@@ -213,14 +213,13 @@ var (
// Calculate accurate replicas for the consumer config with the parent stream config.
func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int {
if consCfg.Replicas == 0 {
if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy {
return 1
}
return strCfg.Replicas
} else {
return consCfg.Replicas
}
return consCfg.Replicas
}
// Consumer is a jetstream consumer.

View File

@@ -5996,11 +5996,11 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
// Need to remap any consumers.
for _, ca := range osa.consumers {
// Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy.
numPeers := len(ca.Group.Peers)
if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy {
replicas := ca.Config.replicas(cfg)
if ca.Config.Durable != _EMPTY_ || replicas > 1 || cfg.Retention != LimitsPolicy {
cca := ca.copyGroup()
// Adjust preferred as needed.
if numPeers == 1 && len(rg.Peers) > 1 {
if replicas == 1 && len(rg.Peers) > 1 {
cca.Group.Preferred = ca.Group.Peers[0]
} else {
cca.Group.Preferred = _EMPTY_

View File

@@ -4985,3 +4985,44 @@ func TestJetStreamClusterStreamFailTrackingSnapshots(t *testing.T) {
t.Fatalf("Expected no errors, got %d", len(errCh))
}
}
func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>", "bar.>"},
Replicas: 3,
})
require_NoError(t, err)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: "consumer_foo",
Durable: "consumer_foo",
FilterSubject: "foo.something",
})
require_NoError(t, err)
for _, replicas := range []int{3, 1, 3} {
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"bar.>"},
Replicas: replicas,
})
require_NoError(t, err)
c.waitOnAllCurrent()
}
c.waitOnStreamLeader("$G", "TEST")
c.waitOnConsumerLeader("$G", "TEST", "consumer_foo")
info, err := js.ConsumerInfo("TEST", "consumer_foo")
require_NoError(t, err)
require_True(t, info.Cluster != nil)
require_NotEqual(t, info.Cluster.Leader, "")
require_Equal(t, len(info.Cluster.Replicas), 2)
}