Fold replay original logic into main loop, fix pull bug on replay original

Signed-off-by: Derek Collison <derek@nats.io>
Derek Collison
2020-04-12 10:43:12 -07:00
parent 7f458282b3
commit ffae57e20e
2 changed files with 95 additions and 132 deletions
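
What the change does, in short: the dedicated replay path (needReplay, clearReplayState, waitForPullRequests, processReplay) is deleted, getNextMsg now also returns the stored timestamp of each message, and loopAndDeliverMsgs paces deliveries itself when the consumer is in ReplayOriginal mode by sleeping for the gap between consecutive timestamps. A minimal standalone sketch of that pacing idea, using hypothetical storedMsg/replayOriginal names rather than the server's actual API:

package main

import (
	"fmt"
	"time"
)

// storedMsg is a hypothetical stand-in for a message loaded from the store,
// carrying the UnixNano timestamp at which it was originally stored.
type storedMsg struct {
	data []byte
	ts   int64
}

// replayOriginal delivers msgs while preserving the original inter-message
// gaps, mirroring the delay = time.Duration(ts - lts) logic this commit
// folds into the main delivery loop. lts tracks the last timestamp seen.
func replayOriginal(msgs []storedMsg, deliver func([]byte)) {
	var lts int64
	for _, m := range msgs {
		if lts > 0 {
			if delay := time.Duration(m.ts - lts); delay > time.Millisecond {
				time.Sleep(delay)
			}
		}
		lts = m.ts
		deliver(m.data)
	}
}

func main() {
	base := time.Now().UnixNano()
	msgs := []storedMsg{
		{[]byte("one"), base},
		{[]byte("two"), base + int64(50*time.Millisecond)},
		{[]byte("three"), base + int64(150*time.Millisecond)},
	}
	// Prints the three messages roughly 0ms, 50ms, and 100ms apart.
	replayOriginal(msgs, func(b []byte) {
		fmt.Printf("%s %s\n", time.Now().Format("15:04:05.000"), b)
	})
}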


@@ -802,17 +802,23 @@ func (o *Consumer) processNextMsgReq(_ *subscription, _ *client, _, reply string
 		o.mu.Unlock()
 		return
 	}
+	shouldSignal := false
 	for i := 0; i < batchSize; i++ {
 		// If we are in replay mode, defer to processReplay for delivery.
 		if o.replay {
 			o.waiting = append(o.waiting, reply)
-		} else if subj, msg, seq, dc, err := o.getNextMsg(); err == nil {
+			shouldSignal = true
+		} else if subj, msg, seq, dc, _, err := o.getNextMsg(); err == nil {
			o.deliverMsg(reply, subj, msg, seq, dc)
 		} else {
 			o.waiting = append(o.waiting, reply)
 		}
 	}
 	o.mu.Unlock()
+
+	if shouldSignal {
+		mset.signalConsumers()
+	}
 }
 
 // Increase the delivery count for this message.
@@ -861,9 +867,9 @@ func (o *Consumer) isFilteredMatch(subj string) bool {
 // Get next available message from underlying store.
 // Is partition aware and redeliver aware.
 // Lock should be held.
-func (o *Consumer) getNextMsg() (string, []byte, uint64, uint64, error) {
+func (o *Consumer) getNextMsg() (string, []byte, uint64, uint64, int64, error) {
 	if o.mset == nil {
-		return _EMPTY_, nil, 0, 0, fmt.Errorf("consumer not valid")
+		return _EMPTY_, nil, 0, 0, 0, fmt.Errorf("consumer not valid")
 	}
 	for {
 		seq, dcount := o.sseq, uint64(1)
@@ -881,7 +887,7 @@ func (o *Consumer) getNextMsg() (string, []byte, uint64, uint64, error) {
 				continue
 			}
 		}
-		subj, msg, _, err := o.mset.store.LoadMsg(seq)
+		subj, msg, ts, err := o.mset.store.LoadMsg(seq)
 		if err == nil {
 			if dcount == 1 { // First delivery.
 				o.sseq++
@@ -890,140 +896,18 @@ func (o *Consumer) getNextMsg() (string, []byte, uint64, uint64, error) {
 				}
 			}
 			// We have the msg here.
-			return subj, msg, seq, dcount, nil
+			return subj, msg, seq, dcount, ts, nil
 		}
 		// We got an error here. If this is an EOF we will return, otherwise
 		// we can continue looking.
 		if err == ErrStoreEOF || err == ErrStoreClosed {
-			return "", nil, 0, 0, err
+			return "", nil, 0, 0, 0, err
 		}
 		// Skip since its probably deleted or expired.
 		o.sseq++
 	}
 }
-
-// Returns if we should be doing a non-instant replay of stored messages.
-func (o *Consumer) needReplay() bool {
-	o.mu.Lock()
-	doReplay := o.replay
-	o.mu.Unlock()
-	return doReplay
-}
-
-func (o *Consumer) clearReplayState() {
-	o.mu.Lock()
-	o.replay = false
-	o.mu.Unlock()
-}
-
-// Wait for pull requests.
-// FIXME(dlc) - for short wait periods is ok but should signal when waiting comes in.
-func (o *Consumer) waitForPullRequests(wait time.Duration) {
-	o.mu.Lock()
-	qch := o.qch
-	if qch == nil || !o.isPullMode() || len(o.waiting) > 0 {
-		wait = 0
-	}
-	o.mu.Unlock()
-
-	select {
-	case <-qch:
-	case <-time.After(wait):
-	}
-}
-
-// This function is responsible for message replay that is not instant/firehose.
-func (o *Consumer) processReplay() error {
-	defer o.clearReplayState()
-
-	o.mu.Lock()
-	mset := o.mset
-	partition := o.config.FilterSubject
-	pullMode := o.isPullMode()
-	o.mu.Unlock()
-
-	if mset == nil {
-		return fmt.Errorf("consumer not valid")
-	}
-
-	// Grab last queued up for us before we start.
-	lseq := mset.State().LastSeq
-	var lts int64 // last time stamp seen.
-
-	// If we are in pull mode, wait up to the waittime to have
-	// someone show up to start the replay.
-	if pullMode {
-		o.waitForPullRequests(time.Millisecond)
-	}
-
-	// Loop through all messages to replay.
-	for {
-		var delay time.Duration
-
-		o.mu.Lock()
-		mset = o.mset
-		if mset == nil {
-			o.mu.Unlock()
-			return fmt.Errorf("consumer not valid")
-		}
-		// If push mode but we have no interest wait for it to show up.
-		if o.isPushMode() && !o.active {
-			// We will wait here for new messages to arrive.
-			o.mu.Unlock()
-			mset.waitForMsgs()
-			continue
-		}
-
-		subj, msg, ts, err := o.mset.store.LoadMsg(o.sseq)
-		if err != nil && err != ErrStoreMsgNotFound {
-			o.mu.Unlock()
-			return err
-		}
-
-		if lts > 0 {
-			if delay = time.Duration(ts - lts); delay > time.Millisecond {
-				qch := o.qch
-				o.mu.Unlock()
-				select {
-				case <-qch:
-					return fmt.Errorf("consumer not valid")
-				case <-time.After(delay):
-				}
-				o.mu.Lock()
-			}
-		}
-
-		// We have a message to deliver here.
-		if err == nil && (partition == _EMPTY_ || o.isFilteredMatch(subj)) {
-			// FIXME(dlc) - pull based.
-			if !pullMode {
-				o.deliverMsg(o.dsubj, subj, msg, o.sseq, 1)
-			} else {
-				// This is pull mode. We should have folks waiting, but if not
-				// just return and let the rest be delivered as needed.
-				if len(o.waiting) > 0 {
-					dsubj := o.waiting[0]
-					o.waiting = append(o.waiting[:0], o.waiting[1:]...)
-					o.deliverMsg(dsubj, subj, msg, o.sseq, 1)
-				} else {
-					lseq = o.sseq
-				}
-			}
-			lts = ts
-		}
-
-		sseq := o.sseq
-		o.sseq++
-		o.mu.Unlock()
-
-		if sseq >= lseq {
-			break
-		}
-	}
-
-	return nil
-}
 
 // Will check to make sure those waiting still have registered interest.
 func (o *Consumer) checkWaitingForInterest() bool {
 	for len(o.waiting) > 0 {
@@ -1039,10 +923,21 @@ func (o *Consumer) checkWaitingForInterest() bool {
 
 func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) {
 	// On startup check to see if we are in a replay situation where replay policy is not instant.
-	// Process the replay, return on error.
-	if o.needReplay() && o.processReplay() != nil {
-		return
+	var (
+		lts  int64 // last time stamp seen, used for replay.
+		lseq uint64
+	)
+
+	o.mu.Lock()
+	if o.replay {
+		// Consumer is closed when mset is set to nil.
+		if o.mset == nil {
+			o.mu.Unlock()
+			return
+		}
+		lseq = o.mset.State().LastSeq
 	}
+	o.mu.Unlock()
 
 	// Deliver all the msgs we have now, once done or on a condition, we wait for new ones.
 	for {
@@ -1052,6 +947,8 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) {
 			subj, dsubj string
 			msg         []byte
 			err         error
+			ts          int64
+			delay       time.Duration
 		)
 
 		o.mu.Lock()
@@ -1072,7 +969,7 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) {
 			goto waitForMsgs
 		}
 
-		subj, msg, seq, dcnt, err = o.getNextMsg()
+		subj, msg, seq, dcnt, ts, err = o.getNextMsg()
 
 		// On error either wait or return.
 		if err != nil {
@@ -1091,12 +988,32 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) {
 			dsubj = o.dsubj
 		}
+		// If we are in a replay scenario and have not caught up, check if we need to delay here.
+		if o.replay && lts > 0 {
+			if delay = time.Duration(ts - lts); delay > time.Millisecond {
+				qch := o.qch
+				o.mu.Unlock()
+				select {
+				case <-qch:
+					return
+				case <-time.After(delay):
+				}
+				o.mu.Lock()
+			}
+		}
+
+		// Track this regardless.
+		lts = ts
 
 		o.deliverMsg(dsubj, subj, msg, seq, dcnt)
 
 		o.mu.Unlock()
 		continue
 
 	waitForMsgs:
+		// If we were in a replay state, check to see if we are caught up. If so, clear.
+		if o.replay && o.sseq > lseq {
+			o.replay = false
+		}
 		// We will wait here for new messages to arrive.
 		o.mu.Unlock()
 		mset.waitForMsgs()

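The pull bug, as visible in the diff: the old processReplay path only polled briefly for pull requests (waitForPullRequests with a one-millisecond wait) and, in pull mode with no waiters, gave up on delivering the message, while a request queued afterwards sat in o.waiting with nothing to wake the loop. The rewrite keeps the request and explicitly signals the stream (shouldSignal -> mset.signalConsumers()), and loopAndDeliverMsgs clears o.replay itself once the consumer catches up. A rough sketch of that wake-up pattern, with illustrative types (not the server's actual ones) and a buffered signal channel:

package main

import (
	"fmt"
	"sync"
	"time"
)

// pullConsumer sketches the pattern: requests are queued under a lock and a
// one-slot signal channel wakes the delivery loop, so a request that arrives
// mid-replay is not left waiting forever.
type pullConsumer struct {
	mu      sync.Mutex
	waiting []string      // queued pull requests (reply subjects).
	sig     chan struct{} // wakes the delivery loop.
}

func newPullConsumer() *pullConsumer {
	return &pullConsumer{sig: make(chan struct{}, 1)}
}

// nextMsgReq queues a pull request and signals the loop, mirroring the
// shouldSignal -> mset.signalConsumers() path in the patch.
func (c *pullConsumer) nextMsgReq(reply string) {
	c.mu.Lock()
	c.waiting = append(c.waiting, reply)
	c.mu.Unlock()
	select {
	case c.sig <- struct{}{}:
	default: // already signaled.
	}
}

// loop blocks on the signal channel, then serves any queued requests.
func (c *pullConsumer) loop(deliver func(reply string)) {
	for range c.sig {
		c.mu.Lock()
		pending := c.waiting
		c.waiting = nil
		c.mu.Unlock()
		for _, r := range pending {
			deliver(r)
		}
	}
}

func main() {
	c := newPullConsumer()
	go c.loop(func(r string) { fmt.Println("delivered to", r) })
	// As in the new test below, the first pull arrives after a delay and
	// still wakes the loop instead of hanging.
	time.Sleep(250 * time.Millisecond)
	c.nextMsgReq("_INBOX.abc")
	time.Sleep(100 * time.Millisecond)
}

The second file in the commit adds a test that reproduces the original hang: one stored message, a ReplayOriginal pull consumer, and a first pull issued only after a 250ms delay.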

@@ -400,6 +400,52 @@ func TestJetStreamConsumerMaxDeliveries(t *testing.T) {
 	}
 }
 
+func TestJetStreamPullConsumerDelayedFirstPullWithReplayOriginal(t *testing.T) {
+	cases := []struct {
+		name    string
+		mconfig *server.StreamConfig
+	}{
+		{"MemoryStore", &server.StreamConfig{Name: "MY_WQ", Storage: server.MemoryStorage}},
+		{"FileStore", &server.StreamConfig{Name: "MY_WQ", Storage: server.FileStorage}},
+	}
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			s := RunBasicJetStreamServer()
+			defer s.Shutdown()
+
+			mset, err := s.GlobalAccount().AddStream(c.mconfig)
+			if err != nil {
+				t.Fatalf("Unexpected error adding stream: %v", err)
+			}
+			defer mset.Delete()
+
+			nc := clientConnectToServer(t, s)
+			defer nc.Close()
+
+			// Queue up our work item.
+			sendStreamMsg(t, nc, c.mconfig.Name, "Hello World!")
+
+			o, err := mset.AddConsumer(&server.ConsumerConfig{
+				Durable:      "d",
+				DeliverAll:   true,
+				AckPolicy:    server.AckExplicit,
+				ReplayPolicy: server.ReplayOriginal,
+			})
+			if err != nil {
+				t.Fatalf("Expected no error, got %v", err)
+			}
+			defer o.Delete()
+
+			// Force delay here which triggers the bug.
+			time.Sleep(250 * time.Millisecond)
+
+			if _, err = nc.Request(o.RequestNextMsgSubject(), nil, time.Second); err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+		})
+	}
+}
+
 func TestJetStreamAddStreamMaxMsgSize(t *testing.T) {
 	cases := []struct {
 		name string
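
To exercise just the new case (the package path is an assumption; point go test at whichever package holds this test file in your checkout):

go test -run TestJetStreamPullConsumerDelayedFirstPullWithReplayOriginal ./...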