diff --git a/server/consumer.go b/server/consumer.go index 4b9e1ca0..ce079f0c 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -870,8 +870,11 @@ func (o *consumer) setLeader(isLeader bool) { mset.mu.RUnlock() o.mu.Lock() + o.rdq, o.rdqi = nil, nil // Restore our saved state. During non-leader status we just update our underlying store. - o.readStoredState(lseq) + hadState, _ := o.readStoredState(lseq) + // Setup initial pending and proper start sequence. + o.setInitialPendingAndStart(hadState) // Do info sub. if o.infoSub == nil && jsa != nil { @@ -906,9 +909,6 @@ func (o *consumer) setLeader(isLeader bool) { } } - // Setup initial pending and proper start sequence. - o.setInitialPendingAndStart() - // If push mode, register for notifications on interest. if o.isPushMode() { o.inch = make(chan bool, 8) @@ -964,6 +964,14 @@ func (o *consumer) setLeader(isLeader bool) { } else { // Shutdown the go routines and the subscriptions. o.mu.Lock() + if o.qch != nil { + close(o.qch) + o.qch = nil + } + // Make sure to clear out any redelivery queues + stopAndClearTimer(&o.ptmr) + o.rdq, o.rdqi = nil, nil + o.pending = nil // ok if they are nil, we protect inside unsubscribe() o.unsubscribe(o.ackSub) o.unsubscribe(o.reqSub) @@ -973,10 +981,6 @@ func (o *consumer) setLeader(isLeader bool) { o.srv.sysUnsubscribe(o.infoSub) o.infoSub = nil } - if o.qch != nil { - close(o.qch) - o.qch = nil - } // Reset waiting if we are in pull mode. if o.isPullMode() { o.waiting = newWaitQueue(o.cfg.MaxWaiting) @@ -1913,18 +1917,19 @@ func (o *consumer) checkRedelivered(slseq uint64) { // This will restore the state from disk. // Lock should be held. 
-func (o *consumer) readStoredState(slseq uint64) error { +func (o *consumer) readStoredState(slseq uint64) (hadState bool, err error) { if o.store == nil { - return nil + return false, nil } state, err := o.store.State() if err == nil && state != nil && (state.Delivered.Consumer != 0 || state.Delivered.Stream != 0) { o.applyState(state) + hadState = true if len(o.rdc) > 0 { o.checkRedelivered(slseq) } } - return err + return hadState, err } // Apply the consumer stored state. @@ -3967,7 +3972,7 @@ func (o *consumer) requestNextMsgSubject() string { // Will set the initial pending and start sequence. // mset lock should be held. -func (o *consumer) setInitialPendingAndStart() { +func (o *consumer) setInitialPendingAndStart(hadState bool) { mset := o.mset if mset == nil || mset.store == nil { return @@ -3995,7 +4000,9 @@ func (o *consumer) setInitialPendingAndStart() { // Here we are filtered. if dp == DeliverLastPerSubject && o.hasSkipListPending() && o.sseq < o.lss.resume { ss := mset.store.FilteredState(o.lss.resume+1, o.cfg.FilterSubject) - o.sseq = o.lss.seqs[0] + if !hadState { + o.sseq = o.lss.seqs[0] + } o.sgap = ss.Msgs + uint64(len(o.lss.seqs)) o.lsgap = ss.Last } else if ss := mset.store.FilteredState(o.sseq, o.cfg.FilterSubject); ss.Msgs > 0 { @@ -4003,15 +4010,19 @@ func (o *consumer) setInitialPendingAndStart() { o.lsgap = ss.Last // See if we should update our starting sequence. if dp == DeliverLast || dp == DeliverLastPerSubject { - o.sseq = ss.Last + if !hadState { + o.sseq = ss.Last + } } else if dp == DeliverNew { // If our original is larger we will ignore, we don't want to go backwards with DeliverNew. // If its greater, we need to adjust pending. 
if ss.Last >= o.sseq { o.sgap -= (ss.Last - o.sseq + 1) - o.sseq = ss.Last + 1 + if !hadState { + o.sseq = ss.Last + 1 + } } - } else { + } else if !hadState { // DeliverAll, DeliverByStartSequence, DeliverByStartTime o.sseq = ss.First } diff --git a/server/jetstream.go b/server/jetstream.go index 0a71d1d0..e26404ca 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1266,7 +1266,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } lseq := e.mset.lastSeq() obs.mu.Lock() - err = obs.readStoredState(lseq) + _, err = obs.readStoredState(lseq) obs.mu.Unlock() if err != nil { s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index ac2438c5..b9be1f3f 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1835,6 +1835,9 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ * // Call actual stepdown. if mset != nil { if node := mset.raftNode(); node != nil { + mset.setLeader(false) + // TODO (mh) eventually make sure all go routines exited and all channels are cleared + time.Sleep(250 * time.Millisecond) node.StepDown() } } @@ -1937,6 +1940,9 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ // Call actual stepdown. if n := o.raftNode(); n != nil { + o.setLeader(false) + // TODO (mh) eventually make sure all go routines exited and all channels are cleared + time.Sleep(250 * time.Millisecond) n.StepDown() } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d9327b40..b196cdd5 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5927,7 +5927,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) { o.mu.Lock() if o.isLeader() { // This expects mset lock to be held. 
- o.setInitialPendingAndStart() + o.setInitialPendingAndStart(false) } o.mu.Unlock() } diff --git a/server/server.go b/server/server.go index 70fa1e63..1e97a16d 100644 --- a/server/server.go +++ b/server/server.go @@ -3380,7 +3380,7 @@ func (s *Server) lameDuckMode() { // If we are running any raftNodes transfer leaders. if hadTransfers := s.transferRaftLeaders(); hadTransfers { - // They will tranfer leadership quickly, but wait here for a second. + // They will transfer leadership quickly, but wait here for a second. select { case <-time.After(time.Second): case <-s.quitCh: diff --git a/server/stream.go b/server/stream.go index 464abf0d..e1aee1a2 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4196,7 +4196,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error obs.setCreatedTime(cfg.Created) } obs.mu.Lock() - err = obs.readStoredState(lseq) + _, err = obs.readStoredState(lseq) obs.mu.Unlock() if err != nil { mset.stop(true, false)