From eff27e26be69f933af840a8f435facbfd9af3693 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 1 Oct 2020 12:22:02 -0600 Subject: [PATCH 1/5] AckExplicit removes message for "offline" durable The test shows the issue. It seems that Consumer.needAck() for AckExplicit should consider that an Ack is needed if sseq > o.asflr and there is no pending ack at all. However making this change would break the test TestJetStreamInterestRetentionStream. Also, running the new test with `-count 100` and by adding an artificial delay in stream.ackMsg() (just before calling mset.store.RemoveMsg(seq)) causes sometimes delete requests for the same sequence to be processed twice, which causes the new test to fail (even with an attempted fix as discussed above). I think that the attempt to remove the same sequence twice is messing up the state. Signed-off-by: Ivan Kozlovic --- test/jetstream_test.go | 138 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 02f4c9ed..a9c94173 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -7842,3 +7842,141 @@ func TestJetStreamPubSubPerf(t *testing.T) { fmt.Printf("time is %v\n", tt) fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds()) } + +func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { + cases := []struct { + name string + mconfig *server.StreamConfig + }{ + {"MemoryStore", &server.StreamConfig{ + Name: "MY_STREAM", + Storage: server.MemoryStorage, + Subjects: []string{"foo.*"}, + Retention: server.InterestPolicy, + }}, + {"FileStore", &server.StreamConfig{ + Name: "MY_STREAM", + Storage: server.FileStorage, + Subjects: []string{"foo.*"}, + Retention: server.InterestPolicy, + }}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mset, err := s.GlobalAccount().AddStream(c.mconfig) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + nc1 := clientConnectToServer(t, s) + defer nc1.Close() + + nc2 := clientConnectToServer(t, s) + defer nc2.Close() + + // Create two durable consumers on the same subject + sub1, _ := nc1.SubscribeSync(nats.NewInbox()) + defer sub1.Unsubscribe() + nc1.Flush() + + o1, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur1", + DeliverSubject: sub1.Subject, + FilterSubject: "foo.bar", + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Unexpected error adding consumer: %v", err) + } + defer o1.Delete() + + sub2, _ := nc2.SubscribeSync(nats.NewInbox()) + defer sub2.Unsubscribe() + nc2.Flush() + + o2, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur2", + DeliverSubject: sub2.Subject, + FilterSubject: "foo.bar", + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Unexpected error adding consumer: %v", err) + } + defer o2.Delete() + + // Send 1 message + toSend := 2 + for i := 0; i < toSend; i++ { + sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", i+1)) + } + state := mset.State() + if state.Msgs != uint64(toSend) { + t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs) + } + + // Receive this first message and ack it. + subs := []*nats.Subscription{sub1, sub2} + for _, sub := range subs { + for i := 0; i < toSend; i++ { + m, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error acking message: %v", err) + } + m.Respond(nil) + } + } + + // Now close the 2nd subscription... + sub2.Unsubscribe() + nc2.Flush() + + // Send new messages + for i := 0; i < toSend; i++ { + sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", 2+i+1)) + } + + // first subscription should get it and will ack it. + for i := 0; i < toSend; i++ { + m, err := sub1.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error acking message: %v", err) + } + m.Respond(nil) + } + + // Now recreate the subscription for the 2nd JS consumer + sub2, _ = nc2.SubscribeSync(nats.NewInbox()) + defer sub2.Unsubscribe() + nc2.Flush() + + o2, err = mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur2", + DeliverSubject: sub2.Subject, + FilterSubject: "foo.bar", + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Unexpected error adding consumer: %v", err) + } + defer o2.Delete() + + // Those messages should be redelivered to the 2nd consumer + for i := 0; i < toSend; i++ { + m, err := sub2.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error acking message: %v", err) + } + m.Respond(nil) + } + }) + } +} From a75be04b0a82ebf157ffe8397e3253ec1b28c053 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 1 Oct 2020 21:24:40 -0700 Subject: [PATCH 2/5] Various fixes for this PR. - Fix for updating delivery subject and adjusting next delivery sequences. - When acking explicitly but out of order, need to make sure we set floor correctly. - Only update ack floors on an ack if the message is present. - Fix for needAck for explicitAck out of order consumers detecting if message has been acked. - Fix for race not locking stream when checking interest during stop. - Fix for filestore determing if a message block still has a message. Added check to first sequence as well as cache. - Some additions to the original test. Signed-off-by: Derek Collison --- server/consumer.go | 57 +++++++++++++++++++++++++----------------- server/filestore.go | 12 ++++++--- server/stream.go | 6 ++--- test/jetstream_test.go | 42 +++++++++++++++++++++++-------- 4 files changed, 75 insertions(+), 42 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index a4a087fb..ef8607ab 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -673,8 +673,8 @@ func (o *Consumer) updateDeliverSubject(newDeliver string) { o.dsubj = newDeliver o.config.DeliverSubject = newDeliver // FIXME(dlc) - check partitions, we may need offset. - o.dseq = o.adflr - o.sseq = o.asflr + o.dseq = o.adflr + 1 + o.sseq = o.asflr + 1 // If we never received an ack, set to 1. if o.dseq == 0 { @@ -977,14 +977,16 @@ func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) { o.sampleAck(sseq, dseq, dcount) } delete(o.pending, sseq) + // Consumers sequence numbers can skip during redlivery since + // they always increment. So if we do not have any pending treat + // as all scenario below. Otherwise check that we filled in a gap. + if len(o.pending) == 0 { + o.adflr, o.asflr = o.dseq-1, o.sseq-1 + } else if dseq == o.adflr+1 { + o.adflr, o.asflr = dseq, sseq + } } - // Consumers sequence numbers can skip during redlivery since - // they always increment. So if we do not have any pending treat - // as all scenario below. Otherwise check that we filled in a gap. - // TODO(dlc) - check this. - if len(o.pending) == 0 || dseq == o.adflr+1 { - o.adflr, o.asflr = dseq, sseq - } + // We do these regardless. delete(o.rdc, sseq) o.removeFromRedeliverQueue(sseq) case AckAll: @@ -1024,19 +1026,24 @@ func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) { } // Check if we need an ack for this store seq. +// This is called for interest based retention streams to remove messages. func (o *Consumer) needAck(sseq uint64) bool { - var na bool + var needAck bool o.mu.Lock() switch o.config.AckPolicy { case AckNone, AckAll: - na = sseq > o.asflr + needAck = sseq > o.asflr case AckExplicit: - if sseq > o.asflr && len(o.pending) > 0 { - _, na = o.pending[sseq] + if sseq > o.asflr { + // Generally this means we need an ack, but just double check pending acks. + needAck = true + if len(o.pending) > 0 { + _, needAck = o.pending[sseq] + } } } o.mu.Unlock() - return na + return needAck } // Default is 1 if msg is nil. @@ -1364,26 +1371,27 @@ func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount u sendq := o.mset.sendq ap := o.config.AckPolicy - // This needs to be unlocked since the other side may need this lock on failed delivery. + // This needs to be unlocked since the other side may need this lock on a failed delivery. o.mu.Unlock() // Send message. sendq <- pmsg + // If we are ack none and mset is interest only we should make sure stream removes interest. - if ap == AckNone && mset.config.Retention == InterestPolicy { + if ap == AckNone && mset.config.Retention == InterestPolicy && !mset.checkInterest(seq, o) { // FIXME(dlc) - we have mset lock here, but should we?? - if !mset.checkInterest(seq, o) { - mset.store.RemoveMsg(seq) - } + mset.store.RemoveMsg(seq) } o.mu.Lock() - if ap == AckNone { + if ap == AckExplicit || ap == AckAll { + o.trackPending(seq) + } else if ap == AckNone { o.adflr = o.dseq o.asflr = seq - } else if ap == AckExplicit || ap == AckAll { - o.trackPending(seq) } + o.dseq++ + o.updateStore() } @@ -1763,7 +1771,10 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error { // Sort just to keep pending sparse array state small. sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) for _, seq := range seqs { - if !mset.checkInterest(seq, o) { + mset.mu.Lock() + hasNoInterest := !mset.checkInterest(seq, o) + mset.mu.Unlock() + if hasNoInterest { mset.store.RemoveMsg(seq) } } diff --git a/server/filestore.go b/server/filestore.go index 9795efe4..e04acae5 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -713,13 +713,14 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { return false, nil } sm, _ := mb.fetchMsg(seq) - // We have the message here, so we can delete it. - if sm != nil { + // We might have the message here, so we can delete it. + found := sm != nil + if found { if err := fs.deleteMsgFromBlock(mb, seq, sm, secure); err != nil { return false, err } } - return sm != nil, nil + return found, nil } // Loop on requests to write out our index file. This is used when calling @@ -798,11 +799,13 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored } } - if seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) { + // See if the sequence numbers is still relevant. Check first and cache first. + if seq < mb.first.seq || seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) { mb.mu.Unlock() fs.mu.Unlock() return nil } + // Now check dmap if it is there. if mb.dmap != nil { if _, ok := mb.dmap[seq]; ok { @@ -843,6 +846,7 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored mb.dmap[seq] = struct{}{} shouldWriteIndex = true } + if secure { fs.eraseMsg(mb, sm) } diff --git a/server/stream.go b/server/stream.go index 6cab1972..0361ee57 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1120,14 +1120,12 @@ func (mset *Stream) partitionUnique(partition string) bool { // Lock should be held. func (mset *Stream) checkInterest(seq uint64, obs *Consumer) bool { - var needAck bool for _, o := range mset.consumers { if o != obs && o.needAck(seq) { - needAck = true - break + return true } } - return needAck + return false } // ackMsg is called into from an observable when we have a WorkQueue or Interest retention policy. diff --git a/test/jetstream_test.go b/test/jetstream_test.go index a9c94173..2fc24665 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -872,7 +872,7 @@ func TestJetStreamAddStreamSameConfigOK(t *testing.T) { func sendStreamMsg(t *testing.T, nc *nats.Conn, subject, msg string) { t.Helper() - resp, _ := nc.Request(subject, []byte(msg), 100*time.Millisecond) + resp, _ := nc.Request(subject, []byte(msg), 500*time.Millisecond) if resp == nil { t.Fatalf("No response for %q, possible timeout?", msg) } @@ -2928,7 +2928,7 @@ func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) { checkSubPending := func(numExpected int) { t.Helper() checkFor(t, time.Second, 10*time.Millisecond, func() error { - if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != numExpected { + if nmsgs, _, _ := sub.Pending(); nmsgs != numExpected { return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected) } return nil @@ -4001,7 +4001,7 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) { } // We should get the remaining messages here. - for i := toSend / 2; i <= toSend; i++ { + for i := toSend/2 + 1; i <= toSend; i++ { m := getMsg(i) m.Respond(nil) } @@ -5031,11 +5031,12 @@ func TestJetStreamInterestRetentionStream(t *testing.T) { // we should have 1, 2, 3 acks now. checkNumMsgs(totalMsgs - 3) - // Now ack last ackall message. This should clear all of them. - for i := 4; i <= totalMsgs; i++ { + nm, _, _ := sub2.Pending() + // Now ack last ackAll message. This should clear all of them. + for i := 1; i <= nm; i++ { if m, err := sub2.NextMsg(time.Second); err != nil { t.Fatalf("Unexpected error: %v", err) - } else if i == totalMsgs { + } else if i == nm { m.Respond(nil) } } @@ -7913,7 +7914,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { } defer o2.Delete() - // Send 1 message + // Send 2 messages toSend := 2 for i := 0; i < toSend; i++ { sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", i+1)) @@ -7923,7 +7924,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs) } - // Receive this first message and ack it. + // Receive the messages and ack them. subs := []*nats.Subscription{sub1, sub2} for _, sub := range subs { for i := 0; i < toSend; i++ { @@ -7934,15 +7935,26 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { m.Respond(nil) } } + nc1.Flush() + nc2.Flush() // Now close the 2nd subscription... sub2.Unsubscribe() - nc2.Flush() + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if o2.Active() { + return fmt.Errorf("Consumer still active") + } + return nil + }) - // Send new messages + // Send 2 more new messages for i := 0; i < toSend; i++ { sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", 2+i+1)) } + state = mset.State() + if state.Msgs != uint64(toSend) { + t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs) + } // first subscription should get it and will ack it. for i := 0; i < toSend; i++ { @@ -7952,11 +7964,12 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { } m.Respond(nil) } + // For acks from m.Respond above + nc1.Flush() // Now recreate the subscription for the 2nd JS consumer sub2, _ = nc2.SubscribeSync(nats.NewInbox()) defer sub2.Unsubscribe() - nc2.Flush() o2, err = mset.AddConsumer(&server.ConsumerConfig{ Durable: "dur2", @@ -7969,6 +7982,13 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { } defer o2.Delete() + checkFor(t, time.Second, 100*time.Millisecond, func() error { + if nmsgs, _, _ := sub2.Pending(); nmsgs != toSend { + return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend) + } + return nil + }) + // Those messages should be redelivered to the 2nd consumer for i := 0; i < toSend; i++ { m, err := sub2.NextMsg(time.Second) From 8a9f6eaf427cca970c3706680aa1f4ad879ec26b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 2 Oct 2020 12:58:59 -0700 Subject: [PATCH 3/5] Additional fixes to interest retention based streams and offline durables with redelivery. Signed-off-by: Derek Collison --- server/consumer.go | 23 +++---------- test/jetstream_test.go | 73 +++++++++++++++++++++++++++--------------- 2 files changed, 51 insertions(+), 45 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index ef8607ab..21abc4db 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -664,28 +664,13 @@ func (o *Consumer) updateDeliverSubject(newDeliver string) { o.mu.Lock() defer o.mu.Unlock() - mset := o.mset - if mset == nil || o.isPullMode() { + if o.mset == nil || o.isPullMode() { return } - oldDeliver := o.config.DeliverSubject - o.dsubj = newDeliver - o.config.DeliverSubject = newDeliver - // FIXME(dlc) - check partitions, we may need offset. - o.dseq = o.adflr + 1 - o.sseq = o.asflr + 1 - - // If we never received an ack, set to 1. - if o.dseq == 0 { - o.dseq = 1 - } - if o.sseq == 0 { - o.sseq = 1 - } - + o.acc.sl.ClearNotification(o.dsubj, o.inch) + o.dsubj, o.config.DeliverSubject = newDeliver, newDeliver // When we register new one it will deliver to update state loop. - o.acc.sl.ClearNotification(oldDeliver, o.inch) o.acc.sl.RegisterNotification(newDeliver, o.inch) } @@ -1037,7 +1022,7 @@ func (o *Consumer) needAck(sseq uint64) bool { if sseq > o.asflr { // Generally this means we need an ack, but just double check pending acks. needAck = true - if len(o.pending) > 0 { + if len(o.pending) > 0 && sseq < o.sseq { _, needAck = o.pending[sseq] } } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 2fc24665..ef54db19 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -3934,7 +3934,11 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) { dname := "d22" subj1 := nats.NewInbox() - o, err := mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj1, AckPolicy: server.AckExplicit}) + o, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: dname, + DeliverSubject: subj1, + AckPolicy: server.AckExplicit, + AckWait: 50 * time.Millisecond}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3967,7 +3971,7 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if seq := o.SeqFromReply(m.Reply); seq != uint64(seqno) { + if seq := o.StreamSeqFromReply(m.Reply); seq != uint64(seqno) { t.Fatalf("Expected sequence of %d , got %d", seqno, seq) } m.Respond(nil) @@ -3995,7 +3999,11 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) { defer sub.Unsubscribe() nc.Flush() - o, err = mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj2, AckPolicy: server.AckExplicit}) + o, err = mset.AddConsumer(&server.ConsumerConfig{ + Durable: dname, + DeliverSubject: subj2, + AckPolicy: server.AckExplicit, + AckWait: 50 * time.Millisecond}) if err != nil { t.Fatalf("Unexpected error trying to add a new durable consumer: %v", err) } @@ -4038,7 +4046,11 @@ func TestJetStreamDurableConsumerReconnectWithOnlyPending(t *testing.T) { dname := "d22" subj1 := nats.NewInbox() - o, err := mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj1, AckPolicy: server.AckExplicit}) + o, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: dname, + DeliverSubject: subj1, + AckPolicy: server.AckExplicit, + AckWait: 25 * time.Millisecond}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -4076,7 +4088,11 @@ func TestJetStreamDurableConsumerReconnectWithOnlyPending(t *testing.T) { // Now we should be able to replace the delivery subject. subj2 := nats.NewInbox() - o, err = mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj2, AckPolicy: server.AckExplicit}) + o, err = mset.AddConsumer(&server.ConsumerConfig{ + Durable: dname, + DeliverSubject: subj2, + AckPolicy: server.AckExplicit, + AckWait: 25 * time.Millisecond}) if err != nil { t.Fatalf("Unexpected error trying to add a new durable consumer: %v", err) } @@ -4084,15 +4100,18 @@ func TestJetStreamDurableConsumerReconnectWithOnlyPending(t *testing.T) { defer sub.Unsubscribe() nc.Flush() - // We should get msg "1" and "2" delivered. + // We should get msg "1" and "2" delivered. They will be reversed. for i := 0; i < 2; i++ { - msg, err := sub.NextMsg(250 * time.Millisecond) + msg, err := sub.NextMsg(500 * time.Millisecond) if err != nil { t.Fatalf("Unexpected error: %v", err) } - expected := fmt.Sprintf("%d", i+1) - if string(msg.Data) != expected { - t.Fatalf("Expected message %q, got %q", expected, msg.Data) + sseq, _, dc, _ := o.ReplyInfo(msg.Reply) + if sseq == 1 && dc == 1 { + t.Fatalf("Expected a redelivery count greater then 1 for sseq 1, got %d", dc) + } + if sseq != 1 && sseq != 2 { + t.Fatalf("Expected stream sequence of 1 or 2 but got %d", sseq) } } }) @@ -7901,13 +7920,13 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { sub2, _ := nc2.SubscribeSync(nats.NewInbox()) defer sub2.Unsubscribe() - nc2.Flush() o2, err := mset.AddConsumer(&server.ConsumerConfig{ Durable: "dur2", DeliverSubject: sub2.Subject, FilterSubject: "foo.bar", AckPolicy: server.AckExplicit, + AckWait: 25 * time.Millisecond, }) if err != nil { t.Fatalf("Unexpected error adding consumer: %v", err) @@ -7935,17 +7954,12 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { m.Respond(nil) } } + // To make sure acks are processed for checking state after sending new ones. nc1.Flush() nc2.Flush() // Now close the 2nd subscription... sub2.Unsubscribe() - checkFor(t, time.Second, 10*time.Millisecond, func() error { - if o2.Active() { - return fmt.Errorf("Consumer still active") - } - return nil - }) // Send 2 more new messages for i := 0; i < toSend; i++ { @@ -7976,26 +7990,33 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { DeliverSubject: sub2.Subject, FilterSubject: "foo.bar", AckPolicy: server.AckExplicit, + AckWait: 25 * time.Millisecond, }) if err != nil { t.Fatalf("Unexpected error adding consumer: %v", err) } defer o2.Delete() - checkFor(t, time.Second, 100*time.Millisecond, func() error { - if nmsgs, _, _ := sub2.Pending(); nmsgs != toSend { - return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend) - } - return nil - }) - // Those messages should be redelivered to the 2nd consumer - for i := 0; i < toSend; i++ { + for i := 1; i <= toSend; i++ { m, err := sub2.NextMsg(time.Second) if err != nil { - t.Fatalf("Error acking message: %v", err) + t.Fatalf("Error receiving message %d: %v", i, err) } m.Respond(nil) + + sseq, dseq, dc, _ := o2.ReplyInfo(m.Reply) + if dseq != uint64(i+3) { + t.Fatalf("Expected consumer sequence of %d got %d", i+3, dseq) + } + // Depending on timing from above we could receive stream sequences out of order but + // we know we want 3 & 4. + if sseq != 3 && sseq != 4 { + t.Fatalf("Expected stream sequence of 3 or 4 but got %d", sseq) + } + if sseq == 3 && dc == 1 { + t.Fatalf("Expected a redelivery count greater then 1 for sseq 3, got %d", dc) + } } }) } From 9ad408e0b37c64f00b6af1ec903a7e146a95e51e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 2 Oct 2020 16:21:23 -0700 Subject: [PATCH 4/5] Use closed variable on consumer, don't check consumer sequences Signed-off-by: Derek Collison --- server/consumer.go | 2 +- test/jetstream_test.go | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 21abc4db..2dde289d 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -664,7 +664,7 @@ func (o *Consumer) updateDeliverSubject(newDeliver string) { o.mu.Lock() defer o.mu.Unlock() - if o.mset == nil || o.isPullMode() { + if o.closed || o.isPullMode() { return } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index ef54db19..1fda0898 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -8005,10 +8005,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { } m.Respond(nil) - sseq, dseq, dc, _ := o2.ReplyInfo(m.Reply) - if dseq != uint64(i+3) { - t.Fatalf("Expected consumer sequence of %d got %d", i+3, dseq) - } + sseq, _, dc, _ := o2.ReplyInfo(m.Reply) // Depending on timing from above we could receive stream sequences out of order but // we know we want 3 & 4. if sseq != 3 && sseq != 4 { From cdfb7ba03b953f345fe470dfa4aab3525ba7de3c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 2 Oct 2020 16:30:40 -0700 Subject: [PATCH 5/5] Remove delivery count check, could flap Signed-off-by: Derek Collison --- test/jetstream_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 1fda0898..48635633 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -8005,15 +8005,12 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { } m.Respond(nil) - sseq, _, dc, _ := o2.ReplyInfo(m.Reply) + sseq := o2.StreamSeqFromReply(m.Reply) // Depending on timing from above we could receive stream sequences out of order but // we know we want 3 & 4. if sseq != 3 && sseq != 4 { t.Fatalf("Expected stream sequence of 3 or 4 but got %d", sseq) } - if sseq == 3 && dc == 1 { - t.Fatalf("Expected a redelivery count greater then 1 for sseq 3, got %d", dc) - } } }) }