Merge pull request #2357 from nats-io/num-pending-bug

[FIXED] Consumer NumPending bug
This commit is contained in:
Derek Collison
2021-07-11 10:09:46 -07:00
committed by GitHub
2 changed files with 99 additions and 4 deletions

View File

@@ -1435,7 +1435,7 @@ func (o *consumer) info() *ConsumerInfo {
},
NumAckPending: len(o.pending),
NumRedelivered: len(o.rdc),
NumPending: o.sgap,
NumPending: o.adjustedPending(),
Cluster: ci,
}
// If we are a pull mode consumer, report on number of waiting requests.
@@ -2193,21 +2193,33 @@ func (o *consumer) setMaxPendingBytes(limit int) {
}
}
// We have the case where a consumer can become greedy and pick up a messages before the stream has incremented our pending(sgap).
// Instead of trying to slow things down and synchronize we will allow this to wrap and go negative (biggest uint64) for a short time.
// This functions checks for that and returns 0.
// Lock should be held.
func (o *consumer) adjustedPending() uint64 {
if o.sgap&(1<<63) != 0 {
return 0
}
return o.sgap
}
// Deliver a msg to the consumer.
// Lock should be held and o.mset validated to be non-nil.
func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) {
if o.mset == nil {
return
}
// Update pending on first attempt
if dc == 1 && o.sgap > 0 {
// Update pending on first attempt. This can go upside down for a short bit, that is ok.
// See adjustedPending().
if dc == 1 {
o.sgap--
}
dseq := o.dseq
o.dseq++
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil}
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.adjustedPending()), hdr, msg, o, seq, nil}
if o.maxpb > 0 {
o.pbytes += pmsg.size()
}

View File

@@ -12033,6 +12033,89 @@ func TestJetStreamServerEncryption(t *testing.T) {
checkEncrypted()
}
// User report of bug.
func TestJetStreamConsumerBadNumPending(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"orders.*"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
newOrders := func(n int) {
// Queue up new orders.
for i := 0; i < n; i++ {
js.Publish("orders.created", []byte("NEW"))
}
}
newOrders(10)
// Create to subscribers.
process := func(m *nats.Msg) {
js.Publish("orders.approved", []byte("APPROVED"))
}
op, err := js.Subscribe("orders.created", process)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer op.Unsubscribe()
mon, err := js.SubscribeSync("orders.*")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer mon.Unsubscribe()
waitForMsgs := func(n uint64) {
t.Helper()
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("ORDERS")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != n {
return fmt.Errorf("Expected %d msgs, got state: %+v", n, si.State)
}
return nil
})
}
checkForNoPending := func(sub *nats.Subscription) {
t.Helper()
if ci, err := sub.ConsumerInfo(); err != nil || ci == nil || ci.NumPending != 0 {
if ci != nil && ci.NumPending != 0 {
t.Fatalf("Bad consumer NumPending, expected 0 but got %d", ci.NumPending)
} else {
t.Fatalf("Bad consumer info: %+v", ci)
}
}
}
waitForMsgs(20)
checkForNoPending(op)
checkForNoPending(mon)
newOrders(10)
waitForMsgs(40)
checkForNoPending(op)
checkForNoPending(mon)
}
///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////