mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
On RaftNode shutdown, wait for Raft-specific goroutines
Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -511,7 +511,6 @@ func (js *jetStream) setupMetaGroup() error {
|
||||
sacc := s.SystemAccount()
|
||||
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
js.cluster = &jetStreamCluster{
|
||||
meta: n,
|
||||
streams: make(map[string]map[string]*streamAssignment),
|
||||
@@ -519,8 +518,9 @@ func (js *jetStream) setupMetaGroup() error {
|
||||
c: c,
|
||||
}
|
||||
c.registerWithAccount(sacc)
|
||||
js.mu.Unlock()
|
||||
|
||||
js.srv.startGoRoutine(js.monitorCluster)
|
||||
js.startGoRaftRoutine(n, func() { js.monitorCluster(n) })
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -710,10 +710,11 @@ func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) b
|
||||
return false
|
||||
}
|
||||
|
||||
func (js *jetStream) monitorCluster() {
|
||||
s, n := js.server(), js.getMetaGroup()
|
||||
qch, lch, ach := n.QuitC(), n.LeadChangeC(), n.ApplyC()
|
||||
|
||||
func (js *jetStream) monitorCluster(n RaftNode) {
|
||||
s := js.server()
|
||||
qch, qwg := n.QuitC()
|
||||
defer qwg.Done()
|
||||
lch, ach := n.LeadChangeC(), n.ApplyC()
|
||||
defer s.grWG.Done()
|
||||
|
||||
s.Debugf("Starting metadata monitor")
|
||||
@@ -1233,16 +1234,13 @@ func (mset *stream) raftNode() RaftNode {
|
||||
}
|
||||
|
||||
// Monitor our stream node for this stream.
|
||||
func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
|
||||
s, cc, n := js.server(), js.cluster, sa.Group.node
|
||||
func (js *jetStream) monitorStream(mset *stream, n RaftNode, sa *streamAssignment) {
|
||||
s, cc := js.server(), js.cluster
|
||||
defer s.grWG.Done()
|
||||
|
||||
if n == nil {
|
||||
s.Warnf("No RAFT group for '%s > %s", sa.Client.serviceAccount(), sa.Config.Name)
|
||||
return
|
||||
}
|
||||
|
||||
qch, lch, ach := n.QuitC(), n.LeadChangeC(), n.ApplyC()
|
||||
qch, wg := n.QuitC()
|
||||
defer wg.Done()
|
||||
lch, ach := n.LeadChangeC(), n.ApplyC()
|
||||
|
||||
s.Debugf("Starting stream monitor for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
|
||||
defer s.Debugf("Exiting stream monitor for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
|
||||
@@ -1888,7 +1886,11 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, sa *streamAssignme
|
||||
mset, err := acc.lookupStream(sa.Config.Name)
|
||||
if err == nil && mset != nil {
|
||||
if rg.node != nil && !alreadyRunning {
|
||||
s.startGoRoutine(func() { js.monitorStream(mset, sa) })
|
||||
if n := sa.Group.node; n == nil {
|
||||
s.Warnf("No RAFT group for '%s > %s", sa.Client.serviceAccount(), sa.Config.Name)
|
||||
} else {
|
||||
js.startGoRaftRoutine(n, func() { js.monitorStream(mset, n, sa) })
|
||||
}
|
||||
}
|
||||
mset.setStreamAssignment(sa)
|
||||
if err = mset.update(sa.Config); err != nil {
|
||||
@@ -2026,7 +2028,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
|
||||
// Start our monitoring routine.
|
||||
if rg.node != nil {
|
||||
if !alreadyRunning {
|
||||
s.startGoRoutine(func() { js.monitorStream(mset, sa) })
|
||||
if n := sa.Group.node; n == nil {
|
||||
s.Warnf("No RAFT group for '%s > %s", sa.Client.serviceAccount(), sa.Config.Name)
|
||||
} else {
|
||||
js.startGoRaftRoutine(n, func() { js.monitorStream(mset, n, sa) })
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Single replica stream, process manually here.
|
||||
@@ -2289,6 +2295,16 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) {
|
||||
}
|
||||
}
|
||||
|
||||
func (js *jetStream) startGoRaftRoutine(n RaftNode, f func()) {
|
||||
if n == nil {
|
||||
return
|
||||
}
|
||||
_, wg := n.QuitC()
|
||||
srv := js.server()
|
||||
wg.Add(1)
|
||||
srv.startGoRoutine(f)
|
||||
}
|
||||
|
||||
type consumerAssignmentResult struct {
|
||||
Account string `json:"account"`
|
||||
Stream string `json:"stream"`
|
||||
@@ -2401,7 +2417,11 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
|
||||
// Start our monitoring routine.
|
||||
if rg.node != nil {
|
||||
if !alreadyRunning {
|
||||
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
|
||||
if n := o.raftNode(); n == nil {
|
||||
s.Warnf("No RAFT group for consumer")
|
||||
} else {
|
||||
js.startGoRaftRoutine(n, func() { js.monitorConsumer(o, n, ca) })
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Single replica consumer, process manually here.
|
||||
@@ -2529,16 +2549,12 @@ func (o *consumer) raftNode() RaftNode {
|
||||
return o.node
|
||||
}
|
||||
|
||||
func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
s, n := js.server(), o.raftNode()
|
||||
func (js *jetStream) monitorConsumer(o *consumer, n RaftNode, ca *consumerAssignment) {
|
||||
s := js.server()
|
||||
defer s.grWG.Done()
|
||||
|
||||
if n == nil {
|
||||
s.Warnf("No RAFT group for consumer")
|
||||
return
|
||||
}
|
||||
|
||||
qch, lch, ach := n.QuitC(), n.LeadChangeC(), n.ApplyC()
|
||||
qch, wg := n.QuitC()
|
||||
defer wg.Done()
|
||||
lch, ach := n.LeadChangeC(), n.ApplyC()
|
||||
|
||||
s.Debugf("Starting consumer monitor for '%s > %s > %s", o.acc.Name, ca.Stream, ca.Name)
|
||||
defer s.Debugf("Exiting consumer monitor for '%s > %s > %s'", o.acc.Name, ca.Stream, ca.Name)
|
||||
@@ -4148,7 +4164,10 @@ RETRY:
|
||||
sreq = nil
|
||||
|
||||
// Run our own select loop here.
|
||||
for qch, lch := n.QuitC(), n.LeadChangeC(); ; {
|
||||
qch, wg := n.QuitC()
|
||||
wg.Add(1)
|
||||
defer wg.Done()
|
||||
for lch := n.LeadChangeC(); ; {
|
||||
select {
|
||||
case mrec := <-msgsC:
|
||||
notActive.Reset(activityInterval)
|
||||
|
||||
@@ -58,7 +58,8 @@ type RaftNode interface {
|
||||
PauseApply()
|
||||
ResumeApply()
|
||||
LeadChangeC() <-chan bool
|
||||
QuitC() <-chan struct{}
|
||||
// returns quit channel to receive quit from and wait group to decrement afterwards
|
||||
QuitC() (<-chan struct{}, *sync.WaitGroup)
|
||||
Created() time.Time
|
||||
Stop()
|
||||
Delete()
|
||||
@@ -184,6 +185,9 @@ type raft struct {
|
||||
votes chan *voteResponse
|
||||
leadc chan bool
|
||||
stepdown chan string
|
||||
|
||||
// wait group to clean up raft specific go routines
|
||||
wgQuit sync.WaitGroup
|
||||
}
|
||||
|
||||
// catchupState structure that holds our subscription, and catchup term and index
|
||||
@@ -338,6 +342,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
|
||||
c: s.createInternalSystemClient(),
|
||||
sq: sq,
|
||||
quit: make(chan struct{}),
|
||||
wgQuit: sync.WaitGroup{},
|
||||
wtvch: make(chan struct{}, 1),
|
||||
wpsch: make(chan struct{}, 1),
|
||||
reqs: make(chan *voteRequest, 8),
|
||||
@@ -1132,9 +1137,9 @@ func (n *raft) Peers() []*Peer {
|
||||
return peers
|
||||
}
|
||||
|
||||
func (n *raft) ApplyC() <-chan *CommittedEntry { return n.applyc }
|
||||
func (n *raft) LeadChangeC() <-chan bool { return n.leadc }
|
||||
func (n *raft) QuitC() <-chan struct{} { return n.quit }
|
||||
// ApplyC returns the node's receive-only channel of *CommittedEntry values (n.applyc).
func (n *raft) ApplyC() <-chan *CommittedEntry { return n.applyc }
|
||||
// LeadChangeC returns the node's receive-only bool channel (n.leadc).
// NOTE(review): presumably signals leadership changes; the meaning of the
// bool value is not visible here — confirm against the sender.
func (n *raft) LeadChangeC() <-chan bool { return n.leadc }
|
||||
// QuitC returns the node's quit channel (n.quit) together with a pointer to
// n.wgQuit, the wait group that shutdown waits on. A goroutine registered on
// that wait group must call Done on it when it exits.
func (n *raft) QuitC() (<-chan struct{}, *sync.WaitGroup) { return n.quit, &n.wgQuit }
|
||||
|
||||
func (n *raft) Created() time.Time {
|
||||
n.RLock()
|
||||
@@ -1180,6 +1185,8 @@ func (n *raft) shutdown(shouldDelete bool) {
|
||||
}
|
||||
n.Unlock()
|
||||
|
||||
n.wgQuit.Wait()
|
||||
|
||||
s.unregisterRaftNode(g)
|
||||
if shouldDelete {
|
||||
n.debug("Deleted")
|
||||
|
||||
@@ -2536,11 +2536,13 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
|
||||
|
||||
// Cluster cleanup
|
||||
if n := mset.node; n != nil {
|
||||
mset.mu.Unlock()
|
||||
if deleteFlag {
|
||||
n.Delete()
|
||||
} else {
|
||||
n.Stop()
|
||||
}
|
||||
mset.mu.Lock()
|
||||
}
|
||||
|
||||
// Send stream delete advisory after the consumers.
|
||||
|
||||
Reference in New Issue
Block a user