[fixed] header handling in system services

On export/import a header was inserted which broke parsing of the
message.
Fixed unit test broken by .beta in version

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2021-07-01 18:32:53 -04:00
parent 73b2beb2f3
commit a72ca8a9bf
3 changed files with 38 additions and 31 deletions

View File

@@ -721,37 +721,37 @@ func (s *Server) initEventTracking() {
}
monSrvc := map[string]msgHandler{
"STATSZ": s.statszReq,
"VARZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"VARZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &VarzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Varz(&optz.VarzOptions) })
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Varz(&optz.VarzOptions) })
},
"SUBSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"SUBSZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &SubszEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Subsz(&optz.SubszOptions) })
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Subsz(&optz.SubszOptions) })
},
"CONNZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"CONNZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &ConnzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Connz(&optz.ConnzOptions) })
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Connz(&optz.ConnzOptions) })
},
"ROUTEZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"ROUTEZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &RoutezEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Routez(&optz.RoutezOptions) })
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Routez(&optz.RoutezOptions) })
},
"GATEWAYZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"GATEWAYZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &GatewayzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Gatewayz(&optz.GatewayzOptions) })
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Gatewayz(&optz.GatewayzOptions) })
},
"LEAFZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"LEAFZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &LeafzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Leafz(&optz.LeafzOptions) })
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Leafz(&optz.LeafzOptions) })
},
"ACCOUNTZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"ACCOUNTZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &AccountzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Accountz(&optz.AccountzOptions) })
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Accountz(&optz.AccountzOptions) })
},
"JSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"JSZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &JszEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Jsz(&optz.JSzOptions) })
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Jsz(&optz.JSzOptions) })
},
}
for name, req := range monSrvc {
@@ -772,9 +772,9 @@ func (s *Server) initEventTracking() {
}
}
monAccSrvc := map[string]msgHandler{
"SUBSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"SUBSZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &SubszEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
@@ -784,9 +784,9 @@ func (s *Server) initEventTracking() {
}
})
},
"CONNZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"CONNZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &ConnzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
@@ -795,9 +795,9 @@ func (s *Server) initEventTracking() {
}
})
},
"LEAFZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"LEAFZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &LeafzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
@@ -806,9 +806,9 @@ func (s *Server) initEventTracking() {
}
})
},
"JSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"JSZ": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &JszEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
@@ -817,9 +817,9 @@ func (s *Server) initEventTracking() {
}
})
},
"INFO": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
"INFO": func(sub *subscription, c *client, subject, reply string, msg []byte) {
optz := &AccInfoEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
@@ -1227,7 +1227,7 @@ func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string,
var errSkipZreq = errors.New("filtered response")
func (s *Server) zReq(reply string, msg []byte, fOpts *EventFilterOptions, optz interface{}, respf func() (interface{}, error)) {
func (s *Server) zReq(c *client, reply string, rmsg []byte, fOpts *EventFilterOptions, optz interface{}, respf func() (interface{}, error)) {
if !s.EventsEnabled() || reply == _EMPTY_ {
return
}
@@ -1235,6 +1235,7 @@ func (s *Server) zReq(reply string, msg []byte, fOpts *EventFilterOptions, optz
response := map[string]interface{}{"server": server}
var err error
status := 0
_, msg := c.msgParts(rmsg)
if len(msg) != 0 {
if err = json.Unmarshal(msg, optz); err != nil {
status = http.StatusBadRequest // status is only included on error, so record how far execution got
@@ -1568,7 +1569,8 @@ func (s *Server) sendAuthErrorEvent(c *client) {
// Internal message callback. If the msg is needed past the callback it is
// required to be copied.
type msgHandler func(sub *subscription, client *client, subject, reply string, msg []byte)
// rmsg contains header and the message. use client.msgParts(rmsg) to split them apart
type msgHandler func(sub *subscription, client *client, subject, reply string, rmsg []byte)
// Create an internal subscription. sysSubscribeQ for queue groups.
func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) {

View File

@@ -2014,8 +2014,13 @@ func TestServerEventsPingMonitorz(t *testing.T) {
reply := nc.NewRespInbox()
replySubj, _ := nc.SubscribeSync(reply)
destSubj := fmt.Sprintf("%s.%s", serverStatsPingReqSubj, test.endpoint)
nc.PublishRequest(destSubj, reply, opt)
// set a header to make sure request parsing knows to ignore them
nc.PublishMsg(&nats.Msg{
Subject: fmt.Sprintf("%s.%s", serverStatsPingReqSubj, test.endpoint),
Reply: reply,
Header: nats.Header{"header": []string{"for header sake"}},
Data: opt,
})
// Receive both manually.
msg, err := replySubj.NextMsg(time.Second)

View File

@@ -108,7 +108,7 @@ func validateTrustedOperators(o *Options) error {
return fmt.Errorf("using nats based account resolver - the system account needs to be specified in configuration or the operator jwt")
}
}
ver := strings.Split(strings.Split(VERSION, "-")[0], ".RC")[0]
ver := strings.Split(strings.Split(strings.Split(VERSION, "-")[0], ".RC")[0], ".beta")[0]
srvMajor, srvMinor, srvUpdate, _ := jwt.ParseServerVersion(ver)
for _, opc := range o.TrustedOperators {
if major, minor, update, err := jwt.ParseServerVersion(opc.AssertServerVersion); err != nil {