Non-inline system callbacks need hdr and msg already split due to client context to split

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-04 14:15:30 -07:00
parent 9f95e993e2
commit c323ec3086
2 changed files with 45 additions and 42 deletions

View File

@@ -83,6 +83,8 @@ const (
// FIXME(dlc) - make configurable.
var eventsHBInterval = 30 * time.Second
type sysMsgHandler func(sub *subscription, client *client, acc *Account, subject, reply string, hdr, msg []byte)
// Used if we have to queue things internally to avoid the route/gw path.
type inSysMsg struct {
sub *subscription
@@ -90,8 +92,9 @@ type inSysMsg struct {
acc *Account
subj string
rply string
hdr []byte
msg []byte
cb msgHandler
cb sysMsgHandler
}
// Used to send and receive messages from inside the server.
@@ -329,7 +332,7 @@ func (s *Server) internalReceiveLoop() {
msgs := recvq.pop()
for _, m := range msgs {
if m.cb != nil {
m.cb(m.sub, m.c, m.acc, m.subj, m.rply, m.msg)
m.cb(m.sub, m.c, m.acc, m.subj, m.rply, m.hdr, m.msg)
}
}
recvq.recycle(&msgs)
@@ -919,41 +922,41 @@ func (s *Server) initEventTracking() {
if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
monSrvc := map[string]msgHandler{
monSrvc := map[string]sysMsgHandler{
"STATSZ": s.statszReq,
"VARZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"VARZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &VarzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Varz(&optz.VarzOptions) })
},
"SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &SubszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Subsz(&optz.SubszOptions) })
},
"CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &ConnzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Connz(&optz.ConnzOptions) })
},
"ROUTEZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"ROUTEZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &RoutezEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Routez(&optz.RoutezOptions) })
},
"GATEWAYZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"GATEWAYZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &GatewayzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Gatewayz(&optz.GatewayzOptions) })
},
"LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &LeafzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Leafz(&optz.LeafzOptions) })
},
"ACCOUNTZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"ACCOUNTZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &AccountzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Accountz(&optz.AccountzOptions) })
},
"JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &JszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Jsz(&optz.JSzOptions) })
},
"HEALTHZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"HEALTHZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &HealthzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.healthz(&optz.HealthzOptions), nil })
},
@@ -975,8 +978,8 @@ func (s *Server) initEventTracking() {
return tk[accReqAccIndex], nil
}
}
monAccSrvc := map[string]msgHandler{
"SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
monAccSrvc := map[string]sysMsgHandler{
"SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &SubszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
@@ -988,7 +991,7 @@ func (s *Server) initEventTracking() {
}
})
},
"CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &ConnzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
@@ -999,7 +1002,7 @@ func (s *Server) initEventTracking() {
}
})
},
"LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &LeafzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
@@ -1010,7 +1013,7 @@ func (s *Server) initEventTracking() {
}
})
},
"JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &JszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
@@ -1021,7 +1024,7 @@ func (s *Server) initEventTracking() {
}
})
},
"INFO": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"INFO": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &AccInfoEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
@@ -1034,7 +1037,7 @@ func (s *Server) initEventTracking() {
// STATZ is essentially a duplicate of CONNS with an envelope identical to the others.
// For historical reasons CONNS is the odd one out.
// STATZ is also less heavy weight than INFO
"STATZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
"STATZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &AccountStatzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
@@ -1063,7 +1066,7 @@ func (s *Server) initEventTracking() {
// For now only the STATZ subject has an account specific ping equivalent.
if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"),
s.noInlineCallback(func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
s.noInlineCallback(func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &AccountStatzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil {
@@ -1158,7 +1161,7 @@ func (s *Server) addSystemAccountExports(sacc *Account) {
}
// accountClaimUpdate will receive claim updates for accounts.
func (s *Server) accountClaimUpdate(sub *subscription, c *client, _ *Account, subject, resp string, rmsg []byte) {
func (s *Server) accountClaimUpdate(sub *subscription, c *client, _ *Account, subject, resp string, hdr, msg []byte) {
if !s.EventsEnabled() {
return
}
@@ -1172,7 +1175,7 @@ func (s *Server) accountClaimUpdate(sub *subscription, c *client, _ *Account, su
s.Debugf("Received account claims update on bad subject %q", subject)
return
}
if _, msg := c.msgParts(rmsg); len(msg) == 0 {
if len(msg) == 0 {
err := errors.New("request body is empty")
respondToUpdate(s, resp, pubKey, "jwt update error", err)
} else if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
@@ -1215,7 +1218,7 @@ func (s *Server) sameDomain(domain string) bool {
}
// remoteServerShutdownEvent is called when we get an event from another server shutting down.
func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.eventsEnabled() {
@@ -1227,7 +1230,6 @@ func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account,
return
}
_, msg := c.msgParts(rmsg)
if len(msg) == 0 {
s.Errorf("Remote server sent invalid (empty) shutdown message to %q", subject)
return
@@ -1255,9 +1257,9 @@ func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account,
}
// remoteServerUpdate listens for statsz updates from other servers.
func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
var ssm ServerStatsMsg
if _, msg := c.msgParts(rmsg); len(msg) == 0 {
if len(msg) == 0 {
s.Debugf("Received empty server info for remote server update")
return
} else if err := json.Unmarshal(msg, &ssm); err != nil {
@@ -1387,7 +1389,7 @@ func (s *Server) shutdownEventing() {
}
// Request for our local connection count.
func (s *Server) connsRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
func (s *Server) connsRequest(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
if !s.eventsRunning() {
return
}
@@ -1398,7 +1400,7 @@ func (s *Server) connsRequest(sub *subscription, c *client, _ *Account, subject,
}
a := tk[accReqAccIndex]
m := accNumConnsReq{Account: a}
if _, msg := c.msgParts(rmsg); len(msg) > 0 {
if len(msg) > 0 {
if err := json.Unmarshal(msg, &m); err != nil {
s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err)
return
@@ -1426,7 +1428,7 @@ func (s *Server) connsRequest(sub *subscription, c *client, _ *Account, subject,
}
// leafNodeConnected is an event we will receive when a leaf node for a given account connects.
func (s *Server) leafNodeConnected(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) {
func (s *Server) leafNodeConnected(sub *subscription, _ *client, _ *Account, subject, reply string, hdr, msg []byte) {
m := accNumConnsReq{}
if err := json.Unmarshal(msg, &m); err != nil {
s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err)
@@ -1585,7 +1587,7 @@ type ServerAPIConnzResponse struct {
}
// statszReq is a request for us to respond with current statsz.
func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
if !s.EventsEnabled() {
return
}
@@ -1596,7 +1598,7 @@ func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, re
}
opts := StatszEventOptions{}
if _, msg := c.msgParts(rmsg); len(msg) != 0 {
if len(msg) != 0 {
if err := json.Unmarshal(msg, &opts); err != nil {
response := &ServerAPIResponse{
Server: &ServerInfo{},
@@ -1667,12 +1669,12 @@ func (s *Server) zReq(c *client, reply string, rmsg []byte, fOpts *EventFilterOp
}
// remoteConnsUpdate gets called when we receive a remote update from another server.
func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
if !s.eventsRunning() {
return
}
var m AccountNumConns
if _, msg := c.msgParts(rmsg); len(msg) == 0 {
if len(msg) == 0 {
s.sys.client.Errorf("No message body provided")
return
} else if err := json.Unmarshal(msg, &m); err != nil {
@@ -2029,7 +2031,7 @@ type msgHandler func(sub *subscription, client *client, acc *Account, subject, r
// Create a wrapped callback handler for the subscription that will move it to an
// internal recvQ for processing not inline with routes etc.
func (s *Server) noInlineCallback(cb msgHandler) msgHandler {
func (s *Server) noInlineCallback(cb sysMsgHandler) msgHandler {
s.mu.RLock()
if !s.eventsEnabled() {
s.mu.RUnlock()
@@ -2040,8 +2042,9 @@ func (s *Server) noInlineCallback(cb msgHandler) msgHandler {
s.mu.RUnlock()
return func(sub *subscription, c *client, acc *Account, subj, rply string, rmsg []byte) {
// Need to copy.
recvq.push(&inSysMsg{sub, c, acc, subj, rply, copyBytes(rmsg), cb})
// Need to copy and split here.
hdr, msg := c.msgParts(rmsg)
recvq.push(&inSysMsg{sub, c, acc, subj, rply, copyBytes(hdr), copyBytes(msg), cb})
}
}
@@ -2120,7 +2123,7 @@ func remoteLatencySubjectForResponse(subject []byte) string {
}
// remoteLatencyUpdate is used to track remote latency measurements for tracking on exported services.
func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, _ *Account, subject, _ string, hdr, msg []byte) {
if !s.eventsRunning() {
return
}
@@ -2256,7 +2259,7 @@ func totalSubs(rr *SublistResult, qg []byte) (nsubs int32) {
// Allows users of large systems to debug active subscribers for a given subject.
// Payload should be the subject of interest.
func (s *Server) debugSubscribers(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
func (s *Server) debugSubscribers(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
// Even though this is an internal only subscription, meaning interest was not forwarded, we could
// get one here from a GW in optimistic mode. Ignore for now.
// FIXME(dlc) - Should we send no interest here back to the GW?
@@ -2264,7 +2267,7 @@ func (s *Server) debugSubscribers(sub *subscription, c *client, _ *Account, subj
return
}
_, acc, _, msg, err := s.getRequestInfo(c, rmsg)
_, acc, _, msg, err := s.getRequestInfo(c, msg)
if err != nil {
return
}
@@ -2366,12 +2369,12 @@ func (s *Server) debugSubscribers(sub *subscription, c *client, _ *Account, subj
// Request for our local subscription count. This will come from a remote origin server
// that received the initial request.
func (s *Server) nsubsRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
func (s *Server) nsubsRequest(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
if !s.eventsRunning() {
return
}
m := accNumSubsReq{}
if _, msg := c.msgParts(rmsg); len(msg) == 0 {
if len(msg) == 0 {
s.sys.client.Errorf("request requires a body")
return
} else if err := json.Unmarshal(msg, &m); err != nil {