Merge pull request #3137 from nats-io/consumer_num_pending

Consumer's num pending can now rely on the stream's store
Derek Collison · 2022-05-21 11:43:44 -07:00 · committed by GitHub
3 changed files with 127 additions and 40 deletions
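
In short: instead of maintaining a running pending gap (sgap, with lsgap as its high-water mark), the consumer's num pending is now anchored to the stream's store. The consumer keeps a cached count (npc) together with the stream sequence that count was computed at (npsm), both set from the store's FilteredState, and only adjusts them incrementally in between. A minimal Go sketch of that scheme, using simplified stand-in types rather than the real server structs (filteredState, consumerSketch and the store func are illustrative names only):

type filteredState struct {
	Msgs uint64 // matching messages at or after the start sequence
	Last uint64 // last stream sequence covered by the snapshot
}

type consumerSketch struct {
	sseq  uint64                              // next stream sequence to deliver
	npc   uint64                              // cached num pending
	npsm  uint64                              // stream sequence the cache was computed at
	store func(startSeq uint64) filteredState // stand-in for the stream store query
}

// streamNumPending forces a refresh of the cache from the store, mirroring the
// new streamNumPending method below that calls mset.store.FilteredState.
func (o *consumerSketch) streamNumPending() uint64 {
	ss := o.store(o.sseq)
	o.npc, o.npsm = ss.Msgs, ss.Last
	return o.npc
}

// numPending returns the cached count, computing it once if it was never set.
func (o *consumerSketch) numPending() uint64 {
	if o.npsm == 0 {
		o.streamNumPending()
	}
	return o.npc
}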


@@ -223,8 +223,8 @@ type consumer struct {
 	dseq uint64
 	adflr uint64
 	asflr uint64
-	sgap uint64
-	lsgap uint64
+	npc uint64
+	npsm uint64
 	dsubj string
 	qgroup string
 	lss *lastSeqSkipList
@@ -2068,7 +2068,7 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo {
 		},
 		NumAckPending: len(o.pending),
 		NumRedelivered: len(o.rdc),
-		NumPending: o.adjustedPending(),
+		NumPending: o.streamNumPending(),
 		PushBound: o.isPushMode() && o.active,
 		Cluster: ci,
 	}
@@ -2226,7 +2226,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
 		}
 	}
-	// If we had max ack pending set and were at limit we need to unblock folks.
+	// If we had max ack pending set and were at limit we need to unblock ourselves.
 	if needSignal {
 		o.signalNewMessages()
 	}
@@ -2631,7 +2631,7 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
 	// If the request is for noWait and we have pending requests already, check if we have room.
 	if noWait {
-		msgsPending := o.adjustedPending() + uint64(len(o.rdq))
+		msgsPending := o.numPending() + uint64(len(o.rdq))
 		// If no pending at all, decide what to do with request.
 		// If no expires was set then fail.
 		if msgsPending == 0 && expires.IsZero() {
@@ -3133,15 +3133,21 @@ func (o *consumer) setMaxPendingBytes(limit int) {
 	}
 }
-// We have the case where a consumer can become greedy and pick up a messages before the stream has incremented our pending(sgap).
-// Instead of trying to slow things down and synchronize we will allow this to wrap and go negative (biggest uint64) for a short time.
-// This functions checks for that and returns 0.
 // Lock should be held.
-func (o *consumer) adjustedPending() uint64 {
-	if o.sgap&(1<<63) != 0 {
-		return 0
+func (o *consumer) numPending() uint64 {
+	if o.npsm == 0 {
+		o.streamNumPending()
 	}
-	return o.sgap
+	return o.npc
 }
+// Will force a set from the stream store of num pending.
+// Lock should be held.
+func (o *consumer) streamNumPending() uint64 {
+	ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject)
+	o.npc = ss.Msgs
+	o.npsm = ss.Last
+	return o.npc
+}
 // Deliver a msg to the consumer.
@@ -3152,10 +3158,9 @@ func (o *consumer) deliverMsg(dsubj string, pmsg *jsPubMsg, dc uint64, rp Retent
 		return
 	}
-	// Update pending on first attempt. This can go upside down for a short bit, that is ok.
-	// See adjustedPending().
-	if dc == 1 {
-		o.sgap--
+	// Update our cached num pending.
+	if dc == 1 && o.npsm > 0 {
+		o.npc--
 	}
 	dseq := o.dseq
@@ -3187,7 +3192,7 @@ func (o *consumer) deliverMsg(dsubj string, pmsg *jsPubMsg, dc uint64, rp Retent
 		pmsg.msg = nil
 	}
-	pmsg.dsubj, pmsg.reply, pmsg.o = dsubj, o.ackReply(pmsg.seq, dseq, dc, pmsg.ts, o.adjustedPending()), o
+	pmsg.dsubj, pmsg.reply, pmsg.o = dsubj, o.ackReply(pmsg.seq, dseq, dc, pmsg.ts, o.numPending()), o
 	psz := pmsg.size()
 	if o.maxpb > 0 {
@@ -3640,6 +3645,7 @@ func (o *consumer) selectStartingSeqNo() {
 			// TODO(dlc) - Once clustered can't rely on this.
 			o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
 		} else {
+			// DeliverNew
 			o.sseq = state.LastSeq + 1
 		}
 	} else {
@@ -3748,7 +3754,6 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
 			o.adflr = o.dseq - 1
 		}
 	}
-	o.sgap = 0
 	o.pending = nil
 	// We need to remove all those being queued for redelivery under o.rdq
@@ -4015,25 +4020,13 @@ func (o *consumer) setInitialPendingAndStart(hadState bool) {
 		}
 	}
-	if !filtered && (dp != DeliverLastPerSubject && dp != DeliverNew) {
-		var state StreamState
-		mset.store.FastState(&state)
-		if state.Msgs > 0 {
-			o.sgap = state.Msgs - (o.sseq - state.FirstSeq)
-			o.lsgap = state.LastSeq
-		}
-	} else {
+	if filtered || dp == DeliverLastPerSubject || dp == DeliverNew {
 		// Here we are filtered.
 		if dp == DeliverLastPerSubject && o.hasSkipListPending() && o.sseq < o.lss.resume {
-			ss := mset.store.FilteredState(o.lss.resume+1, o.cfg.FilterSubject)
 			if !hadState {
 				o.sseq = o.lss.seqs[0]
 			}
-			o.sgap = ss.Msgs + uint64(len(o.lss.seqs))
-			o.lsgap = ss.Last
 		} else if ss := mset.store.FilteredState(o.sseq, o.cfg.FilterSubject); ss.Msgs > 0 {
-			o.sgap = ss.Msgs
-			o.lsgap = ss.Last
 			// See if we should update our starting sequence.
 			if dp == DeliverLast || dp == DeliverLastPerSubject {
 				if !hadState {
@@ -4043,7 +4036,6 @@ func (o *consumer) setInitialPendingAndStart(hadState bool) {
 				// If our original is larger we will ignore, we don't want to go backwards with DeliverNew.
 				// If its greater, we need to adjust pending.
 				if ss.Last >= o.sseq {
-					o.sgap -= (ss.Last - o.sseq + 1)
 					if !hadState {
 						o.sseq = ss.Last + 1
 					}
@@ -4072,9 +4064,9 @@ func (o *consumer) setInitialPendingAndStart(hadState bool) {
 func (o *consumer) decStreamPending(sseq uint64, subj string) {
 	o.mu.Lock()
-	// Ignore if we have already seen this one.
-	if sseq >= o.sseq && o.sgap > 0 && o.isFilteredMatch(subj) {
-		o.sgap--
+	// Update our cached num pending. Only do so if we think deliverMsg has not done so.
+	if sseq > o.npsm && sseq >= o.sseq && o.isFilteredMatch(subj) {
+		o.npc--
 	}
 	// Check if this message was pending.
 	p, wasPending := o.pending[sseq]
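
Between refreshes, the hunks above keep the cache consistent with three watermark-gated adjustments: a newly stored matching message bumps npc only when its sequence is above npsm (see the stream-side hunk at the bottom of this page), the first delivery of a message decrements npc, and a message removed from the stream decrements npc only when deliverMsg cannot already have done so. A rough, self-contained sketch of that bookkeeping (npCache and the method names are illustrative, not the server's API):

type npCache struct {
	sseq uint64 // next stream sequence the consumer will deliver
	npc  uint64 // cached num pending
	npsm uint64 // stream sequence the cache was computed at
}

// onStoreMsg mirrors the processJetStreamMsg hunk: only messages above the
// watermark are new to the cache; anything at or below npsm was already
// counted when the snapshot was taken.
func (c *npCache) onStoreMsg(seq uint64, matchesFilter bool) {
	if matchesFilter && seq > c.npsm {
		c.npc++
	}
}

// onFirstDelivery mirrors deliverMsg: the first delivery (dc == 1) consumes
// one unit of pending, provided a snapshot exists at all.
func (c *npCache) onFirstDelivery() {
	if c.npsm > 0 {
		c.npc--
	}
}

// onMsgRemoved mirrors decStreamPending: per the diff's comment, only adjust
// when deliverMsg is not expected to have done so, i.e. the removed message is
// above the watermark, has not yet been reached by the consumer, and matches
// the filter.
func (c *npCache) onMsgRemoved(seq uint64, matchesFilter bool) {
	if matchesFilter && seq > c.npsm && seq >= c.sseq {
		c.npc--
	}
}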


@@ -5190,6 +5190,101 @@ func TestNoRaceJetStreamPullConsumersAndInteriorDeletes(t *testing.T) {
 	}
 }
+func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "JSC", 3)
+	defer c.shutdown()
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name: "TEST",
+		Subjects: []string{"foo"},
+		Retention: nats.InterestPolicy,
+		MaxMsgs: 2000,
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "dur", AckPolicy: nats.AckExplicitPolicy})
+	require_NoError(t, err)
+	qch := make(chan bool)
+	var wg sync.WaitGroup
+	// Publisher
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			pt := time.NewTimer(time.Duration(rand.Intn(2)) * time.Millisecond)
+			select {
+			case <-pt.C:
+				_, err := js.Publish("foo", []byte("BUG!"))
+				if err != nil {
+					t.Logf("Got a publisher error: %v", err)
+					return
+				}
+			case <-qch:
+				return
+			}
+		}
+	}()
+	time.Sleep(time.Second)
+	// Pull Consumers
+	wg.Add(100)
+	for i := 0; i < 100; i++ {
+		go func() {
+			defer wg.Done()
+			_, js := jsClientConnect(t, c.randomServer())
+			sub, err := js.PullSubscribe("foo", "dur")
+			require_NoError(t, err)
+			for {
+				pt := time.NewTimer(time.Duration(rand.Intn(300)) * time.Millisecond)
+				select {
+				case <-pt.C:
+					msgs, err := sub.Fetch(1)
+					if err != nil {
+						t.Logf("Got a Fetch error: %v", err)
+						return
+					}
+					if len(msgs) > 0 {
+						go func() {
+							ackDelay := time.Duration(rand.Intn(375)+15) * time.Millisecond
+							m := msgs[0]
+							time.AfterFunc(ackDelay, func() { m.AckSync() })
+						}()
+					}
+				case <-qch:
+					return
+				}
+			}
+		}()
+	}
+	time.Sleep(5 * time.Second)
+	close(qch)
+	wg.Wait()
+	time.Sleep(time.Second)
+	si, err := js.StreamInfo("TEST")
+	require_NoError(t, err)
+	ci, err := js.ConsumerInfo("TEST", "dur")
+	require_NoError(t, err)
+	ld := ci.Delivered.Stream
+	if si.State.FirstSeq > ld {
+		ld = si.State.FirstSeq - 1
+	}
+	if si.State.LastSeq-ld != ci.NumPending {
+		t.Fatalf("Expected NumPending to be %d got %d", si.State.LastSeq-ld, ci.NumPending)
+	}
+}
 // Net Proxy - For introducing RTT and BW constraints.
 type netProxy struct {
 	listener net.Listener
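
The closing check of the new test pins down the invariant being protected: the reported NumPending must equal the stream's LastSeq minus whichever is higher, the consumer's last delivered stream sequence or FirstSeq-1 (anything below FirstSeq has been acked away or discarded by the MaxMsgs limit, so it can no longer be pending). A tiny stand-alone helper mirroring the test's arithmetic, with purely hypothetical numbers:

// expectedNumPending mirrors the test's final computation; the example inputs
// below are hypothetical, not values from an actual run.
func expectedNumPending(lastSeq, firstSeq, deliveredStream uint64) uint64 {
	ld := deliveredStream
	if firstSeq > ld {
		// Nothing below FirstSeq still exists, so it cannot count as pending.
		ld = firstSeq - 1
	}
	return lastSeq - ld
}

For example, expectedNumPending(2000, 1200, 1100) uses 1199 as the floor and returns 2000-1199 = 801.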


@@ -3570,18 +3570,18 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
 	}
 	if err == nil && seq > 0 && numConsumers > 0 {
-		mset.mu.Lock()
+		mset.mu.RLock()
 		for _, o := range mset.consumers {
 			o.mu.Lock()
-			if o.isLeader() {
-				if seq > o.lsgap && o.isFilteredMatch(subject) {
-					o.sgap++
+			if o.isLeader() && o.isFilteredMatch(subject) {
+				if seq > o.npsm {
+					o.npc++
 				}
 				o.signalNewMessages()
 			}
 			o.mu.Unlock()
 		}
-		mset.mu.Unlock()
+		mset.mu.RUnlock()
 	}
 	return err
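
One side change in this last hunk: the stream's mutex is now only read-locked while walking the consumers, since the walk just ranges over the consumer map and each consumer guards its own counters with o.mu. A generic sketch of that locking pattern, with illustrative types that are not the server's:

import "sync"

type counter struct {
	mu sync.Mutex
	n  uint64
}

type registry struct {
	mu    sync.RWMutex
	items map[string]*counter
}

// bumpAll read-locks the collection for iteration only; each element takes its
// own lock for the actual mutation, so concurrent walkers are not serialized.
func (r *registry) bumpAll() {
	r.mu.RLock()
	for _, c := range r.items {
		c.mu.Lock()
		c.n++
		c.mu.Unlock()
	}
	r.mu.RUnlock()
}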