mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
[fixed] step down timing, consumer stream seqno, clear redelivery (#3079)
Step down timing for consumers or streams. Signals loss of leadership and sleeps before stepping down. This makes it less likely that messages are being processed during step down. When becoming leader, consumer stream seqno got reset, even though the consumer existed already. Proper cleanup of redelivery data structures and timer Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -870,8 +870,11 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
mset.mu.RUnlock()
|
||||
|
||||
o.mu.Lock()
|
||||
o.rdq, o.rdqi = nil, nil
|
||||
// Restore our saved state. During non-leader status we just update our underlying store.
|
||||
o.readStoredState(lseq)
|
||||
hadState, _ := o.readStoredState(lseq)
|
||||
// Setup initial pending and proper start sequence.
|
||||
o.setInitialPendingAndStart(hadState)
|
||||
|
||||
// Do info sub.
|
||||
if o.infoSub == nil && jsa != nil {
|
||||
@@ -906,9 +909,6 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// Setup initial pending and proper start sequence.
|
||||
o.setInitialPendingAndStart()
|
||||
|
||||
// If push mode, register for notifications on interest.
|
||||
if o.isPushMode() {
|
||||
o.inch = make(chan bool, 8)
|
||||
@@ -964,6 +964,14 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
} else {
|
||||
// Shutdown the go routines and the subscriptions.
|
||||
o.mu.Lock()
|
||||
if o.qch != nil {
|
||||
close(o.qch)
|
||||
o.qch = nil
|
||||
}
|
||||
// Make sure to clear out any re delivery queues
|
||||
stopAndClearTimer(&o.ptmr)
|
||||
o.rdq, o.rdqi = nil, nil
|
||||
o.pending = nil
|
||||
// ok if they are nil, we protect inside unsubscribe()
|
||||
o.unsubscribe(o.ackSub)
|
||||
o.unsubscribe(o.reqSub)
|
||||
@@ -973,10 +981,6 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
o.srv.sysUnsubscribe(o.infoSub)
|
||||
o.infoSub = nil
|
||||
}
|
||||
if o.qch != nil {
|
||||
close(o.qch)
|
||||
o.qch = nil
|
||||
}
|
||||
// Reset waiting if we are in pull mode.
|
||||
if o.isPullMode() {
|
||||
o.waiting = newWaitQueue(o.cfg.MaxWaiting)
|
||||
@@ -1913,18 +1917,19 @@ func (o *consumer) checkRedelivered(slseq uint64) {
|
||||
|
||||
// This will restore the state from disk.
|
||||
// Lock should be held.
|
||||
func (o *consumer) readStoredState(slseq uint64) error {
|
||||
func (o *consumer) readStoredState(slseq uint64) (hadState bool, err error) {
|
||||
if o.store == nil {
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
state, err := o.store.State()
|
||||
if err == nil && state != nil && (state.Delivered.Consumer != 0 || state.Delivered.Stream != 0) {
|
||||
o.applyState(state)
|
||||
hadState = true
|
||||
if len(o.rdc) > 0 {
|
||||
o.checkRedelivered(slseq)
|
||||
}
|
||||
}
|
||||
return err
|
||||
return hadState, err
|
||||
}
|
||||
|
||||
// Apply the consumer stored state.
|
||||
@@ -3967,7 +3972,7 @@ func (o *consumer) requestNextMsgSubject() string {
|
||||
|
||||
// Will set the initial pending and start sequence.
|
||||
// mset lock should be held.
|
||||
func (o *consumer) setInitialPendingAndStart() {
|
||||
func (o *consumer) setInitialPendingAndStart(hadState bool) {
|
||||
mset := o.mset
|
||||
if mset == nil || mset.store == nil {
|
||||
return
|
||||
@@ -3995,7 +4000,9 @@ func (o *consumer) setInitialPendingAndStart() {
|
||||
// Here we are filtered.
|
||||
if dp == DeliverLastPerSubject && o.hasSkipListPending() && o.sseq < o.lss.resume {
|
||||
ss := mset.store.FilteredState(o.lss.resume+1, o.cfg.FilterSubject)
|
||||
o.sseq = o.lss.seqs[0]
|
||||
if !hadState {
|
||||
o.sseq = o.lss.seqs[0]
|
||||
}
|
||||
o.sgap = ss.Msgs + uint64(len(o.lss.seqs))
|
||||
o.lsgap = ss.Last
|
||||
} else if ss := mset.store.FilteredState(o.sseq, o.cfg.FilterSubject); ss.Msgs > 0 {
|
||||
@@ -4003,15 +4010,19 @@ func (o *consumer) setInitialPendingAndStart() {
|
||||
o.lsgap = ss.Last
|
||||
// See if we should update our starting sequence.
|
||||
if dp == DeliverLast || dp == DeliverLastPerSubject {
|
||||
o.sseq = ss.Last
|
||||
if !hadState {
|
||||
o.sseq = ss.Last
|
||||
}
|
||||
} else if dp == DeliverNew {
|
||||
// If our original is larger we will ignore, we don't want to go backwards with DeliverNew.
|
||||
// If its greater, we need to adjust pending.
|
||||
if ss.Last >= o.sseq {
|
||||
o.sgap -= (ss.Last - o.sseq + 1)
|
||||
o.sseq = ss.Last + 1
|
||||
if !hadState {
|
||||
o.sseq = ss.Last + 1
|
||||
}
|
||||
}
|
||||
} else {
|
||||
} else if !hadState {
|
||||
// DeliverAll, DeliverByStartSequence, DeliverByStartTime
|
||||
o.sseq = ss.First
|
||||
}
|
||||
|
||||
@@ -1266,7 +1266,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
|
||||
}
|
||||
lseq := e.mset.lastSeq()
|
||||
obs.mu.Lock()
|
||||
err = obs.readStoredState(lseq)
|
||||
_, err = obs.readStoredState(lseq)
|
||||
obs.mu.Unlock()
|
||||
if err != nil {
|
||||
s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err)
|
||||
|
||||
@@ -1835,6 +1835,9 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
|
||||
// Call actual stepdown.
|
||||
if mset != nil {
|
||||
if node := mset.raftNode(); node != nil {
|
||||
mset.setLeader(false)
|
||||
// TODO (mh) eventually make sure all go routines exited and all channels are cleared
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
node.StepDown()
|
||||
}
|
||||
}
|
||||
@@ -1937,6 +1940,9 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
|
||||
|
||||
// Call actual stepdown.
|
||||
if n := o.raftNode(); n != nil {
|
||||
o.setLeader(false)
|
||||
// TODO (mh) eventually make sure all go routines exited and all channels are cleared
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
n.StepDown()
|
||||
}
|
||||
|
||||
|
||||
@@ -5927,7 +5927,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
|
||||
o.mu.Lock()
|
||||
if o.isLeader() {
|
||||
// This expects mset lock to be held.
|
||||
o.setInitialPendingAndStart()
|
||||
o.setInitialPendingAndStart(false)
|
||||
}
|
||||
o.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -3380,7 +3380,7 @@ func (s *Server) lameDuckMode() {
|
||||
|
||||
// If we are running any raftNodes transfer leaders.
|
||||
if hadTransfers := s.transferRaftLeaders(); hadTransfers {
|
||||
// They will tranfer leadership quickly, but wait here for a second.
|
||||
// They will transfer leadership quickly, but wait here for a second.
|
||||
select {
|
||||
case <-time.After(time.Second):
|
||||
case <-s.quitCh:
|
||||
|
||||
@@ -4196,7 +4196,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
|
||||
obs.setCreatedTime(cfg.Created)
|
||||
}
|
||||
obs.mu.Lock()
|
||||
err = obs.readStoredState(lseq)
|
||||
_, err = obs.readStoredState(lseq)
|
||||
obs.mu.Unlock()
|
||||
if err != nil {
|
||||
mset.stop(true, false)
|
||||
|
||||
Reference in New Issue
Block a user