From d520a27c36f51ede2e52ded8956562d5f894c8bc Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Wed, 27 Apr 2022 03:32:08 -0400 Subject: [PATCH] [fixed] step down timing, consumer stream seqno, clear redelivery (#3079) Step down timing for consumers or streams. Signals loss of leadership and sleeps before stepping down. This makes it less likely that messages are being processed during step down. When becoming leader, consumer stream seqno got reset, even though the consumer existed already. Proper cleanup of redelivery data structures and timer Signed-off-by: Matthias Hanel --- server/consumer.go | 43 +++++++++++++++++++++++-------------- server/jetstream.go | 2 +- server/jetstream_api.go | 6 ++++++ server/jetstream_cluster.go | 2 +- server/server.go | 2 +- server/stream.go | 2 +- 6 files changed, 37 insertions(+), 20 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 4b9e1ca0..ce079f0c 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -870,8 +870,11 @@ func (o *consumer) setLeader(isLeader bool) { mset.mu.RUnlock() o.mu.Lock() + o.rdq, o.rdqi = nil, nil // Restore our saved state. During non-leader status we just update our underlying store. - o.readStoredState(lseq) + hadState, _ := o.readStoredState(lseq) + // Setup initial pending and proper start sequence. + o.setInitialPendingAndStart(hadState) // Do info sub. if o.infoSub == nil && jsa != nil { @@ -906,9 +909,6 @@ func (o *consumer) setLeader(isLeader bool) { } } - // Setup initial pending and proper start sequence. - o.setInitialPendingAndStart() - // If push mode, register for notifications on interest. if o.isPushMode() { o.inch = make(chan bool, 8) @@ -964,6 +964,14 @@ func (o *consumer) setLeader(isLeader bool) { } else { // Shutdown the go routines and the subscriptions. o.mu.Lock() + if o.qch != nil { + close(o.qch) + o.qch = nil + } + // Make sure to clear out any re delivery queues + stopAndClearTimer(&o.ptmr) + o.rdq, o.rdqi = nil, nil + o.pending = nil // ok if they are nil, we protect inside unsubscribe() o.unsubscribe(o.ackSub) o.unsubscribe(o.reqSub) @@ -973,10 +981,6 @@ func (o *consumer) setLeader(isLeader bool) { o.srv.sysUnsubscribe(o.infoSub) o.infoSub = nil } - if o.qch != nil { - close(o.qch) - o.qch = nil - } // Reset waiting if we are in pull mode. if o.isPullMode() { o.waiting = newWaitQueue(o.cfg.MaxWaiting) @@ -1913,18 +1917,19 @@ func (o *consumer) checkRedelivered(slseq uint64) { // This will restore the state from disk. // Lock should be held. -func (o *consumer) readStoredState(slseq uint64) error { +func (o *consumer) readStoredState(slseq uint64) (hadState bool, err error) { if o.store == nil { - return nil + return false, nil } state, err := o.store.State() if err == nil && state != nil && (state.Delivered.Consumer != 0 || state.Delivered.Stream != 0) { o.applyState(state) + hadState = true if len(o.rdc) > 0 { o.checkRedelivered(slseq) } } - return err + return hadState, err } // Apply the consumer stored state. @@ -3967,7 +3972,7 @@ func (o *consumer) requestNextMsgSubject() string { // Will set the initial pending and start sequence. // mset lock should be held. -func (o *consumer) setInitialPendingAndStart() { +func (o *consumer) setInitialPendingAndStart(hadState bool) { mset := o.mset if mset == nil || mset.store == nil { return @@ -3995,7 +4000,9 @@ func (o *consumer) setInitialPendingAndStart() { // Here we are filtered. if dp == DeliverLastPerSubject && o.hasSkipListPending() && o.sseq < o.lss.resume { ss := mset.store.FilteredState(o.lss.resume+1, o.cfg.FilterSubject) - o.sseq = o.lss.seqs[0] + if !hadState { + o.sseq = o.lss.seqs[0] + } o.sgap = ss.Msgs + uint64(len(o.lss.seqs)) o.lsgap = ss.Last } else if ss := mset.store.FilteredState(o.sseq, o.cfg.FilterSubject); ss.Msgs > 0 { @@ -4003,15 +4010,19 @@ func (o *consumer) setInitialPendingAndStart() { o.lsgap = ss.Last // See if we should update our starting sequence. if dp == DeliverLast || dp == DeliverLastPerSubject { - o.sseq = ss.Last + if !hadState { + o.sseq = ss.Last + } } else if dp == DeliverNew { // If our original is larger we will ignore, we don't want to go backwards with DeliverNew. // If its greater, we need to adjust pending. if ss.Last >= o.sseq { o.sgap -= (ss.Last - o.sseq + 1) - o.sseq = ss.Last + 1 + if !hadState { + o.sseq = ss.Last + 1 + } } - } else { + } else if !hadState { // DeliverAll, DeliverByStartSequence, DeliverByStartTime o.sseq = ss.First } diff --git a/server/jetstream.go b/server/jetstream.go index 0a71d1d0..e26404ca 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1266,7 +1266,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } lseq := e.mset.lastSeq() obs.mu.Lock() - err = obs.readStoredState(lseq) + _, err = obs.readStoredState(lseq) obs.mu.Unlock() if err != nil { s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index ac2438c5..b9be1f3f 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1835,6 +1835,9 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ * // Call actual stepdown. if mset != nil { if node := mset.raftNode(); node != nil { + mset.setLeader(false) + // TODO (mh) eventually make sure all go routines exited and all channels are cleared + time.Sleep(250 * time.Millisecond) node.StepDown() } } @@ -1937,6 +1940,9 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ // Call actual stepdown. if n := o.raftNode(); n != nil { + o.setLeader(false) + // TODO (mh) eventually make sure all go routines exited and all channels are cleared + time.Sleep(250 * time.Millisecond) n.StepDown() } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d9327b40..b196cdd5 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5927,7 +5927,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) { o.mu.Lock() if o.isLeader() { // This expects mset lock to be held. - o.setInitialPendingAndStart() + o.setInitialPendingAndStart(false) } o.mu.Unlock() } diff --git a/server/server.go b/server/server.go index 70fa1e63..1e97a16d 100644 --- a/server/server.go +++ b/server/server.go @@ -3380,7 +3380,7 @@ func (s *Server) lameDuckMode() { // If we are running any raftNodes transfer leaders. if hadTransfers := s.transferRaftLeaders(); hadTransfers { - // They will tranfer leadership quickly, but wait here for a second. + // They will transfer leadership quickly, but wait here for a second. select { case <-time.After(time.Second): case <-s.quitCh: diff --git a/server/stream.go b/server/stream.go index 464abf0d..e1aee1a2 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4196,7 +4196,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error obs.setCreatedTime(cfg.Created) } obs.mu.Lock() - err = obs.readStoredState(lseq) + _, err = obs.readStoredState(lseq) obs.mu.Unlock() if err != nil { mset.stop(true, false)