From 359a4d980b1bc21240bdb95737acbb475dd5c1f1 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Thu, 8 Jul 2021 18:14:03 +0200 Subject: [PATCH 1/2] remove duplicate error Signed-off-by: R.I.Pienaar --- server/consumer.go | 2 +- server/errors.json | 12 +----------- server/jetstream_errors_generated.go | 4 ---- 3 files changed, 2 insertions(+), 16 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index c24073fd..400bd5f0 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -443,7 +443,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } if maxc > 0 && len(mset.consumers) >= maxc { mset.mu.Unlock() - return nil, ApiErrors[JSStreamMaximumConsumersReachedErr] + return nil, ApiErrors[JSMaximumConsumersLimitErr] } // Check on stream type conflicts with WorkQueues. diff --git a/server/errors.json b/server/errors.json index 70bfd10f..b5fcf7be 100644 --- a/server/errors.json +++ b/server/errors.json @@ -949,16 +949,6 @@ "url": "", "deprecates": "" }, - { - "constant": "JSStreamMaximumConsumersReachedErr", - "code": 400, - "error_code": 10097, - "description": "maximum consumers limit reached", - "comment": "", - "help": "", - "url": "", - "deprecates": "" - }, { "constant": "JSConsumerWQRequiresExplicitAckErr", "code": 400, @@ -1049,4 +1039,4 @@ "url": "", "deprecates": "" } -] \ No newline at end of file +] diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 76e50400..27de5042 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -249,9 +249,6 @@ const ( // JSStreamLimitsErrF General stream limits exceeded error string ({err}) JSStreamLimitsErrF ErrorIdentifier = 10053 - // JSStreamMaximumConsumersReachedErr maximum consumers limit reached - JSStreamMaximumConsumersReachedErr ErrorIdentifier = 10097 - // JSStreamMessageExceedsMaximumErr message size exceeds maximum allowed JSStreamMessageExceedsMaximumErr ErrorIdentifier = 10054 @@ -403,7 +400,6 @@ var ( JSStreamInvalidErr: {Code: 500, ErrCode: 10096, Description: "stream not valid"}, JSStreamInvalidExternalDeliverySubjErrF: {Code: 400, ErrCode: 10024, Description: "stream external delivery prefix {prefix} must not contain wildcards"}, JSStreamLimitsErrF: {Code: 500, ErrCode: 10053, Description: "{err}"}, - JSStreamMaximumConsumersReachedErr: {Code: 400, ErrCode: 10097, Description: "maximum consumers limit reached"}, JSStreamMessageExceedsMaximumErr: {Code: 400, ErrCode: 10054, Description: "message size exceeds maximum allowed"}, JSStreamMirrorNotUpdatableErr: {Code: 400, ErrCode: 10055, Description: "Mirror configuration can not be updated"}, JSStreamMismatchErr: {Code: 400, ErrCode: 10056, Description: "stream name in subject does not match request"}, From 225df046621426d5419e67575cd7c3a21814dfd1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 10 Jul 2021 11:43:21 -0700 Subject: [PATCH 2/2] Fix for a consumer's num pending being stuck at 1. We were trying to protect the sgap uint64 from wrapping, but in some cases the consumers is eager and can get a message before we sgap++. Instead of slowing things down and sycnhronizing ++ then --, we allow it to wrap temporarily and have and adjustedPending() func that will set to zero for reporting. Signed-off-by: Derek Collison --- server/consumer.go | 20 ++++++++-- server/jetstream_test.go | 83 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 4 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 400bd5f0..086e8eec 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1435,7 +1435,7 @@ func (o *consumer) info() *ConsumerInfo { }, NumAckPending: len(o.pending), NumRedelivered: len(o.rdc), - NumPending: o.sgap, + NumPending: o.adjustedPending(), Cluster: ci, } // If we are a pull mode consumer, report on number of waiting requests. @@ -2193,21 +2193,33 @@ func (o *consumer) setMaxPendingBytes(limit int) { } } +// We have the case where a consumer can become greedy and pick up a messages before the stream has incremented our pending(sgap). +// Instead of trying to slow things down and synchronize we will allow this to wrap and go negative (biggest uint64) for a short time. +// This functions checks for that and returns 0. +// Lock should be held. +func (o *consumer) adjustedPending() uint64 { + if o.sgap&(1<<63) != 0 { + return 0 + } + return o.sgap +} + // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) { if o.mset == nil { return } - // Update pending on first attempt - if dc == 1 && o.sgap > 0 { + // Update pending on first attempt. This can go upside down for a short bit, that is ok. + // See adjustedPending(). + if dc == 1 { o.sgap-- } dseq := o.dseq o.dseq++ - pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil} + pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.adjustedPending()), hdr, msg, o, seq, nil} if o.maxpb > 0 { o.pbytes += pmsg.size() } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 97f31c6a..296ed4f8 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -12033,6 +12033,89 @@ func TestJetStreamServerEncryption(t *testing.T) { checkEncrypted() } +// User report of bug. +func TestJetStreamConsumerBadNumPending(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "ORDERS", + Subjects: []string{"orders.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + newOrders := func(n int) { + // Queue up new orders. + for i := 0; i < n; i++ { + js.Publish("orders.created", []byte("NEW")) + } + } + + newOrders(10) + + // Create to subscribers. + process := func(m *nats.Msg) { + js.Publish("orders.approved", []byte("APPROVED")) + } + + op, err := js.Subscribe("orders.created", process) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer op.Unsubscribe() + + mon, err := js.SubscribeSync("orders.*") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer mon.Unsubscribe() + + waitForMsgs := func(n uint64) { + t.Helper() + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("ORDERS") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != n { + return fmt.Errorf("Expected %d msgs, got state: %+v", n, si.State) + } + return nil + }) + } + + checkForNoPending := func(sub *nats.Subscription) { + t.Helper() + if ci, err := sub.ConsumerInfo(); err != nil || ci == nil || ci.NumPending != 0 { + if ci != nil && ci.NumPending != 0 { + t.Fatalf("Bad consumer NumPending, expected 0 but got %d", ci.NumPending) + } else { + t.Fatalf("Bad consumer info: %+v", ci) + } + } + } + + waitForMsgs(20) + checkForNoPending(op) + checkForNoPending(mon) + + newOrders(10) + + waitForMsgs(40) + checkForNoPending(op) + checkForNoPending(mon) +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks ///////////////////////////////////////////////////////////////////////////