diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 90485042..311dd0ec 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -810,7 +810,10 @@ func (js *jetStream) setupMetaGroup() error { } // Start up our meta node. - n, err := s.startRaftNode(sysAcc.GetName(), cfg) + n, err := s.startRaftNode(sysAcc.GetName(), cfg, pprofLabels{ + "type": "metaleader", + "account": sysAcc.Name, + }) if err != nil { s.Warnf("Could not start metadata controller: %v", err) return err @@ -1979,7 +1982,7 @@ func (rg *raftGroup) setPreferred() { } // createRaftGroup is called to spin up this raft group if needed. -func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType) error { +func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType, labels pprofLabels) error { js.mu.Lock() s, cc := js.srv, js.cluster if cc == nil || cc.meta == nil { @@ -2053,7 +2056,7 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor s.bootstrapRaftNode(cfg, rg.Peers, true) } - n, err := s.startRaftNode(accName, cfg) + n, err := s.startRaftNode(accName, cfg, labels) if err != nil || n == nil { s.Debugf("Error creating raft group: %v", err) return err @@ -3414,7 +3417,11 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss if !alreadyRunning && numReplicas > 1 { if needsNode { mset.setLeader(false) - js.createRaftGroup(acc.GetName(), rg, storage) + js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{ + "type": "stream", + "account": mset.accName(), + "stream": mset.name(), + }) } mset.monitorWg.Add(1) // Start monitoring.. @@ -3522,7 +3529,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme js.mu.RUnlock() // Process the raft group and make sure it's running if needed. - err := js.createRaftGroup(acc.GetName(), rg, storage) + err := js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{ + "type": "stream", + "account": acc.Name, + "stream": sa.Config.Name, + }) // If we are restoring, create the stream if we are R>1 and not the preferred who handles the // receipt of the snapshot itself. @@ -3587,7 +3598,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err) if osa != nil { // Process the raft group and make sure it's running if needed. - js.createRaftGroup(acc.GetName(), osa.Group, storage) + js.createRaftGroup(acc.GetName(), osa.Group, storage, pprofLabels{ + "type": "stream", + "account": mset.accName(), + "stream": mset.name(), + }) mset.setStreamAssignment(osa) } if rg.node != nil { @@ -4105,7 +4120,12 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state storage = MemoryStorage } // No-op if R1. - js.createRaftGroup(accName, rg, storage) + js.createRaftGroup(accName, rg, storage, pprofLabels{ + "type": "consumer", + "account": mset.accName(), + "stream": ca.Stream, + "consumer": ca.Name, + }) } else { // If we are clustered update the known peers. js.mu.RLock() @@ -4180,7 +4200,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state // Set CA for our consumer. o.setConsumerAssignment(cca) - s.Debugf("JetStream cluster, consumer was already running") + s.Debugf("JetStream cluster, consumer '%s > %s > %s' was already running", ca.Client.serviceAccount(), ca.Stream, ca.Name) } // If we have an initial state set apply that now. diff --git a/server/raft.go b/server/raft.go index 242b0e70..076b8e0d 100644 --- a/server/raft.go +++ b/server/raft.go @@ -343,7 +343,7 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer } // startRaftNode will start the raft node. -func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error) { +func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) { if cfg == nil { return nil, errNilCfg } @@ -497,8 +497,9 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error n.llqrt = time.Now() n.Unlock() + labels["group"] = n.group s.registerRaftNode(n.group, n) - s.startGoRoutine(n.run) + s.startGoRoutine(n.run, labels) s.startGoRoutine(n.fileWriter) return n, nil @@ -653,7 +654,7 @@ func (n *raft) Propose(data []byte) error { n.RLock() if n.state != Leader { n.RUnlock() - n.debug("Proposal ignored, not leader") + n.debug("Proposal ignored, not leader (state: %v)", n.state) return errNotLeader } // Error if we had a previous write error. @@ -674,7 +675,7 @@ func (n *raft) ProposeDirect(entries []*Entry) error { n.RLock() if n.state != Leader { n.RUnlock() - n.debug("Proposal ignored, not leader") + n.debug("Direct proposal ignored, not leader (state: %v)", n.state) return errNotLeader } // Error if we had a previous write error. diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index 6d235d35..e7a7ef88 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -115,7 +115,7 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s require_NoError(c.t, err) cfg := &RaftConfig{Name: name, Store: c.t.TempDir(), Log: fs} s.bootstrapRaftNode(cfg, peers, true) - n, err := s.startRaftNode(globalAccountName, cfg) + n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{}) require_NoError(c.t, err) sm := smf(s, cfg, n) sg = append(sg, sm) @@ -230,7 +230,7 @@ func (a *stateAdder) restart() { if err != nil { panic(err) } - a.n, err = a.s.startRaftNode(globalAccountName, a.cfg) + a.n, err = a.s.startRaftNode(globalAccountName, a.cfg, pprofLabels{}) if err != nil { panic(err) }