diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c1bc9a3d..5c732ce6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -511,7 +511,6 @@ func (js *jetStream) setupMetaGroup() error { sacc := s.SystemAccount() js.mu.Lock() - defer js.mu.Unlock() js.cluster = &jetStreamCluster{ meta: n, streams: make(map[string]map[string]*streamAssignment), @@ -519,8 +518,9 @@ func (js *jetStream) setupMetaGroup() error { c: c, } c.registerWithAccount(sacc) + js.mu.Unlock() - js.srv.startGoRoutine(js.monitorCluster) + js.startGoRaftRoutine(n, func() { js.monitorCluster(n) }) return nil } @@ -710,10 +710,11 @@ func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) b return false } -func (js *jetStream) monitorCluster() { - s, n := js.server(), js.getMetaGroup() - qch, lch, ach := n.QuitC(), n.LeadChangeC(), n.ApplyC() - +func (js *jetStream) monitorCluster(n RaftNode) { + s := js.server() + qch, qwg := n.QuitC() + defer qwg.Done() + lch, ach := n.LeadChangeC(), n.ApplyC() defer s.grWG.Done() s.Debugf("Starting metadata monitor") @@ -1233,16 +1234,13 @@ func (mset *stream) raftNode() RaftNode { } // Monitor our stream node for this stream. -func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) { - s, cc, n := js.server(), js.cluster, sa.Group.node +func (js *jetStream) monitorStream(mset *stream, n RaftNode, sa *streamAssignment) { + s, cc := js.server(), js.cluster defer s.grWG.Done() - if n == nil { - s.Warnf("No RAFT group for '%s > %s", sa.Client.serviceAccount(), sa.Config.Name) - return - } - - qch, lch, ach := n.QuitC(), n.LeadChangeC(), n.ApplyC() + qch, wg := n.QuitC() + defer wg.Done() + lch, ach := n.LeadChangeC(), n.ApplyC() s.Debugf("Starting stream monitor for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) defer s.Debugf("Exiting stream monitor for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) @@ -1888,7 +1886,11 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, sa *streamAssignme mset, err := acc.lookupStream(sa.Config.Name) if err == nil && mset != nil { if rg.node != nil && !alreadyRunning { - s.startGoRoutine(func() { js.monitorStream(mset, sa) }) + if n := sa.Group.node; n == nil { + s.Warnf("No RAFT group for '%s > %s", sa.Client.serviceAccount(), sa.Config.Name) + } else { + js.startGoRaftRoutine(n, func() { js.monitorStream(mset, n, sa) }) + } } mset.setStreamAssignment(sa) if err = mset.update(sa.Config); err != nil { @@ -2026,7 +2028,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme // Start our monitoring routine. if rg.node != nil { if !alreadyRunning { - s.startGoRoutine(func() { js.monitorStream(mset, sa) }) + if n := sa.Group.node; n == nil { + s.Warnf("No RAFT group for '%s > %s", sa.Client.serviceAccount(), sa.Config.Name) + } else { + js.startGoRaftRoutine(n, func() { js.monitorStream(mset, n, sa) }) + } } } else { // Single replica stream, process manually here. @@ -2289,6 +2295,16 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) { } } +func (js *jetStream) startGoRaftRoutine(n RaftNode, f func()) { + if n == nil { + return + } + _, wg := n.QuitC() + srv := js.server() + wg.Add(1) + srv.startGoRoutine(f) +} + type consumerAssignmentResult struct { Account string `json:"account"` Stream string `json:"stream"` @@ -2401,7 +2417,11 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { // Start our monitoring routine. if rg.node != nil { if !alreadyRunning { - s.startGoRoutine(func() { js.monitorConsumer(o, ca) }) + if n := o.raftNode(); n == nil { + s.Warnf("No RAFT group for consumer") + } else { + js.startGoRaftRoutine(n, func() { js.monitorConsumer(o, n, ca) }) + } } } else { // Single replica consumer, process manually here. @@ -2529,16 +2549,12 @@ func (o *consumer) raftNode() RaftNode { return o.node } -func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { - s, n := js.server(), o.raftNode() +func (js *jetStream) monitorConsumer(o *consumer, n RaftNode, ca *consumerAssignment) { + s := js.server() defer s.grWG.Done() - - if n == nil { - s.Warnf("No RAFT group for consumer") - return - } - - qch, lch, ach := n.QuitC(), n.LeadChangeC(), n.ApplyC() + qch, wg := n.QuitC() + defer wg.Done() + lch, ach := n.LeadChangeC(), n.ApplyC() s.Debugf("Starting consumer monitor for '%s > %s > %s", o.acc.Name, ca.Stream, ca.Name) defer s.Debugf("Exiting consumer monitor for '%s > %s > %s'", o.acc.Name, ca.Stream, ca.Name) @@ -4148,7 +4164,10 @@ RETRY: sreq = nil // Run our own select loop here. - for qch, lch := n.QuitC(), n.LeadChangeC(); ; { + qch, wg := n.QuitC() + wg.Add(1) + defer wg.Done() + for lch := n.LeadChangeC(); ; { select { case mrec := <-msgsC: notActive.Reset(activityInterval) diff --git a/server/raft.go b/server/raft.go index 349b5e9e..f21fcbd3 100644 --- a/server/raft.go +++ b/server/raft.go @@ -58,7 +58,8 @@ type RaftNode interface { PauseApply() ResumeApply() LeadChangeC() <-chan bool - QuitC() <-chan struct{} + // returns quit channel to receive quit from and wait group to decrement afterwards + QuitC() (<-chan struct{}, *sync.WaitGroup) Created() time.Time Stop() Delete() @@ -184,6 +185,9 @@ type raft struct { votes chan *voteResponse leadc chan bool stepdown chan string + + // wait group to clean up raft specific go routines + wgQuit sync.WaitGroup } // cacthupState structure that holds our subscription, and catchup term and index @@ -338,6 +342,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { c: s.createInternalSystemClient(), sq: sq, quit: make(chan struct{}), + wgQuit: sync.WaitGroup{}, wtvch: make(chan struct{}, 1), wpsch: make(chan struct{}, 1), reqs: make(chan *voteRequest, 8), @@ -1132,9 +1137,9 @@ func (n *raft) Peers() []*Peer { return peers } -func (n *raft) ApplyC() <-chan *CommittedEntry { return n.applyc } -func (n *raft) LeadChangeC() <-chan bool { return n.leadc } -func (n *raft) QuitC() <-chan struct{} { return n.quit } +func (n *raft) ApplyC() <-chan *CommittedEntry { return n.applyc } +func (n *raft) LeadChangeC() <-chan bool { return n.leadc } +func (n *raft) QuitC() (<-chan struct{}, *sync.WaitGroup) { return n.quit, &n.wgQuit } func (n *raft) Created() time.Time { n.RLock() @@ -1180,6 +1185,8 @@ func (n *raft) shutdown(shouldDelete bool) { } n.Unlock() + n.wgQuit.Wait() + s.unregisterRaftNode(g) if shouldDelete { n.debug("Deleted") diff --git a/server/stream.go b/server/stream.go index 38644937..a51e2614 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2536,11 +2536,13 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { // Cluster cleanup if n := mset.node; n != nil { + mset.mu.Unlock() if deleteFlag { n.Delete() } else { n.Stop() } + mset.mu.Lock() } // Send stream delete advisory after the consumers.