From ffae57e20ecc2b3d2793c0f46b102d8a1fb9582f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 12 Apr 2020 10:43:12 -0700 Subject: [PATCH] Fold replay original logic into main loop, fixed pull bug on replay original Signed-off-by: Derek Collison --- server/consumer.go | 181 +++++++++++------------------------------ test/jetstream_test.go | 46 +++++++++++ 2 files changed, 95 insertions(+), 132 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 7262d1a9..c0b6a3cb 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -802,17 +802,23 @@ func (o *Consumer) processNextMsgReq(_ *subscription, _ *client, _, reply string o.mu.Unlock() return } + shouldSignal := false + for i := 0; i < batchSize; i++ { // If we are in replay mode, defer to processReplay for delivery. if o.replay { o.waiting = append(o.waiting, reply) - } else if subj, msg, seq, dc, err := o.getNextMsg(); err == nil { + shouldSignal = true + } else if subj, msg, seq, dc, _, err := o.getNextMsg(); err == nil { o.deliverMsg(reply, subj, msg, seq, dc) } else { o.waiting = append(o.waiting, reply) } } o.mu.Unlock() + if shouldSignal { + mset.signalConsumers() + } } // Increase the delivery count for this message. @@ -861,9 +867,9 @@ func (o *Consumer) isFilteredMatch(subj string) bool { // Get next available message from underlying store. // Is partition aware and redeliver aware. // Lock should be held. -func (o *Consumer) getNextMsg() (string, []byte, uint64, uint64, error) { +func (o *Consumer) getNextMsg() (string, []byte, uint64, uint64, int64, error) { if o.mset == nil { - return _EMPTY_, nil, 0, 0, fmt.Errorf("consumer not valid") + return _EMPTY_, nil, 0, 0, 0, fmt.Errorf("consumer not valid") } for { seq, dcount := o.sseq, uint64(1) @@ -881,7 +887,7 @@ func (o *Consumer) getNextMsg() (string, []byte, uint64, uint64, error) { continue } } - subj, msg, _, err := o.mset.store.LoadMsg(seq) + subj, msg, ts, err := o.mset.store.LoadMsg(seq) if err == nil { if dcount == 1 { // First delivery. o.sseq++ @@ -890,140 +896,18 @@ func (o *Consumer) getNextMsg() (string, []byte, uint64, uint64, error) { } } // We have the msg here. - return subj, msg, seq, dcount, nil + return subj, msg, seq, dcount, ts, nil } // We got an error here. If this is an EOF we will return, otherwise // we can continue looking. if err == ErrStoreEOF || err == ErrStoreClosed { - return "", nil, 0, 0, err + return "", nil, 0, 0, 0, err } // Skip since its probably deleted or expired. o.sseq++ } } -// Returns if we should be doing a non-instant replay of stored messages. -func (o *Consumer) needReplay() bool { - o.mu.Lock() - doReplay := o.replay - o.mu.Unlock() - return doReplay -} - -func (o *Consumer) clearReplayState() { - o.mu.Lock() - o.replay = false - o.mu.Unlock() -} - -// Wait for pull requests. -// FIXME(dlc) - for short wait periods is ok but should signal when waiting comes in. -func (o *Consumer) waitForPullRequests(wait time.Duration) { - o.mu.Lock() - qch := o.qch - if qch == nil || !o.isPullMode() || len(o.waiting) > 0 { - wait = 0 - } - o.mu.Unlock() - - select { - case <-qch: - case <-time.After(wait): - } -} - -// This function is responsible for message replay that is not instant/firehose. -func (o *Consumer) processReplay() error { - defer o.clearReplayState() - - o.mu.Lock() - mset := o.mset - partition := o.config.FilterSubject - pullMode := o.isPullMode() - o.mu.Unlock() - - if mset == nil { - return fmt.Errorf("consumer not valid") - } - - // Grab last queued up for us before we start. - lseq := mset.State().LastSeq - var lts int64 // last time stamp seen. - - // If we are in pull mode, wait up to the waittime to have - // someone show up to start the replay. - if pullMode { - o.waitForPullRequests(time.Millisecond) - } - - // Loop through all messages to replay. - for { - var delay time.Duration - - o.mu.Lock() - mset = o.mset - if mset == nil { - o.mu.Unlock() - return fmt.Errorf("consumer not valid") - } - - // If push mode but we have no interest wait for it to show up. - if o.isPushMode() && !o.active { - // We will wait here for new messages to arrive. - o.mu.Unlock() - mset.waitForMsgs() - continue - } - - subj, msg, ts, err := o.mset.store.LoadMsg(o.sseq) - if err != nil && err != ErrStoreMsgNotFound { - o.mu.Unlock() - return err - } - - if lts > 0 { - if delay = time.Duration(ts - lts); delay > time.Millisecond { - qch := o.qch - o.mu.Unlock() - select { - case <-qch: - return fmt.Errorf("consumer not valid") - case <-time.After(delay): - } - o.mu.Lock() - } - } - // We have a message to deliver here. - if err == nil && (partition == _EMPTY_ || o.isFilteredMatch(subj)) { - // FIXME(dlc) - pull based. - if !pullMode { - o.deliverMsg(o.dsubj, subj, msg, o.sseq, 1) - } else { - // This is pull mode. We should have folks waiting, but if not - // just return and let the rest be delivered as needed. - if len(o.waiting) > 0 { - dsubj := o.waiting[0] - o.waiting = append(o.waiting[:0], o.waiting[1:]...) - o.deliverMsg(dsubj, subj, msg, o.sseq, 1) - } else { - lseq = o.sseq - } - } - lts = ts - } - - sseq := o.sseq - o.sseq++ - o.mu.Unlock() - - if sseq >= lseq { - break - } - } - - return nil -} - // Will check to make sure those waiting still have registered interest. func (o *Consumer) checkWaitingForInterest() bool { for len(o.waiting) > 0 { @@ -1039,10 +923,21 @@ func (o *Consumer) checkWaitingForInterest() bool { func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { // On startup check to see if we are in a a reply situtation where replay policy is not instant. - // Process the replay, return on error. - if o.needReplay() && o.processReplay() != nil { - return + var ( + lts int64 // last time stamp seen, used for replay. + lseq uint64 + ) + + o.mu.Lock() + if o.replay { + // consumer is closed when mset is set to nil. + if o.mset == nil { + o.mu.Unlock() + return + } + lseq = o.mset.State().LastSeq } + o.mu.Unlock() // Deliver all the msgs we have now, once done or on a condition, we wait for new ones. for { @@ -1052,6 +947,8 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { subj, dsubj string msg []byte err error + ts int64 + delay time.Duration ) o.mu.Lock() @@ -1072,7 +969,7 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { goto waitForMsgs } - subj, msg, seq, dcnt, err = o.getNextMsg() + subj, msg, seq, dcnt, ts, err = o.getNextMsg() // On error either wait or return. if err != nil { @@ -1091,12 +988,32 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { dsubj = o.dsubj } + // If we are in a replay scenario and have not caught up check if we need to dely here. + if o.replay && lts > 0 { + if delay = time.Duration(ts - lts); delay > time.Millisecond { + qch := o.qch + o.mu.Unlock() + select { + case <-qch: + return + case <-time.After(delay): + } + o.mu.Lock() + } + } + // Track this regardless. + lts = ts + o.deliverMsg(dsubj, subj, msg, seq, dcnt) o.mu.Unlock() continue waitForMsgs: + // If we were in a replay state check to see if we are caught up. If so clear. + if o.replay && o.sseq > lseq { + o.replay = false + } // We will wait here for new messages to arrive. o.mu.Unlock() mset.waitForMsgs() diff --git a/test/jetstream_test.go b/test/jetstream_test.go index cf4f256e..085ce0ca 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -400,6 +400,52 @@ func TestJetStreamConsumerMaxDeliveries(t *testing.T) { } } +func TestJetStreamPullConsumerDelayedFirstPullWithReplayOriginal(t *testing.T) { + cases := []struct { + name string + mconfig *server.StreamConfig + }{ + {"MemoryStore", &server.StreamConfig{Name: "MY_WQ", Storage: server.MemoryStorage}}, + {"FileStore", &server.StreamConfig{Name: "MY_WQ", Storage: server.FileStorage}}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.GlobalAccount().AddStream(c.mconfig) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + // Queue up our work item. + sendStreamMsg(t, nc, c.mconfig.Name, "Hello World!") + + o, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: "d", + DeliverAll: true, + AckPolicy: server.AckExplicit, + ReplayPolicy: server.ReplayOriginal, + }) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + defer o.Delete() + + // Force delay here which triggers the bug. + time.Sleep(250 * time.Millisecond) + + if _, err = nc.Request(o.RequestNextMsgSubject(), nil, time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + }) + } +} + func TestJetStreamAddStreamMaxMsgSize(t *testing.T) { cases := []struct { name string