mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Merge pull request #1714 from nats-io/maxp
Make sure to honor MaxAckPending when streaming directly to consumers
This commit is contained in:
@@ -1633,6 +1633,13 @@ func (o *Consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, t
|
||||
return false
|
||||
}
|
||||
|
||||
// Since we short circuit the getNextMsg() call where we check for max pending
|
||||
// we need to do that here as well.
|
||||
if o.maxp > 0 && len(o.pending) >= o.maxp {
|
||||
o.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
// Bump store sequence here.
|
||||
o.sseq++
|
||||
|
||||
|
||||
@@ -9447,6 +9447,37 @@ func TestJetStreamConsumerMaxAckPending(t *testing.T) {
|
||||
m.Respond(nil)
|
||||
}
|
||||
checkSubPending(maxAckPending)
|
||||
|
||||
o.Stop()
|
||||
mset.Purge()
|
||||
|
||||
// Now test a consumer that is live while we publish messages to the stream.
|
||||
o, err = mset.AddConsumer(&server.ConsumerConfig{
|
||||
Durable: "d22",
|
||||
DeliverSubject: nats.NewInbox(),
|
||||
AckPolicy: server.AckExplicit,
|
||||
MaxAckPending: maxAckPending,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error, got %v", err)
|
||||
}
|
||||
defer o.Delete()
|
||||
|
||||
sub, _ = nc.SubscribeSync(o.Info().Config.DeliverSubject)
|
||||
defer sub.Unsubscribe()
|
||||
nc.Flush()
|
||||
|
||||
checkSubPending(0)
|
||||
|
||||
// Now stream more then maxAckPending.
|
||||
for i := 0; i < toSend; i++ {
|
||||
sendStreamMsg(t, nc, "foo.baz", fmt.Sprintf("MSG: %d", i+1))
|
||||
}
|
||||
checkSubPending(maxAckPending)
|
||||
// We hit the limit, double check we stayed there.
|
||||
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != maxAckPending {
|
||||
t.Fatalf("Too many messages received: %d vs %d", nmsgs, maxAckPending)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user