diff --git a/server/consumer.go b/server/consumer.go index 8c02b3f1..a6ff6043 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2916,10 +2916,22 @@ func (o *consumer) setInitialPending() { func (o *consumer) decStreamPending(sseq uint64, subj string) { o.mu.Lock() // Ignore if we have already seen this one. - if sseq >= o.sseq && o.sgap > 0 && o.isFilteredMatch(subj) && o.sgap > 0 { + if sseq >= o.sseq && o.sgap > 0 && o.isFilteredMatch(subj) { o.sgap-- } + // Check if this message was pending. + p, wasPending := o.pending[sseq] + var rdc uint64 = 1 + if o.rdc != nil { + rdc = o.rdc[sseq] + } o.mu.Unlock() + + // If it was pending process it like an ack. + // TODO(dlc) - we could do a term here instead with a reason to generate the advisory. + if wasPending { + o.processAckMsg(sseq, p.Sequence, rdc, false) + } } func (o *consumer) account() *Account { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 9d914591..4ddf58bb 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5303,6 +5303,118 @@ func TestJetStreamClusterAccountLoadFailure(t *testing.T) { } } +func TestJetStreamClusterAckPendingWithExpired(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3", 3) + defer c.shutdown() + + // Client based API + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: 3, + MaxAge: 200 * time.Millisecond, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Send in 100 messages. + msg, toSend := make([]byte, 256), 100 + rand.Read(msg) + + for i := 0; i < toSend; i++ { + if _, err = js.Publish("foo", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + sub, err := js.SubscribeSync("foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + checkSubsPending(t, sub, toSend) + ci, err := sub.ConsumerInfo() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci.NumAckPending != toSend { + t.Fatalf("Expected %d to be pending, got %d", toSend, ci.NumAckPending) + } + + // Wait for messages to expire. + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != 0 { + return fmt.Errorf("Expected 0 msgs, got state: %+v", si.State) + } + return nil + }) + + // Once expired these messages can not be redelivered so should not be considered ack pending at this point. + // Now ack.. + ci, err = sub.ConsumerInfo() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci.NumAckPending != 0 { + t.Fatalf("Expected nothing to be ack pending, got %d", ci.NumAckPending) + } +} + +func TestJetStreamClusterAckPendingWithMaxRedelivered(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3", 3) + defer c.shutdown() + + // Client based API + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: 3, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Send in 100 messages. + msg, toSend := make([]byte, 32), 100 + rand.Read(msg) + + for i := 0; i < toSend; i++ { + if _, err = js.Publish("foo", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + sub, err := js.SubscribeSync("foo", + nats.MaxDeliver(2), + nats.Durable("dlc"), + nats.AckWait(10*time.Millisecond), + nats.MaxAckPending(10), + ) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + checkSubsPending(t, sub, toSend*2) + ci, err := sub.ConsumerInfo() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci.NumAckPending != 0 { + t.Fatalf("Expected nothing to be ack pending, got %d", ci.NumAckPending) + } +} + // Support functions // Used to setup superclusters for tests.