Fix for #2083 to release ack pending when messages expire or hit max redeliveries.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-04-08 11:47:59 -07:00
parent 36e18c20ff
commit deb015ec73
2 changed files with 125 additions and 1 deletions

View File

@@ -2916,10 +2916,22 @@ func (o *consumer) setInitialPending() {
func (o *consumer) decStreamPending(sseq uint64, subj string) {
o.mu.Lock()
// Ignore if we have already seen this one.
if sseq >= o.sseq && o.sgap > 0 && o.isFilteredMatch(subj) && o.sgap > 0 {
if sseq >= o.sseq && o.sgap > 0 && o.isFilteredMatch(subj) {
o.sgap--
}
// Check if this message was pending.
p, wasPending := o.pending[sseq]
var rdc uint64 = 1
if o.rdc != nil {
rdc = o.rdc[sseq]
}
o.mu.Unlock()
// If it was pending process it like an ack.
// TODO(dlc) - we could do a term here instead with a reason to generate the advisory.
if wasPending {
o.processAckMsg(sseq, p.Sequence, rdc, false)
}
}
func (o *consumer) account() *Account {

View File

@@ -5303,6 +5303,118 @@ func TestJetStreamClusterAccountLoadFailure(t *testing.T) {
}
}
func TestJetStreamClusterAckPendingWithExpired(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3", 3)
defer c.shutdown()
// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
MaxAge: 200 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Send in 100 messages.
msg, toSend := make([]byte, 256), 100
rand.Read(msg)
for i := 0; i < toSend; i++ {
if _, err = js.Publish("foo", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
sub, err := js.SubscribeSync("foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, toSend)
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci.NumAckPending != toSend {
t.Fatalf("Expected %d to be pending, got %d", toSend, ci.NumAckPending)
}
// Wait for messages to expire.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 0 {
return fmt.Errorf("Expected 0 msgs, got state: %+v", si.State)
}
return nil
})
// Once expired these messages can not be redelivered so should not be considered ack pending at this point.
// Now ack..
ci, err = sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci.NumAckPending != 0 {
t.Fatalf("Expected nothing to be ack pending, got %d", ci.NumAckPending)
}
}
func TestJetStreamClusterAckPendingWithMaxRedelivered(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3", 3)
defer c.shutdown()
// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Send in 100 messages.
msg, toSend := make([]byte, 32), 100
rand.Read(msg)
for i := 0; i < toSend; i++ {
if _, err = js.Publish("foo", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
sub, err := js.SubscribeSync("foo",
nats.MaxDeliver(2),
nats.Durable("dlc"),
nats.AckWait(10*time.Millisecond),
nats.MaxAckPending(10),
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, toSend*2)
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci.NumAckPending != 0 {
t.Fatalf("Expected nothing to be ack pending, got %d", ci.NumAckPending)
}
}
// Support functions
// Used to setup superclusters for tests.