From a75be04b0a82ebf157ffe8397e3253ec1b28c053 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 1 Oct 2020 21:24:40 -0700 Subject: [PATCH] Various fixes for this PR. - Fix for updating delivery subject and adjusting next delivery sequences. - When acking explicitly but out of order, need to make sure we set floor correctly. - Only update ack floors on an ack if the message is present. - Fix for needAck for explicitAck out of order consumers detecting if message has been acked. - Fix for race not locking stream when checking interest during stop. - Fix for filestore determing if a message block still has a message. Added check to first sequence as well as cache. - Some additions to the original test. Signed-off-by: Derek Collison --- server/consumer.go | 57 +++++++++++++++++++++++++----------------- server/filestore.go | 12 ++++++--- server/stream.go | 6 ++--- test/jetstream_test.go | 42 +++++++++++++++++++++++-------- 4 files changed, 75 insertions(+), 42 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index a4a087fb..ef8607ab 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -673,8 +673,8 @@ func (o *Consumer) updateDeliverSubject(newDeliver string) { o.dsubj = newDeliver o.config.DeliverSubject = newDeliver // FIXME(dlc) - check partitions, we may need offset. - o.dseq = o.adflr - o.sseq = o.asflr + o.dseq = o.adflr + 1 + o.sseq = o.asflr + 1 // If we never received an ack, set to 1. if o.dseq == 0 { @@ -977,14 +977,16 @@ func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) { o.sampleAck(sseq, dseq, dcount) } delete(o.pending, sseq) + // Consumers sequence numbers can skip during redlivery since + // they always increment. So if we do not have any pending treat + // as all scenario below. Otherwise check that we filled in a gap. + if len(o.pending) == 0 { + o.adflr, o.asflr = o.dseq-1, o.sseq-1 + } else if dseq == o.adflr+1 { + o.adflr, o.asflr = dseq, sseq + } } - // Consumers sequence numbers can skip during redlivery since - // they always increment. So if we do not have any pending treat - // as all scenario below. Otherwise check that we filled in a gap. - // TODO(dlc) - check this. - if len(o.pending) == 0 || dseq == o.adflr+1 { - o.adflr, o.asflr = dseq, sseq - } + // We do these regardless. delete(o.rdc, sseq) o.removeFromRedeliverQueue(sseq) case AckAll: @@ -1024,19 +1026,24 @@ func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) { } // Check if we need an ack for this store seq. +// This is called for interest based retention streams to remove messages. func (o *Consumer) needAck(sseq uint64) bool { - var na bool + var needAck bool o.mu.Lock() switch o.config.AckPolicy { case AckNone, AckAll: - na = sseq > o.asflr + needAck = sseq > o.asflr case AckExplicit: - if sseq > o.asflr && len(o.pending) > 0 { - _, na = o.pending[sseq] + if sseq > o.asflr { + // Generally this means we need an ack, but just double check pending acks. + needAck = true + if len(o.pending) > 0 { + _, needAck = o.pending[sseq] + } } } o.mu.Unlock() - return na + return needAck } // Default is 1 if msg is nil. @@ -1364,26 +1371,27 @@ func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount u sendq := o.mset.sendq ap := o.config.AckPolicy - // This needs to be unlocked since the other side may need this lock on failed delivery. + // This needs to be unlocked since the other side may need this lock on a failed delivery. o.mu.Unlock() // Send message. sendq <- pmsg + // If we are ack none and mset is interest only we should make sure stream removes interest. - if ap == AckNone && mset.config.Retention == InterestPolicy { + if ap == AckNone && mset.config.Retention == InterestPolicy && !mset.checkInterest(seq, o) { // FIXME(dlc) - we have mset lock here, but should we?? - if !mset.checkInterest(seq, o) { - mset.store.RemoveMsg(seq) - } + mset.store.RemoveMsg(seq) } o.mu.Lock() - if ap == AckNone { + if ap == AckExplicit || ap == AckAll { + o.trackPending(seq) + } else if ap == AckNone { o.adflr = o.dseq o.asflr = seq - } else if ap == AckExplicit || ap == AckAll { - o.trackPending(seq) } + o.dseq++ + o.updateStore() } @@ -1763,7 +1771,10 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error { // Sort just to keep pending sparse array state small. sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) for _, seq := range seqs { - if !mset.checkInterest(seq, o) { + mset.mu.Lock() + hasNoInterest := !mset.checkInterest(seq, o) + mset.mu.Unlock() + if hasNoInterest { mset.store.RemoveMsg(seq) } } diff --git a/server/filestore.go b/server/filestore.go index 9795efe4..e04acae5 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -713,13 +713,14 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { return false, nil } sm, _ := mb.fetchMsg(seq) - // We have the message here, so we can delete it. - if sm != nil { + // We might have the message here, so we can delete it. + found := sm != nil + if found { if err := fs.deleteMsgFromBlock(mb, seq, sm, secure); err != nil { return false, err } } - return sm != nil, nil + return found, nil } // Loop on requests to write out our index file. This is used when calling @@ -798,11 +799,13 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored } } - if seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) { + // See if the sequence numbers is still relevant. Check first and cache first. + if seq < mb.first.seq || seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) { mb.mu.Unlock() fs.mu.Unlock() return nil } + // Now check dmap if it is there. if mb.dmap != nil { if _, ok := mb.dmap[seq]; ok { @@ -843,6 +846,7 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored mb.dmap[seq] = struct{}{} shouldWriteIndex = true } + if secure { fs.eraseMsg(mb, sm) } diff --git a/server/stream.go b/server/stream.go index 6cab1972..0361ee57 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1120,14 +1120,12 @@ func (mset *Stream) partitionUnique(partition string) bool { // Lock should be held. func (mset *Stream) checkInterest(seq uint64, obs *Consumer) bool { - var needAck bool for _, o := range mset.consumers { if o != obs && o.needAck(seq) { - needAck = true - break + return true } } - return needAck + return false } // ackMsg is called into from an observable when we have a WorkQueue or Interest retention policy. diff --git a/test/jetstream_test.go b/test/jetstream_test.go index a9c94173..2fc24665 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -872,7 +872,7 @@ func TestJetStreamAddStreamSameConfigOK(t *testing.T) { func sendStreamMsg(t *testing.T, nc *nats.Conn, subject, msg string) { t.Helper() - resp, _ := nc.Request(subject, []byte(msg), 100*time.Millisecond) + resp, _ := nc.Request(subject, []byte(msg), 500*time.Millisecond) if resp == nil { t.Fatalf("No response for %q, possible timeout?", msg) } @@ -2928,7 +2928,7 @@ func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) { checkSubPending := func(numExpected int) { t.Helper() checkFor(t, time.Second, 10*time.Millisecond, func() error { - if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != numExpected { + if nmsgs, _, _ := sub.Pending(); nmsgs != numExpected { return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected) } return nil @@ -4001,7 +4001,7 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) { } // We should get the remaining messages here. - for i := toSend / 2; i <= toSend; i++ { + for i := toSend/2 + 1; i <= toSend; i++ { m := getMsg(i) m.Respond(nil) } @@ -5031,11 +5031,12 @@ func TestJetStreamInterestRetentionStream(t *testing.T) { // we should have 1, 2, 3 acks now. checkNumMsgs(totalMsgs - 3) - // Now ack last ackall message. This should clear all of them. - for i := 4; i <= totalMsgs; i++ { + nm, _, _ := sub2.Pending() + // Now ack last ackAll message. This should clear all of them. + for i := 1; i <= nm; i++ { if m, err := sub2.NextMsg(time.Second); err != nil { t.Fatalf("Unexpected error: %v", err) - } else if i == totalMsgs { + } else if i == nm { m.Respond(nil) } } @@ -7913,7 +7914,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { } defer o2.Delete() - // Send 1 message + // Send 2 messages toSend := 2 for i := 0; i < toSend; i++ { sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", i+1)) @@ -7923,7 +7924,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs) } - // Receive this first message and ack it. + // Receive the messages and ack them. subs := []*nats.Subscription{sub1, sub2} for _, sub := range subs { for i := 0; i < toSend; i++ { @@ -7934,15 +7935,26 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { m.Respond(nil) } } + nc1.Flush() + nc2.Flush() // Now close the 2nd subscription... sub2.Unsubscribe() - nc2.Flush() + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if o2.Active() { + return fmt.Errorf("Consumer still active") + } + return nil + }) - // Send new messages + // Send 2 more new messages for i := 0; i < toSend; i++ { sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", 2+i+1)) } + state = mset.State() + if state.Msgs != uint64(toSend) { + t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs) + } // first subscription should get it and will ack it. for i := 0; i < toSend; i++ { @@ -7952,11 +7964,12 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { } m.Respond(nil) } + // For acks from m.Respond above + nc1.Flush() // Now recreate the subscription for the 2nd JS consumer sub2, _ = nc2.SubscribeSync(nats.NewInbox()) defer sub2.Unsubscribe() - nc2.Flush() o2, err = mset.AddConsumer(&server.ConsumerConfig{ Durable: "dur2", @@ -7969,6 +7982,13 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { } defer o2.Delete() + checkFor(t, time.Second, 100*time.Millisecond, func() error { + if nmsgs, _, _ := sub2.Pending(); nmsgs != toSend { + return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend) + } + return nil + }) + // Those messages should be redelivered to the 2nd consumer for i := 0; i < toSend; i++ { m, err := sub2.NextMsg(time.Second)