mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Add Raft goroutine labels, tweak logging (#4545)
This adds more debugging information to the Raft goroutines visible in
pprof, and improves the log message emitted when a consumer is found to be already running.
Example:
```
1 @ 0x1025b1838 0x1025c2ac8 0x102a47d1c 0x102a47244 0x102a858e0 0x1025e5ad4
# labels: {"account":"$SYS", "group":"_meta_", "type":"metaleader"}
# 0x102a47d1b github.com/nats-io/nats-server/v2/server.(*raft).runAsFollower+0xbb server/raft.go:1795
# 0x102a47243 github.com/nats-io/nats-server/v2/server.(*raft).run+0x2c3 server/raft.go:1715
# 0x102a858df github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine.func1+0x17f server/server.go:3609
1 @ 0x1025b1838 0x1025c2ac8 0x102a47d1c 0x102a47244 0x102a858e0 0x1025e5ad4
# labels: {"account":"$G", "group":"S-R3M-hn5zv7o3", "stream":"benchstream", "type":"stream"}
# 0x102a47d1b github.com/nats-io/nats-server/v2/server.(*raft).runAsFollower+0xbb server/raft.go:1795
# 0x102a47243 github.com/nats-io/nats-server/v2/server.(*raft).run+0x2c3 server/raft.go:1715
# 0x102a858df github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine.func1+0x17f server/server.go:3609
1 @ 0x1025b1838 0x1025c2ac8 0x102a49b60 0x102a47250 0x102a858e0 0x1025e5ad4
# labels: {"account":"$G", "consumer":"foobar", "group":"C-R3M-djqHTUCq", "stream":"benchstream", "type":"consumer"}
# 0x102a49b5f github.com/nats-io/nats-server/v2/server.(*raft).runAsLeader+0x4bf server/raft.go:2198
# 0x102a4724f github.com/nats-io/nats-server/v2/server.(*raft).run+0x2cf server/raft.go:1719
# 0x102a858df github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine.func1+0x17f server/server.go:3609
```
Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
@@ -810,7 +810,10 @@ func (js *jetStream) setupMetaGroup() error {
|
||||
}
|
||||
|
||||
// Start up our meta node.
|
||||
n, err := s.startRaftNode(sysAcc.GetName(), cfg)
|
||||
n, err := s.startRaftNode(sysAcc.GetName(), cfg, pprofLabels{
|
||||
"type": "metaleader",
|
||||
"account": sysAcc.Name,
|
||||
})
|
||||
if err != nil {
|
||||
s.Warnf("Could not start metadata controller: %v", err)
|
||||
return err
|
||||
@@ -1979,7 +1982,7 @@ func (rg *raftGroup) setPreferred() {
|
||||
}
|
||||
|
||||
// createRaftGroup is called to spin up this raft group if needed.
|
||||
func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType) error {
|
||||
func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType, labels pprofLabels) error {
|
||||
js.mu.Lock()
|
||||
s, cc := js.srv, js.cluster
|
||||
if cc == nil || cc.meta == nil {
|
||||
@@ -2053,7 +2056,7 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
|
||||
s.bootstrapRaftNode(cfg, rg.Peers, true)
|
||||
}
|
||||
|
||||
n, err := s.startRaftNode(accName, cfg)
|
||||
n, err := s.startRaftNode(accName, cfg, labels)
|
||||
if err != nil || n == nil {
|
||||
s.Debugf("Error creating raft group: %v", err)
|
||||
return err
|
||||
@@ -3414,7 +3417,11 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
|
||||
if !alreadyRunning && numReplicas > 1 {
|
||||
if needsNode {
|
||||
mset.setLeader(false)
|
||||
js.createRaftGroup(acc.GetName(), rg, storage)
|
||||
js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{
|
||||
"type": "stream",
|
||||
"account": mset.accName(),
|
||||
"stream": mset.name(),
|
||||
})
|
||||
}
|
||||
mset.monitorWg.Add(1)
|
||||
// Start monitoring..
|
||||
@@ -3522,7 +3529,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
|
||||
js.mu.RUnlock()
|
||||
|
||||
// Process the raft group and make sure it's running if needed.
|
||||
err := js.createRaftGroup(acc.GetName(), rg, storage)
|
||||
err := js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{
|
||||
"type": "stream",
|
||||
"account": acc.Name,
|
||||
"stream": sa.Config.Name,
|
||||
})
|
||||
|
||||
// If we are restoring, create the stream if we are R>1 and not the preferred who handles the
|
||||
// receipt of the snapshot itself.
|
||||
@@ -3587,7 +3598,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
|
||||
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
|
||||
if osa != nil {
|
||||
// Process the raft group and make sure it's running if needed.
|
||||
js.createRaftGroup(acc.GetName(), osa.Group, storage)
|
||||
js.createRaftGroup(acc.GetName(), osa.Group, storage, pprofLabels{
|
||||
"type": "stream",
|
||||
"account": mset.accName(),
|
||||
"stream": mset.name(),
|
||||
})
|
||||
mset.setStreamAssignment(osa)
|
||||
}
|
||||
if rg.node != nil {
|
||||
@@ -4105,7 +4120,12 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
|
||||
storage = MemoryStorage
|
||||
}
|
||||
// No-op if R1.
|
||||
js.createRaftGroup(accName, rg, storage)
|
||||
js.createRaftGroup(accName, rg, storage, pprofLabels{
|
||||
"type": "consumer",
|
||||
"account": mset.accName(),
|
||||
"stream": ca.Stream,
|
||||
"consumer": ca.Name,
|
||||
})
|
||||
} else {
|
||||
// If we are clustered update the known peers.
|
||||
js.mu.RLock()
|
||||
@@ -4180,7 +4200,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
|
||||
|
||||
// Set CA for our consumer.
|
||||
o.setConsumerAssignment(cca)
|
||||
s.Debugf("JetStream cluster, consumer was already running")
|
||||
s.Debugf("JetStream cluster, consumer '%s > %s > %s' was already running", ca.Client.serviceAccount(), ca.Stream, ca.Name)
|
||||
}
|
||||
|
||||
// If we have an initial state set apply that now.
|
||||
|
||||
@@ -343,7 +343,7 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer
|
||||
}
|
||||
|
||||
// startRaftNode will start the raft node.
|
||||
func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error) {
|
||||
func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) {
|
||||
if cfg == nil {
|
||||
return nil, errNilCfg
|
||||
}
|
||||
@@ -497,8 +497,9 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error
|
||||
n.llqrt = time.Now()
|
||||
n.Unlock()
|
||||
|
||||
labels["group"] = n.group
|
||||
s.registerRaftNode(n.group, n)
|
||||
s.startGoRoutine(n.run)
|
||||
s.startGoRoutine(n.run, labels)
|
||||
s.startGoRoutine(n.fileWriter)
|
||||
|
||||
return n, nil
|
||||
@@ -653,7 +654,7 @@ func (n *raft) Propose(data []byte) error {
|
||||
n.RLock()
|
||||
if n.state != Leader {
|
||||
n.RUnlock()
|
||||
n.debug("Proposal ignored, not leader")
|
||||
n.debug("Proposal ignored, not leader (state: %v)", n.state)
|
||||
return errNotLeader
|
||||
}
|
||||
// Error if we had a previous write error.
|
||||
@@ -674,7 +675,7 @@ func (n *raft) ProposeDirect(entries []*Entry) error {
|
||||
n.RLock()
|
||||
if n.state != Leader {
|
||||
n.RUnlock()
|
||||
n.debug("Proposal ignored, not leader")
|
||||
n.debug("Direct proposal ignored, not leader (state: %v)", n.state)
|
||||
return errNotLeader
|
||||
}
|
||||
// Error if we had a previous write error.
|
||||
|
||||
@@ -115,7 +115,7 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s
|
||||
require_NoError(c.t, err)
|
||||
cfg := &RaftConfig{Name: name, Store: c.t.TempDir(), Log: fs}
|
||||
s.bootstrapRaftNode(cfg, peers, true)
|
||||
n, err := s.startRaftNode(globalAccountName, cfg)
|
||||
n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{})
|
||||
require_NoError(c.t, err)
|
||||
sm := smf(s, cfg, n)
|
||||
sg = append(sg, sm)
|
||||
@@ -230,7 +230,7 @@ func (a *stateAdder) restart() {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
a.n, err = a.s.startRaftNode(globalAccountName, a.cfg)
|
||||
a.n, err = a.s.startRaftNode(globalAccountName, a.cfg, pprofLabels{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user