diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c891455d..b397ee40 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -833,7 +833,13 @@ func (js *jetStream) setupMetaGroup() error { atomic.StoreInt32(&js.clustered, 1) c.registerWithAccount(sacc) - js.srv.startGoRoutine(js.monitorCluster) + js.srv.startGoRoutine( + js.monitorCluster, + pprofLabels{ + "type": "metaleader", + "account": sacc.Name, + }, + ) return nil } @@ -3315,7 +3321,14 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss } mset.monitorWg.Add(1) // Start monitoring.. - s.startGoRoutine(func() { js.monitorStream(mset, sa, needsNode) }) + s.startGoRoutine( + func() { js.monitorStream(mset, sa, needsNode) }, + pprofLabels{ + "type": "stream", + "account": mset.accName(), + "stream": mset.name(), + }, + ) } else if numReplicas == 1 && alreadyRunning { // We downgraded to R1. Make sure we cleanup the raft node and the stream monitor. mset.removeNode() @@ -3538,7 +3551,14 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme if mset != nil { mset.monitorWg.Add(1) } - s.startGoRoutine(func() { js.monitorStream(mset, sa, false) }) + s.startGoRoutine( + func() { js.monitorStream(mset, sa, false) }, + pprofLabels{ + "type": "stream", + "account": mset.accName(), + "stream": mset.name(), + }, + ) } } else { // Single replica stream, process manually here. @@ -4150,7 +4170,15 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state // Start our monitoring routine if needed. if !alreadyRunning && !o.isMonitorRunning() { o.monitorWg.Add(1) - s.startGoRoutine(func() { js.monitorConsumer(o, ca) }) + s.startGoRoutine( + func() { js.monitorConsumer(o, ca) }, + pprofLabels{ + "type": "consumer", + "account": mset.accName(), + "stream": mset.name(), + "consumer": ca.Name, + }, + ) } // For existing consumer, only send response if not recovering. if wasExisting && !js.isMetaRecovering() { diff --git a/server/server.go b/server/server.go index a0eead46..5975764c 100644 --- a/server/server.go +++ b/server/server.go @@ -28,6 +28,7 @@ import ( "net" "net/http" "regexp" + "runtime/pprof" // Allow dynamic profiling. _ "net/http/pprof" @@ -3501,15 +3502,28 @@ func (s *Server) String() string { return s.info.Name } -func (s *Server) startGoRoutine(f func()) bool { +type pprofLabels map[string]string + +func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool { var started bool s.grMu.Lock() + defer s.grMu.Unlock() if s.grRunning { + var labels []string + for _, m := range tags { + for k, v := range m { + labels = append(labels, k, v) + } + } s.grWG.Add(1) - go f() + go func() { + pprof.SetGoroutineLabels( + pprof.WithLabels(context.Background(), pprof.Labels(labels...)), + ) + f() + }() started = true } - s.grMu.Unlock() return started } diff --git a/server/stream.go b/server/stream.go index 741bff71..6f7d16a0 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2373,7 +2373,14 @@ func (mset *stream) setupMirrorConsumer() error { mirror.qch = make(chan struct{}) mirror.wg.Add(1) ready.Add(1) - if !mset.srv.startGoRoutine(func() { mset.processMirrorMsgs(mirror, &ready) }) { + if !mset.srv.startGoRoutine( + func() { mset.processMirrorMsgs(mirror, &ready) }, + pprofLabels{ + "type": "mirror", + "account": mset.acc.Name, + "stream": mset.cfg.Name, + }, + ) { ready.Done() } } @@ -2671,7 +2678,14 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T si.qch = make(chan struct{}) si.wg.Add(1) ready.Add(1) - if !mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si, &ready) }) { + if !mset.srv.startGoRoutine( + func() { mset.processSourceMsgs(si, &ready) }, + pprofLabels{ + "type": "source", + "account": mset.acc.Name, + "stream": mset.cfg.Name, + }, + ) { ready.Done() } }