diff --git a/server/events.go b/server/events.go index 826e76a4..220b2246 100644 --- a/server/events.go +++ b/server/events.go @@ -83,6 +83,8 @@ const ( // FIXME(dlc) - make configurable. var eventsHBInterval = 30 * time.Second +type sysMsgHandler func(sub *subscription, client *client, acc *Account, subject, reply string, hdr, msg []byte) + // Used if we have to queue things internally to avoid the route/gw path. type inSysMsg struct { sub *subscription @@ -90,8 +92,9 @@ type inSysMsg struct { acc *Account subj string rply string + hdr []byte msg []byte - cb msgHandler + cb sysMsgHandler } // Used to send and receive messages from inside the server. @@ -329,7 +332,7 @@ func (s *Server) internalReceiveLoop() { msgs := recvq.pop() for _, m := range msgs { if m.cb != nil { - m.cb(m.sub, m.c, m.acc, m.subj, m.rply, m.msg) + m.cb(m.sub, m.c, m.acc, m.subj, m.rply, m.hdr, m.msg) } } recvq.recycle(&msgs) @@ -919,41 +922,41 @@ func (s *Server) initEventTracking() { if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } - monSrvc := map[string]msgHandler{ + monSrvc := map[string]sysMsgHandler{ "STATSZ": s.statszReq, - "VARZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "VARZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &VarzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Varz(&optz.VarzOptions) }) }, - "SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &SubszEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Subsz(&optz.SubszOptions) }) }, - "CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &ConnzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Connz(&optz.ConnzOptions) }) }, - "ROUTEZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "ROUTEZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &RoutezEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Routez(&optz.RoutezOptions) }) }, - "GATEWAYZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "GATEWAYZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &GatewayzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Gatewayz(&optz.GatewayzOptions) }) }, - "LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &LeafzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Leafz(&optz.LeafzOptions) }) }, - "ACCOUNTZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "ACCOUNTZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &AccountzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Accountz(&optz.AccountzOptions) }) }, - "JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &JszEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Jsz(&optz.JSzOptions) }) }, - "HEALTHZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "HEALTHZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &HealthzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.healthz(&optz.HealthzOptions), nil }) }, @@ -975,8 +978,8 @@ func (s *Server) initEventTracking() { return tk[accReqAccIndex], nil } } - monAccSrvc := map[string]msgHandler{ - "SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + monAccSrvc := map[string]sysMsgHandler{ + "SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &SubszEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if acc, err := extractAccount(c, subject, msg); err != nil { @@ -988,7 +991,7 @@ func (s *Server) initEventTracking() { } }) }, - "CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &ConnzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if acc, err := extractAccount(c, subject, msg); err != nil { @@ -999,7 +1002,7 @@ func (s *Server) initEventTracking() { } }) }, - "LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &LeafzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if acc, err := extractAccount(c, subject, msg); err != nil { @@ -1010,7 +1013,7 @@ func (s *Server) initEventTracking() { } }) }, - "JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &JszEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if acc, err := extractAccount(c, subject, msg); err != nil { @@ -1021,7 +1024,7 @@ func (s *Server) initEventTracking() { } }) }, - "INFO": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "INFO": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &AccInfoEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if acc, err := extractAccount(c, subject, msg); err != nil { @@ -1034,7 +1037,7 @@ func (s *Server) initEventTracking() { // STATZ is essentially a duplicate of CONNS with an envelope identical to the others. // For historical reasons CONNS is the odd one out. // STATZ is also less heavy weight than INFO - "STATZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + "STATZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &AccountStatzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if acc, err := extractAccount(c, subject, msg); err != nil { @@ -1063,7 +1066,7 @@ func (s *Server) initEventTracking() { // For now only the STATZ subject has an account specific ping equivalent. if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"), - s.noInlineCallback(func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + s.noInlineCallback(func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { optz := &AccountStatzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil { @@ -1158,7 +1161,7 @@ func (s *Server) addSystemAccountExports(sacc *Account) { } // accountClaimUpdate will receive claim updates for accounts. -func (s *Server) accountClaimUpdate(sub *subscription, c *client, _ *Account, subject, resp string, rmsg []byte) { +func (s *Server) accountClaimUpdate(sub *subscription, c *client, _ *Account, subject, resp string, hdr, msg []byte) { if !s.EventsEnabled() { return } @@ -1172,7 +1175,7 @@ func (s *Server) accountClaimUpdate(sub *subscription, c *client, _ *Account, su s.Debugf("Received account claims update on bad subject %q", subject) return } - if _, msg := c.msgParts(rmsg); len(msg) == 0 { + if len(msg) == 0 { err := errors.New("request body is empty") respondToUpdate(s, resp, pubKey, "jwt update error", err) } else if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil { @@ -1215,7 +1218,7 @@ func (s *Server) sameDomain(domain string) bool { } // remoteServerShutdownEvent is called when we get an event from another server shutting down. -func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { +func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { s.mu.Lock() defer s.mu.Unlock() if !s.eventsEnabled() { @@ -1227,7 +1230,6 @@ func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account, return } - _, msg := c.msgParts(rmsg) if len(msg) == 0 { s.Errorf("Remote server sent invalid (empty) shutdown message to %q", subject) return @@ -1255,9 +1257,9 @@ func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account, } // remoteServerUpdate listens for statsz updates from other servers. -func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { +func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { var ssm ServerStatsMsg - if _, msg := c.msgParts(rmsg); len(msg) == 0 { + if len(msg) == 0 { s.Debugf("Received empty server info for remote server update") return } else if err := json.Unmarshal(msg, &ssm); err != nil { @@ -1387,7 +1389,7 @@ func (s *Server) shutdownEventing() { } // Request for our local connection count. -func (s *Server) connsRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { +func (s *Server) connsRequest(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { if !s.eventsRunning() { return } @@ -1398,7 +1400,7 @@ func (s *Server) connsRequest(sub *subscription, c *client, _ *Account, subject, } a := tk[accReqAccIndex] m := accNumConnsReq{Account: a} - if _, msg := c.msgParts(rmsg); len(msg) > 0 { + if len(msg) > 0 { if err := json.Unmarshal(msg, &m); err != nil { s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err) return @@ -1426,7 +1428,7 @@ func (s *Server) connsRequest(sub *subscription, c *client, _ *Account, subject, } // leafNodeConnected is an event we will receive when a leaf node for a given account connects. -func (s *Server) leafNodeConnected(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) { +func (s *Server) leafNodeConnected(sub *subscription, _ *client, _ *Account, subject, reply string, hdr, msg []byte) { m := accNumConnsReq{} if err := json.Unmarshal(msg, &m); err != nil { s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err) @@ -1585,7 +1587,7 @@ type ServerAPIConnzResponse struct { } // statszReq is a request for us to respond with current statsz. -func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { +func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { if !s.EventsEnabled() { return } @@ -1596,7 +1598,7 @@ func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, re } opts := StatszEventOptions{} - if _, msg := c.msgParts(rmsg); len(msg) != 0 { + if len(msg) != 0 { if err := json.Unmarshal(msg, &opts); err != nil { response := &ServerAPIResponse{ Server: &ServerInfo{}, @@ -1667,12 +1669,12 @@ func (s *Server) zReq(c *client, reply string, rmsg []byte, fOpts *EventFilterOp } // remoteConnsUpdate gets called when we receive a remote update from another server. -func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { +func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { if !s.eventsRunning() { return } var m AccountNumConns - if _, msg := c.msgParts(rmsg); len(msg) == 0 { + if len(msg) == 0 { s.sys.client.Errorf("No message body provided") return } else if err := json.Unmarshal(msg, &m); err != nil { @@ -2029,7 +2031,7 @@ type msgHandler func(sub *subscription, client *client, acc *Account, subject, r // Create a wrapped callback handler for the subscription that will move it to an // internal recvQ for processing not inline with routes etc. -func (s *Server) noInlineCallback(cb msgHandler) msgHandler { +func (s *Server) noInlineCallback(cb sysMsgHandler) msgHandler { s.mu.RLock() if !s.eventsEnabled() { s.mu.RUnlock() @@ -2040,8 +2042,9 @@ func (s *Server) noInlineCallback(cb msgHandler) msgHandler { s.mu.RUnlock() return func(sub *subscription, c *client, acc *Account, subj, rply string, rmsg []byte) { - // Need to copy. - recvq.push(&inSysMsg{sub, c, acc, subj, rply, copyBytes(rmsg), cb}) + // Need to copy and split here. + hdr, msg := c.msgParts(rmsg) + recvq.push(&inSysMsg{sub, c, acc, subj, rply, copyBytes(hdr), copyBytes(msg), cb}) } } @@ -2120,7 +2123,7 @@ func remoteLatencySubjectForResponse(subject []byte) string { } // remoteLatencyUpdate is used to track remote latency measurements for tracking on exported services. -func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { +func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, _ *Account, subject, _ string, hdr, msg []byte) { if !s.eventsRunning() { return } @@ -2256,7 +2259,7 @@ func totalSubs(rr *SublistResult, qg []byte) (nsubs int32) { // Allows users of large systems to debug active subscribers for a given subject. // Payload should be the subject of interest. -func (s *Server) debugSubscribers(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { +func (s *Server) debugSubscribers(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { // Even though this is an internal only subscription, meaning interest was not forwarded, we could // get one here from a GW in optimistic mode. Ignore for now. // FIXME(dlc) - Should we send no interest here back to the GW? @@ -2264,7 +2267,7 @@ func (s *Server) debugSubscribers(sub *subscription, c *client, _ *Account, subj return } - _, acc, _, msg, err := s.getRequestInfo(c, rmsg) + _, acc, _, msg, err := s.getRequestInfo(c, msg) if err != nil { return } @@ -2366,12 +2369,12 @@ func (s *Server) debugSubscribers(sub *subscription, c *client, _ *Account, subj // Request for our local subscription count. This will come from a remote origin server // that received the initial request. -func (s *Server) nsubsRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { +func (s *Server) nsubsRequest(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { if !s.eventsRunning() { return } m := accNumSubsReq{} - if _, msg := c.msgParts(rmsg); len(msg) == 0 { + if len(msg) == 0 { s.sys.client.Errorf("request requires a body") return } else if err := json.Unmarshal(msg, &m); err != nil { diff --git a/server/events_test.go b/server/events_test.go index 70745e3f..9ad9ef4a 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1641,7 +1641,7 @@ func TestSystemAccountWithBadRemoteLatencyUpdate(t *testing.T) { ReqId: "_INBOX.22", } b, _ := json.Marshal(&rl) - s.remoteLatencyUpdate(nil, nil, nil, "foo", _EMPTY_, b) + s.remoteLatencyUpdate(nil, nil, nil, "foo", _EMPTY_, nil, b) } func TestSystemAccountWithGateways(t *testing.T) {