diff --git a/server/events.go b/server/events.go index dce08086..8a3391ce 100644 --- a/server/events.go +++ b/server/events.go @@ -949,6 +949,10 @@ func (s *Server) initEventTracking() { optz := &HealthzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.healthz(&optz.HealthzOptions), nil }) }, + "PROFILEZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + optz := &ProfilezEventOptions{} + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.profilez(&optz.ProfilezOptions), nil }) + }, } for name, req := range monSrvc { subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name) @@ -1574,6 +1578,12 @@ type HealthzEventOptions struct { EventFilterOptions } +// In the context of system events, ProfilezEventOptions are options passed to Profilez +type ProfilezEventOptions struct { + ProfilezOptions + EventFilterOptions +} + // returns true if the request does NOT apply to this server and can be ignored. // DO NOT hold the server lock when func (s *Server) filterRequest(fOpts *EventFilterOptions) bool { diff --git a/server/events_test.go b/server/events_test.go index c6fc99d7..05eeb1fd 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1661,7 +1661,7 @@ func TestSystemAccountWithGateways(t *testing.T) { // If this tests fails with wrong number after 10 seconds we may have // added a new inititial subscription for the eventing system. - checkExpectedSubs(t, 50, sa) + checkExpectedSubs(t, 52, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) diff --git a/server/monitor.go b/server/monitor.go index 62beb564..4fb32c6e 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -14,6 +14,7 @@ package server import ( + "bytes" "crypto/sha256" "crypto/tls" "crypto/x509" @@ -26,6 +27,7 @@ import ( "os" "path/filepath" "runtime" + "runtime/pprof" "sort" "strconv" "strings" @@ -2639,6 +2641,12 @@ type HealthzOptions struct { JSServerOnly bool `json:"js-server-only,omitempty"` } +// ProfilezOptions are options passed to Profilez +type ProfilezOptions struct { + Name string `json:"name"` + Debug int `json:"debug"` +} + type StreamDetail struct { Name string `json:"name"` Created time.Time `json:"created"` @@ -3123,3 +3131,36 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { // Success. return health } + +type ProfilezStatus struct { + Profile []byte `json:"profile"` + Error string `json:"error"` +} + +func (s *Server) profilez(opts *ProfilezOptions) *ProfilezStatus { + if s.profiler == nil { + return &ProfilezStatus{ + Error: "Profiling is not enabled", + } + } + if opts.Name == _EMPTY_ { + return &ProfilezStatus{ + Error: "Profile name not specified", + } + } + profile := pprof.Lookup(opts.Name) + if profile == nil { + return &ProfilezStatus{ + Error: fmt.Sprintf("Profile %q not found", opts.Name), + } + } + var buffer bytes.Buffer + if err := profile.WriteTo(&buffer, opts.Debug); err != nil { + return &ProfilezStatus{ + Error: fmt.Sprintf("Profile %q error: %s", opts.Name, err), + } + } + return &ProfilezStatus{ + Profile: buffer.Bytes(), + } +} diff --git a/server/monitor_test.go b/server/monitor_test.go index 89b8f1ff..cebd90c4 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -3925,7 +3925,7 @@ func TestMonitorAccountz(t *testing.T) { body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s?acc=$SYS", s.MonitorAddr().Port, AccountzPath))) require_Contains(t, body, `"account_detail": {`) require_Contains(t, body, `"account_name": "$SYS",`) - require_Contains(t, body, `"subscriptions": 44,`) + require_Contains(t, body, `"subscriptions": 46,`) require_Contains(t, body, `"is_system": true,`) require_Contains(t, body, `"system_account": "$SYS"`) @@ -4585,3 +4585,38 @@ func TestServerIDZRequest(t *testing.T) { require_True(t, sid.Name == "TEST22") require_True(t, strings.HasPrefix(sid.ID, "N")) } + +func TestMonitorProfilez(t *testing.T) { + s := RunServer(DefaultOptions()) + defer s.Shutdown() + + // First of all, check that the profiles aren't accessible + // when profiling hasn't been started in the usual way. + if ps := s.profilez(&ProfilezOptions{ + Name: "allocs", Debug: 0, + }); ps.Error == "" { + t.Fatal("Profile should not be accessible when profiling not started") + } + + // Then start profiling. + s.StartProfiler() + + // Now check that all of the profiles that we expect are + // returning instead of erroring. + for _, try := range []*ProfilezOptions{ + {Name: "allocs", Debug: 0}, + {Name: "allocs", Debug: 1}, + {Name: "block", Debug: 0}, + {Name: "goroutine", Debug: 0}, + {Name: "goroutine", Debug: 1}, + {Name: "goroutine", Debug: 2}, + {Name: "heap", Debug: 0}, + {Name: "heap", Debug: 1}, + {Name: "mutex", Debug: 0}, + {Name: "threadcreate", Debug: 0}, + } { + if ps := s.profilez(try); ps.Error != _EMPTY_ { + t.Fatalf("Unexpected error on %v: %s", try, ps.Error) + } + } +}