diff --git a/server/consumer.go b/server/consumer.go index a4a087fb..2dde289d 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -664,28 +664,13 @@ func (o *Consumer) updateDeliverSubject(newDeliver string) { o.mu.Lock() defer o.mu.Unlock() - mset := o.mset - if mset == nil || o.isPullMode() { + if o.closed || o.isPullMode() { return } - oldDeliver := o.config.DeliverSubject - o.dsubj = newDeliver - o.config.DeliverSubject = newDeliver - // FIXME(dlc) - check partitions, we may need offset. - o.dseq = o.adflr - o.sseq = o.asflr - - // If we never received an ack, set to 1. - if o.dseq == 0 { - o.dseq = 1 - } - if o.sseq == 0 { - o.sseq = 1 - } - + o.acc.sl.ClearNotification(o.dsubj, o.inch) + o.dsubj, o.config.DeliverSubject = newDeliver, newDeliver // When we register new one it will deliver to update state loop. - o.acc.sl.ClearNotification(oldDeliver, o.inch) o.acc.sl.RegisterNotification(newDeliver, o.inch) } @@ -977,14 +962,16 @@ func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) { o.sampleAck(sseq, dseq, dcount) } delete(o.pending, sseq) + // Consumers sequence numbers can skip during redlivery since + // they always increment. So if we do not have any pending treat + // as all scenario below. Otherwise check that we filled in a gap. + if len(o.pending) == 0 { + o.adflr, o.asflr = o.dseq-1, o.sseq-1 + } else if dseq == o.adflr+1 { + o.adflr, o.asflr = dseq, sseq + } } - // Consumers sequence numbers can skip during redlivery since - // they always increment. So if we do not have any pending treat - // as all scenario below. Otherwise check that we filled in a gap. - // TODO(dlc) - check this. - if len(o.pending) == 0 || dseq == o.adflr+1 { - o.adflr, o.asflr = dseq, sseq - } + // We do these regardless. delete(o.rdc, sseq) o.removeFromRedeliverQueue(sseq) case AckAll: @@ -1024,19 +1011,24 @@ func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) { } // Check if we need an ack for this store seq. +// This is called for interest based retention streams to remove messages. func (o *Consumer) needAck(sseq uint64) bool { - var na bool + var needAck bool o.mu.Lock() switch o.config.AckPolicy { case AckNone, AckAll: - na = sseq > o.asflr + needAck = sseq > o.asflr case AckExplicit: - if sseq > o.asflr && len(o.pending) > 0 { - _, na = o.pending[sseq] + if sseq > o.asflr { + // Generally this means we need an ack, but just double check pending acks. + needAck = true + if len(o.pending) > 0 && sseq < o.sseq { + _, needAck = o.pending[sseq] + } } } o.mu.Unlock() - return na + return needAck } // Default is 1 if msg is nil. @@ -1364,26 +1356,27 @@ func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount u sendq := o.mset.sendq ap := o.config.AckPolicy - // This needs to be unlocked since the other side may need this lock on failed delivery. + // This needs to be unlocked since the other side may need this lock on a failed delivery. o.mu.Unlock() // Send message. sendq <- pmsg + // If we are ack none and mset is interest only we should make sure stream removes interest. - if ap == AckNone && mset.config.Retention == InterestPolicy { + if ap == AckNone && mset.config.Retention == InterestPolicy && !mset.checkInterest(seq, o) { // FIXME(dlc) - we have mset lock here, but should we?? - if !mset.checkInterest(seq, o) { - mset.store.RemoveMsg(seq) - } + mset.store.RemoveMsg(seq) } o.mu.Lock() - if ap == AckNone { + if ap == AckExplicit || ap == AckAll { + o.trackPending(seq) + } else if ap == AckNone { o.adflr = o.dseq o.asflr = seq - } else if ap == AckExplicit || ap == AckAll { - o.trackPending(seq) } + o.dseq++ + o.updateStore() } @@ -1763,7 +1756,10 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error { // Sort just to keep pending sparse array state small. sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) for _, seq := range seqs { - if !mset.checkInterest(seq, o) { + mset.mu.Lock() + hasNoInterest := !mset.checkInterest(seq, o) + mset.mu.Unlock() + if hasNoInterest { mset.store.RemoveMsg(seq) } } diff --git a/server/filestore.go b/server/filestore.go index 9795efe4..e04acae5 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -713,13 +713,14 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { return false, nil } sm, _ := mb.fetchMsg(seq) - // We have the message here, so we can delete it. - if sm != nil { + // We might have the message here, so we can delete it. + found := sm != nil + if found { if err := fs.deleteMsgFromBlock(mb, seq, sm, secure); err != nil { return false, err } } - return sm != nil, nil + return found, nil } // Loop on requests to write out our index file. This is used when calling @@ -798,11 +799,13 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored } } - if seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) { + // See if the sequence numbers is still relevant. Check first and cache first. + if seq < mb.first.seq || seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) { mb.mu.Unlock() fs.mu.Unlock() return nil } + // Now check dmap if it is there. if mb.dmap != nil { if _, ok := mb.dmap[seq]; ok { @@ -843,6 +846,7 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored mb.dmap[seq] = struct{}{} shouldWriteIndex = true } + if secure { fs.eraseMsg(mb, sm) } diff --git a/server/stream.go b/server/stream.go index 6cab1972..0361ee57 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1120,14 +1120,12 @@ func (mset *Stream) partitionUnique(partition string) bool { // Lock should be held. func (mset *Stream) checkInterest(seq uint64, obs *Consumer) bool { - var needAck bool for _, o := range mset.consumers { if o != obs && o.needAck(seq) { - needAck = true - break + return true } } - return needAck + return false } // ackMsg is called into from an observable when we have a WorkQueue or Interest retention policy. diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 02f4c9ed..48635633 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -872,7 +872,7 @@ func TestJetStreamAddStreamSameConfigOK(t *testing.T) { func sendStreamMsg(t *testing.T, nc *nats.Conn, subject, msg string) { t.Helper() - resp, _ := nc.Request(subject, []byte(msg), 100*time.Millisecond) + resp, _ := nc.Request(subject, []byte(msg), 500*time.Millisecond) if resp == nil { t.Fatalf("No response for %q, possible timeout?", msg) } @@ -2928,7 +2928,7 @@ func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) { checkSubPending := func(numExpected int) { t.Helper() checkFor(t, time.Second, 10*time.Millisecond, func() error { - if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != numExpected { + if nmsgs, _, _ := sub.Pending(); nmsgs != numExpected { return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected) } return nil @@ -3934,7 +3934,11 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) { dname := "d22" subj1 := nats.NewInbox() - o, err := mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj1, AckPolicy: server.AckExplicit}) + o, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: dname, + DeliverSubject: subj1, + AckPolicy: server.AckExplicit, + AckWait: 50 * time.Millisecond}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3967,7 +3971,7 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if seq := o.SeqFromReply(m.Reply); seq != uint64(seqno) { + if seq := o.StreamSeqFromReply(m.Reply); seq != uint64(seqno) { t.Fatalf("Expected sequence of %d , got %d", seqno, seq) } m.Respond(nil) @@ -3995,13 +3999,17 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) { defer sub.Unsubscribe() nc.Flush() - o, err = mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj2, AckPolicy: server.AckExplicit}) + o, err = mset.AddConsumer(&server.ConsumerConfig{ + Durable: dname, + DeliverSubject: subj2, + AckPolicy: server.AckExplicit, + AckWait: 50 * time.Millisecond}) if err != nil { t.Fatalf("Unexpected error trying to add a new durable consumer: %v", err) } // We should get the remaining messages here. - for i := toSend / 2; i <= toSend; i++ { + for i := toSend/2 + 1; i <= toSend; i++ { m := getMsg(i) m.Respond(nil) } @@ -4038,7 +4046,11 @@ func TestJetStreamDurableConsumerReconnectWithOnlyPending(t *testing.T) { dname := "d22" subj1 := nats.NewInbox() - o, err := mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj1, AckPolicy: server.AckExplicit}) + o, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: dname, + DeliverSubject: subj1, + AckPolicy: server.AckExplicit, + AckWait: 25 * time.Millisecond}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -4076,7 +4088,11 @@ func TestJetStreamDurableConsumerReconnectWithOnlyPending(t *testing.T) { // Now we should be able to replace the delivery subject. subj2 := nats.NewInbox() - o, err = mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj2, AckPolicy: server.AckExplicit}) + o, err = mset.AddConsumer(&server.ConsumerConfig{ + Durable: dname, + DeliverSubject: subj2, + AckPolicy: server.AckExplicit, + AckWait: 25 * time.Millisecond}) if err != nil { t.Fatalf("Unexpected error trying to add a new durable consumer: %v", err) } @@ -4084,15 +4100,18 @@ func TestJetStreamDurableConsumerReconnectWithOnlyPending(t *testing.T) { defer sub.Unsubscribe() nc.Flush() - // We should get msg "1" and "2" delivered. + // We should get msg "1" and "2" delivered. They will be reversed. for i := 0; i < 2; i++ { - msg, err := sub.NextMsg(250 * time.Millisecond) + msg, err := sub.NextMsg(500 * time.Millisecond) if err != nil { t.Fatalf("Unexpected error: %v", err) } - expected := fmt.Sprintf("%d", i+1) - if string(msg.Data) != expected { - t.Fatalf("Expected message %q, got %q", expected, msg.Data) + sseq, _, dc, _ := o.ReplyInfo(msg.Reply) + if sseq == 1 && dc == 1 { + t.Fatalf("Expected a redelivery count greater then 1 for sseq 1, got %d", dc) + } + if sseq != 1 && sseq != 2 { + t.Fatalf("Expected stream sequence of 1 or 2 but got %d", sseq) } } }) @@ -5031,11 +5050,12 @@ func TestJetStreamInterestRetentionStream(t *testing.T) { // we should have 1, 2, 3 acks now. checkNumMsgs(totalMsgs - 3) - // Now ack last ackall message. This should clear all of them. - for i := 4; i <= totalMsgs; i++ { + nm, _, _ := sub2.Pending() + // Now ack last ackAll message. This should clear all of them. + for i := 1; i <= nm; i++ { if m, err := sub2.NextMsg(time.Second); err != nil { t.Fatalf("Unexpected error: %v", err) - } else if i == totalMsgs { + } else if i == nm { m.Respond(nil) } } @@ -7842,3 +7862,156 @@ func TestJetStreamPubSubPerf(t *testing.T) { fmt.Printf("time is %v\n", tt) fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds()) } + +func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { + cases := []struct { + name string + mconfig *server.StreamConfig + }{ + {"MemoryStore", &server.StreamConfig{ + Name: "MY_STREAM", + Storage: server.MemoryStorage, + Subjects: []string{"foo.*"}, + Retention: server.InterestPolicy, + }}, + {"FileStore", &server.StreamConfig{ + Name: "MY_STREAM", + Storage: server.FileStorage, + Subjects: []string{"foo.*"}, + Retention: server.InterestPolicy, + }}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mset, err := s.GlobalAccount().AddStream(c.mconfig) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + nc1 := clientConnectToServer(t, s) + defer nc1.Close() + + nc2 := clientConnectToServer(t, s) + defer nc2.Close() + + // Create two durable consumers on the same subject + sub1, _ := nc1.SubscribeSync(nats.NewInbox()) + defer sub1.Unsubscribe() + nc1.Flush() + + o1, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur1", + DeliverSubject: sub1.Subject, + FilterSubject: "foo.bar", + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Unexpected error adding consumer: %v", err) + } + defer o1.Delete() + + sub2, _ := nc2.SubscribeSync(nats.NewInbox()) + defer sub2.Unsubscribe() + + o2, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur2", + DeliverSubject: sub2.Subject, + FilterSubject: "foo.bar", + AckPolicy: server.AckExplicit, + AckWait: 25 * time.Millisecond, + }) + if err != nil { + t.Fatalf("Unexpected error adding consumer: %v", err) + } + defer o2.Delete() + + // Send 2 messages + toSend := 2 + for i := 0; i < toSend; i++ { + sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", i+1)) + } + state := mset.State() + if state.Msgs != uint64(toSend) { + t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs) + } + + // Receive the messages and ack them. + subs := []*nats.Subscription{sub1, sub2} + for _, sub := range subs { + for i := 0; i < toSend; i++ { + m, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error acking message: %v", err) + } + m.Respond(nil) + } + } + // To make sure acks are processed for checking state after sending new ones. + nc1.Flush() + nc2.Flush() + + // Now close the 2nd subscription... + sub2.Unsubscribe() + + // Send 2 more new messages + for i := 0; i < toSend; i++ { + sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", 2+i+1)) + } + state = mset.State() + if state.Msgs != uint64(toSend) { + t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs) + } + + // first subscription should get it and will ack it. + for i := 0; i < toSend; i++ { + m, err := sub1.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error acking message: %v", err) + } + m.Respond(nil) + } + // For acks from m.Respond above + nc1.Flush() + + // Now recreate the subscription for the 2nd JS consumer + sub2, _ = nc2.SubscribeSync(nats.NewInbox()) + defer sub2.Unsubscribe() + + o2, err = mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur2", + DeliverSubject: sub2.Subject, + FilterSubject: "foo.bar", + AckPolicy: server.AckExplicit, + AckWait: 25 * time.Millisecond, + }) + if err != nil { + t.Fatalf("Unexpected error adding consumer: %v", err) + } + defer o2.Delete() + + // Those messages should be redelivered to the 2nd consumer + for i := 1; i <= toSend; i++ { + m, err := sub2.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving message %d: %v", i, err) + } + m.Respond(nil) + + sseq := o2.StreamSeqFromReply(m.Reply) + // Depending on timing from above we could receive stream sequences out of order but + // we know we want 3 & 4. + if sseq != 3 && sseq != 4 { + t.Fatalf("Expected stream sequence of 3 or 4 but got %d", sseq) + } + } + }) + } +}