From 790d6434310bc21121db1970108e9197c8a97f90 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 19 May 2022 14:34:06 -0700 Subject: [PATCH] Consumer's num pending can now rely on the stream's store vs trying to maintain furing runtime which could be wrong under certain conditions. Signed-off-by: Derek Collison --- server/consumer.go | 62 ++++++++++++---------------- server/norace_test.go | 95 +++++++++++++++++++++++++++++++++++++++++++ server/stream.go | 10 ++--- 3 files changed, 127 insertions(+), 40 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 84882bea..0b51b67f 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -223,8 +223,8 @@ type consumer struct { dseq uint64 adflr uint64 asflr uint64 - sgap uint64 - lsgap uint64 + npc uint64 + npsm uint64 dsubj string qgroup string lss *lastSeqSkipList @@ -2068,7 +2068,7 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo { }, NumAckPending: len(o.pending), NumRedelivered: len(o.rdc), - NumPending: o.adjustedPending(), + NumPending: o.streamNumPending(), PushBound: o.isPushMode() && o.active, Cluster: ci, } @@ -2226,7 +2226,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) { } } - // If we had max ack pending set and were at limit we need to unblock folks. + // If we had max ack pending set and were at limit we need to unblock ourselves. if needSignal { o.signalNewMessages() } @@ -2631,7 +2631,7 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) { // If the request is for noWait and we have pending requests already, check if we have room. if noWait { - msgsPending := o.adjustedPending() + uint64(len(o.rdq)) + msgsPending := o.numPending() + uint64(len(o.rdq)) // If no pending at all, decide what to do with request. // If no expires was set then fail. if msgsPending == 0 && expires.IsZero() { @@ -3133,15 +3133,21 @@ func (o *consumer) setMaxPendingBytes(limit int) { } } -// We have the case where a consumer can become greedy and pick up a messages before the stream has incremented our pending(sgap). -// Instead of trying to slow things down and synchronize we will allow this to wrap and go negative (biggest uint64) for a short time. -// This functions checks for that and returns 0. // Lock should be held. -func (o *consumer) adjustedPending() uint64 { - if o.sgap&(1<<63) != 0 { - return 0 +func (o *consumer) numPending() uint64 { + if o.npsm == 0 { + o.streamNumPending() } - return o.sgap + return o.npc +} + +// Will force a set from the stream store of num pending. +// Lock should be held. +func (o *consumer) streamNumPending() uint64 { + ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject) + o.npc = ss.Msgs + o.npsm = ss.Last + return o.npc } // Deliver a msg to the consumer. @@ -3152,10 +3158,9 @@ func (o *consumer) deliverMsg(dsubj string, pmsg *jsPubMsg, dc uint64, rp Retent return } - // Update pending on first attempt. This can go upside down for a short bit, that is ok. - // See adjustedPending(). - if dc == 1 { - o.sgap-- + // Update our cached num pending. + if dc == 1 && o.npsm > 0 { + o.npc-- } dseq := o.dseq @@ -3187,7 +3192,7 @@ func (o *consumer) deliverMsg(dsubj string, pmsg *jsPubMsg, dc uint64, rp Retent pmsg.msg = nil } - pmsg.dsubj, pmsg.reply, pmsg.o = dsubj, o.ackReply(pmsg.seq, dseq, dc, pmsg.ts, o.adjustedPending()), o + pmsg.dsubj, pmsg.reply, pmsg.o = dsubj, o.ackReply(pmsg.seq, dseq, dc, pmsg.ts, o.numPending()), o psz := pmsg.size() if o.maxpb > 0 { @@ -3640,6 +3645,7 @@ func (o *consumer) selectStartingSeqNo() { // TODO(dlc) - Once clustered can't rely on this. o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime) } else { + // DeliverNew o.sseq = state.LastSeq + 1 } } else { @@ -3748,7 +3754,6 @@ func (o *consumer) purge(sseq uint64, slseq uint64) { o.adflr = o.dseq - 1 } } - o.sgap = 0 o.pending = nil // We need to remove all those being queued for redelivery under o.rdq @@ -4015,25 +4020,13 @@ func (o *consumer) setInitialPendingAndStart(hadState bool) { } } - if !filtered && (dp != DeliverLastPerSubject && dp != DeliverNew) { - var state StreamState - mset.store.FastState(&state) - if state.Msgs > 0 { - o.sgap = state.Msgs - (o.sseq - state.FirstSeq) - o.lsgap = state.LastSeq - } - } else { + if filtered || dp == DeliverLastPerSubject || dp == DeliverNew { // Here we are filtered. if dp == DeliverLastPerSubject && o.hasSkipListPending() && o.sseq < o.lss.resume { - ss := mset.store.FilteredState(o.lss.resume+1, o.cfg.FilterSubject) if !hadState { o.sseq = o.lss.seqs[0] } - o.sgap = ss.Msgs + uint64(len(o.lss.seqs)) - o.lsgap = ss.Last } else if ss := mset.store.FilteredState(o.sseq, o.cfg.FilterSubject); ss.Msgs > 0 { - o.sgap = ss.Msgs - o.lsgap = ss.Last // See if we should update our starting sequence. if dp == DeliverLast || dp == DeliverLastPerSubject { if !hadState { @@ -4043,7 +4036,6 @@ func (o *consumer) setInitialPendingAndStart(hadState bool) { // If our original is larger we will ignore, we don't want to go backwards with DeliverNew. // If its greater, we need to adjust pending. if ss.Last >= o.sseq { - o.sgap -= (ss.Last - o.sseq + 1) if !hadState { o.sseq = ss.Last + 1 } @@ -4072,9 +4064,9 @@ func (o *consumer) setInitialPendingAndStart(hadState bool) { func (o *consumer) decStreamPending(sseq uint64, subj string) { o.mu.Lock() - // Ignore if we have already seen this one. - if sseq >= o.sseq && o.sgap > 0 && o.isFilteredMatch(subj) { - o.sgap-- + // Update our cached num pending. Only do so if we think deliverMsg has not done so. + if sseq > o.npsm && sseq >= o.sseq && o.isFilteredMatch(subj) { + o.npc-- } // Check if this message was pending. p, wasPending := o.pending[sseq] diff --git a/server/norace_test.go b/server/norace_test.go index 2eb2d421..1f393c0c 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -5190,6 +5190,101 @@ func TestNoRaceJetStreamPullConsumersAndInteriorDeletes(t *testing.T) { } } +func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: nats.InterestPolicy, + MaxMsgs: 2000, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "dur", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + qch := make(chan bool) + var wg sync.WaitGroup + + // Publisher + wg.Add(1) + go func() { + defer wg.Done() + for { + pt := time.NewTimer(time.Duration(rand.Intn(2)) * time.Millisecond) + select { + case <-pt.C: + _, err := js.Publish("foo", []byte("BUG!")) + if err != nil { + t.Logf("Got a publisher error: %v", err) + return + } + case <-qch: + return + } + } + }() + + time.Sleep(time.Second) + + // Pull Consumers + wg.Add(100) + for i := 0; i < 100; i++ { + go func() { + defer wg.Done() + _, js := jsClientConnect(t, c.randomServer()) + sub, err := js.PullSubscribe("foo", "dur") + require_NoError(t, err) + + for { + pt := time.NewTimer(time.Duration(rand.Intn(300)) * time.Millisecond) + select { + case <-pt.C: + msgs, err := sub.Fetch(1) + if err != nil { + t.Logf("Got a Fetch error: %v", err) + return + } + if len(msgs) > 0 { + go func() { + ackDelay := time.Duration(rand.Intn(375)+15) * time.Millisecond + m := msgs[0] + time.AfterFunc(ackDelay, func() { m.AckSync() }) + }() + } + case <-qch: + return + } + } + }() + } + + time.Sleep(5 * time.Second) + close(qch) + wg.Wait() + time.Sleep(time.Second) + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + + ci, err := js.ConsumerInfo("TEST", "dur") + require_NoError(t, err) + + ld := ci.Delivered.Stream + if si.State.FirstSeq > ld { + ld = si.State.FirstSeq - 1 + } + if si.State.LastSeq-ld != ci.NumPending { + t.Fatalf("Expected NumPending to be %d got %d", si.State.LastSeq-ld, ci.NumPending) + } +} + // Net Proxy - For introducing RTT and BW constraints. type netProxy struct { listener net.Listener diff --git a/server/stream.go b/server/stream.go index 7efdb58b..a90b7931 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3570,18 +3570,18 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } if err == nil && seq > 0 && numConsumers > 0 { - mset.mu.Lock() + mset.mu.RLock() for _, o := range mset.consumers { o.mu.Lock() - if o.isLeader() { - if seq > o.lsgap && o.isFilteredMatch(subject) { - o.sgap++ + if o.isLeader() && o.isFilteredMatch(subject) { + if seq > o.npsm { + o.npc++ } o.signalNewMessages() } o.mu.Unlock() } - mset.mu.Unlock() + mset.mu.RUnlock() } return err