diff --git a/server/consumer.go b/server/consumer.go index 400bd5f0..086e8eec 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1435,7 +1435,7 @@ func (o *consumer) info() *ConsumerInfo { }, NumAckPending: len(o.pending), NumRedelivered: len(o.rdc), - NumPending: o.sgap, + NumPending: o.adjustedPending(), Cluster: ci, } // If we are a pull mode consumer, report on number of waiting requests. @@ -2193,21 +2193,33 @@ func (o *consumer) setMaxPendingBytes(limit int) { } } +// We have the case where a consumer can become greedy and pick up a messages before the stream has incremented our pending(sgap). +// Instead of trying to slow things down and synchronize we will allow this to wrap and go negative (biggest uint64) for a short time. +// This functions checks for that and returns 0. +// Lock should be held. +func (o *consumer) adjustedPending() uint64 { + if o.sgap&(1<<63) != 0 { + return 0 + } + return o.sgap +} + // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) { if o.mset == nil { return } - // Update pending on first attempt - if dc == 1 && o.sgap > 0 { + // Update pending on first attempt. This can go upside down for a short bit, that is ok. + // See adjustedPending(). + if dc == 1 { o.sgap-- } dseq := o.dseq o.dseq++ - pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil} + pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.adjustedPending()), hdr, msg, o, seq, nil} if o.maxpb > 0 { o.pbytes += pmsg.size() } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 97f31c6a..296ed4f8 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -12033,6 +12033,89 @@ func TestJetStreamServerEncryption(t *testing.T) { checkEncrypted() } +// User report of bug. +func TestJetStreamConsumerBadNumPending(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "ORDERS", + Subjects: []string{"orders.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + newOrders := func(n int) { + // Queue up new orders. + for i := 0; i < n; i++ { + js.Publish("orders.created", []byte("NEW")) + } + } + + newOrders(10) + + // Create to subscribers. + process := func(m *nats.Msg) { + js.Publish("orders.approved", []byte("APPROVED")) + } + + op, err := js.Subscribe("orders.created", process) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer op.Unsubscribe() + + mon, err := js.SubscribeSync("orders.*") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer mon.Unsubscribe() + + waitForMsgs := func(n uint64) { + t.Helper() + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("ORDERS") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != n { + return fmt.Errorf("Expected %d msgs, got state: %+v", n, si.State) + } + return nil + }) + } + + checkForNoPending := func(sub *nats.Subscription) { + t.Helper() + if ci, err := sub.ConsumerInfo(); err != nil || ci == nil || ci.NumPending != 0 { + if ci != nil && ci.NumPending != 0 { + t.Fatalf("Bad consumer NumPending, expected 0 but got %d", ci.NumPending) + } else { + t.Fatalf("Bad consumer info: %+v", ci) + } + } + } + + waitForMsgs(20) + checkForNoPending(op) + checkForNoPending(mon) + + newOrders(10) + + waitForMsgs(40) + checkForNoPending(op) + checkForNoPending(mon) +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks ///////////////////////////////////////////////////////////////////////////