Consumer replica count must match that of the parent stream when the stream uses an interest (or workqueue) retention policy.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-01-23 19:53:10 -08:00
parent 3380fe1c74
commit f62d929018
6 changed files with 85 additions and 0 deletions

View File

@@ -382,6 +382,17 @@ func checkConsumerCfg(
if config.Replicas < 0 {
return NewJSReplicasCountCannotBeNegativeError()
}
// Streams with interest or workqueue retention REQUIRE (for now) that an explicitly
// configured consumer replica count equals the stream's replica count.
// A consumer replica count of 0 is not rejected by this check.
if cfg.Retention == InterestPolicy || cfg.Retention == WorkQueuePolicy {
	// Only reject the config when we are not recovering. Recovery is handled in a
	// different spot so that a consumer which a previous version allowed to be
	// created can still come up; we do not want to prevent it from starting.
	if !isRecovering && config.Replicas != 0 && config.Replicas != cfg.Replicas {
		return NewJSConsumerReplicasShouldMatchStreamError()
	}
}
// Check if we have a BackOff defined that MaxDeliver is within range etc.
if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver <= lbo {

View File

@@ -1318,5 +1318,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerReplicasShouldMatchStream",
"code": 400,
"error_code": 10134,
"description": "consumer config replicas must match interest retention stream's replicas",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]

View File

@@ -1381,6 +1381,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
// Check interest policy streams for auto cleanup.
for _, mset := range ipstreams {
mset.checkForOrphanMsgs()
mset.checkConsumerReplication()
}
s.Debugf("JetStream state for account %q recovered", a.Name)

View File

@@ -2462,3 +2462,31 @@ func TestJetStreamClusterDirectGetStreamUpgrade(t *testing.T) {
require_NoError(t, err)
require_True(t, string(entry.Value()) == "derek")
}
// For interest (or workqueue) based streams it is important that consumers use the
// same replication factor as the stream. That used to always be the case, but with
// more control over consumer creation it became possible to create a consumer whose
// replication factor differs. Such a mismatch could cause instability in the state
// between servers and cause problems on leader switches.
//
// This test creates an interest retention stream with 3 replicas and verifies that
// adding a durable consumer asking for 1 replica is rejected with
// JSConsumerReplicasShouldMatchStream.
func TestJetStreamClusterInterestPolicyStreamForConsumersToMatchRFactor(t *testing.T) {
	cluster := createJetStreamClusterExplicit(t, "R3S", 3)
	defer cluster.shutdown()

	conn, js := jsClientConnect(t, cluster.randomServer())
	defer conn.Close()

	// Interest retention stream replicated three ways.
	streamCfg := &nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Retention: nats.InterestPolicy,
		Replicas:  3,
	}
	_, addStreamErr := js.AddStream(streamCfg)
	require_NoError(t, addStreamErr)

	// Consumer explicitly asks for a replica count that differs from the stream's.
	consumerCfg := &nats.ConsumerConfig{
		Durable:   "XX",
		AckPolicy: nats.AckExplicitPolicy,
		Replicas:  1,
	}
	_, addConsumerErr := js.AddConsumer("TEST", consumerCfg)
	require_Error(t, addConsumerErr, NewJSConsumerReplicasShouldMatchStreamError())
}

View File

@@ -167,6 +167,9 @@ const (
// JSConsumerReplicasExceedsStream consumer config replica count exceeds parent stream
JSConsumerReplicasExceedsStream ErrorIdentifier = 10126
// JSConsumerReplicasShouldMatchStream consumer config replicas must match interest retention stream's replicas
JSConsumerReplicasShouldMatchStream ErrorIdentifier = 10134
// JSConsumerSmallHeartbeatErr consumer idle heartbeat needs to be >= 100ms
JSConsumerSmallHeartbeatErr ErrorIdentifier = 10083
@@ -458,6 +461,7 @@ var (
JSConsumerPushMaxWaitingErr: {Code: 400, ErrCode: 10080, Description: "consumer in push mode can not set max waiting"},
JSConsumerReplacementWithDifferentNameErr: {Code: 400, ErrCode: 10106, Description: "consumer replacement durable config not the same"},
JSConsumerReplicasExceedsStream: {Code: 400, ErrCode: 10126, Description: "consumer config replica count exceeds parent stream"},
JSConsumerReplicasShouldMatchStream: {Code: 400, ErrCode: 10134, Description: "consumer config replicas must match interest retention stream's replicas"},
JSConsumerSmallHeartbeatErr: {Code: 400, ErrCode: 10083, Description: "consumer idle heartbeat needs to be >= 100ms"},
JSConsumerStoreFailedErrF: {Code: 500, ErrCode: 10104, Description: "error creating store for consumer: {err}"},
JSConsumerWQConsumerNotDeliverAllErr: {Code: 400, ErrCode: 10101, Description: "consumer must be deliver all on workqueue stream"},
@@ -1149,6 +1153,16 @@ func NewJSConsumerReplicasExceedsStreamError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSConsumerReplicasExceedsStream]
}
// NewJSConsumerReplicasShouldMatchStreamError creates a new JSConsumerReplicasShouldMatchStream error: "consumer config replicas must match interest retention stream's replicas"
//
// If the parsed options carry an error that is already an *ApiError, that error is
// returned as is; otherwise the ApiErrors entry for
// JSConsumerReplicasShouldMatchStream is returned.
func NewJSConsumerReplicasShouldMatchStreamError(opts ...ErrorOption) *ApiError {
	parsed := parseOpts(opts)
	passthrough, isAPIErr := parsed.err.(*ApiError)
	if !isAPIErr {
		return ApiErrors[JSConsumerReplicasShouldMatchStream]
	}
	return passthrough
}
// NewJSConsumerSmallHeartbeatError creates a new JSConsumerSmallHeartbeatErr error: "consumer idle heartbeat needs to be >= 100ms"
func NewJSConsumerSmallHeartbeatError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)

View File

@@ -5028,3 +5028,24 @@ func (mset *stream) checkForOrphanMsgs() {
}
}
}
// checkConsumerReplication checks on startup that the replication of each of this
// stream's consumers matches the stream's own replication.
// Interest retention requires that the two match, so streams with any other
// retention policy are skipped here.
//
// Mismatches are only reported through the server's error log; no consumer or
// stream state is modified. The stream read lock is held for the whole call and
// each consumer's read lock is held while its config is inspected.
func (mset *stream) checkConsumerReplication() {
	mset.mu.RLock()
	defer mset.mu.RUnlock()

	if mset.cfg.Retention != InterestPolicy {
		return
	}

	s, acc := mset.srv, mset.acc
	for _, o := range mset.consumers {
		o.mu.RLock()
		// A consumer replica count of 0 is not treated as a mismatch: the consumer
		// config check accepts 0 on interest retention streams at creation time, so
		// flagging it here would log errors for validly created consumers.
		if o.cfg.Replicas != 0 && mset.cfg.Replicas != o.cfg.Replicas {
			s.Errorf("consumer '%s > %s > %s' MUST match replication (%d vs %d) of stream with interest policy",
				acc, mset.cfg.Name, o.cfg.Name, mset.cfg.Replicas, o.cfg.Replicas)
		}
		o.mu.RUnlock()
	}
}