From f62d92901814f53fcc6afc44e737091f7d17bf40 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 23 Jan 2023 19:53:10 -0800 Subject: [PATCH] Consumer must match replica of parent stream if interest based policy. Signed-off-by: Derek Collison --- server/consumer.go | 10 ++++++++++ server/errors.json | 10 ++++++++++ server/jetstream.go | 1 + server/jetstream_cluster_3_test.go | 28 ++++++++++++++++++++++++++++ server/jetstream_errors_generated.go | 14 ++++++++++++++ server/stream.go | 21 +++++++++++++++++++++ 6 files changed, 84 insertions(+) diff --git a/server/consumer.go b/server/consumer.go index aceaf590..6c56b612 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -382,6 +382,16 @@ func checkConsumerCfg( if config.Replicas < 0 { return NewJSReplicasCountCannotBeNegativeError() } + // If the stream is interest or workqueue retention make sure the replicas + // match that of the stream. This is REQUIRED for now. + if cfg.Retention == InterestPolicy || cfg.Retention == WorkQueuePolicy { + // Only error here if not recovering. + // We handle recovering in a different spot to allow consumer to come up + // if previous version allowed it to be created. We do not want it to not come up. + if !isRecovering && config.Replicas != 0 && config.Replicas != cfg.Replicas { + return NewJSConsumerReplicasShouldMatchStreamError() + } + } // Check if we have a BackOff defined that MaxDeliver is within range etc. 
if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver <= lbo { diff --git a/server/errors.json b/server/errors.json index 2413531b..e48eea78 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1318,5 +1318,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerReplicasShouldMatchStream", + "code": 400, + "error_code": 10134, + "description": "consumer config replicas must match interest retention stream's replicas", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] \ No newline at end of file diff --git a/server/jetstream.go b/server/jetstream.go index 63e7dadc..8ca10d47 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1381,6 +1381,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro // Check interest policy streams for auto cleanup. for _, mset := range ipstreams { mset.checkForOrphanMsgs() + mset.checkConsumerReplication() } s.Debugf("JetStream state for account %q recovered", a.Name) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index f53ea6bf..04bd1bb3 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2462,3 +2462,31 @@ func TestJetStreamClusterDirectGetStreamUpgrade(t *testing.T) { require_NoError(t, err) require_True(t, string(entry.Value()) == "derek") } + +// For interest (or workqueue) based streams it's important to match the replication factor. +// This was the case but now that more control over consumer creation is allowed it's possible +// to create a consumer where the replication factor does not match. This could cause +// instability in the state between servers and cause problems on leader switches. 
+func TestJetStreamClusterInterestPolicyStreamForConsumersToMatchRFactor(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: nats.InterestPolicy, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "XX", + AckPolicy: nats.AckExplicitPolicy, + Replicas: 1, + }) + + require_Error(t, err, NewJSConsumerReplicasShouldMatchStreamError()) +} diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 8cf409c3..58bdfe30 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -167,6 +167,9 @@ const ( // JSConsumerReplicasExceedsStream consumer config replica count exceeds parent stream JSConsumerReplicasExceedsStream ErrorIdentifier = 10126 + // JSConsumerReplicasShouldMatchStream consumer config replicas must match interest retention stream's replicas + JSConsumerReplicasShouldMatchStream ErrorIdentifier = 10134 + // JSConsumerSmallHeartbeatErr consumer idle heartbeat needs to be >= 100ms JSConsumerSmallHeartbeatErr ErrorIdentifier = 10083 @@ -458,6 +461,7 @@ var ( JSConsumerPushMaxWaitingErr: {Code: 400, ErrCode: 10080, Description: "consumer in push mode can not set max waiting"}, JSConsumerReplacementWithDifferentNameErr: {Code: 400, ErrCode: 10106, Description: "consumer replacement durable config not the same"}, JSConsumerReplicasExceedsStream: {Code: 400, ErrCode: 10126, Description: "consumer config replica count exceeds parent stream"}, + JSConsumerReplicasShouldMatchStream: {Code: 400, ErrCode: 10134, Description: "consumer config replicas must match interest retention stream's replicas"}, JSConsumerSmallHeartbeatErr: {Code: 400, ErrCode: 10083, Description: "consumer idle heartbeat needs to be >= 100ms"}, 
JSConsumerStoreFailedErrF: {Code: 500, ErrCode: 10104, Description: "error creating store for consumer: {err}"}, JSConsumerWQConsumerNotDeliverAllErr: {Code: 400, ErrCode: 10101, Description: "consumer must be deliver all on workqueue stream"}, @@ -1149,6 +1153,16 @@ func NewJSConsumerReplicasExceedsStreamError(opts ...ErrorOption) *ApiError { return ApiErrors[JSConsumerReplicasExceedsStream] } +// NewJSConsumerReplicasShouldMatchStreamError creates a new JSConsumerReplicasShouldMatchStream error: "consumer config replicas must match interest retention stream's replicas" +func NewJSConsumerReplicasShouldMatchStreamError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerReplicasShouldMatchStream] +} + // NewJSConsumerSmallHeartbeatError creates a new JSConsumerSmallHeartbeatErr error: "consumer idle heartbeat needs to be >= 100ms" func NewJSConsumerSmallHeartbeatError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/stream.go b/server/stream.go index 0c1cd095..bfaf3fd6 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5028,3 +5028,24 @@ func (mset *stream) checkForOrphanMsgs() { } } } + +// checkConsumerReplication runs on startup and logs an error for every consumer whose replica count differs from ours. +// Interest retention requires replication matches; Replicas == 0 is skipped since checkConsumerCfg also accepts it. +func (mset *stream) checkConsumerReplication() { + mset.mu.RLock() + defer mset.mu.RUnlock() + + if mset.cfg.Retention != InterestPolicy { + return + } + + s, acc := mset.srv, mset.acc + for _, o := range mset.consumers { + o.mu.RLock() + if o.cfg.Replicas != 0 && mset.cfg.Replicas != o.cfg.Replicas { + s.Errorf("consumer '%s > %s > %s' MUST match replication (%d vs %d) of stream with interest policy", + acc, mset.cfg.Name, o.cfg.Name, mset.cfg.Replicas, o.cfg.Replicas) + } + o.mu.RUnlock() + } +}