From a72ca8a9bf785cd7847737c96ac8ccb55db68834 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Thu, 1 Jul 2021 18:32:53 -0400 Subject: [PATCH] [fixed] header handling in system services On export/import a header was inserted which broke parsing of the message. Fixed unit test broken by .beta in version Signed-off-by: Matthias Hanel --- server/events.go | 58 ++++++++++++++++++++++--------------------- server/events_test.go | 9 +++++-- server/jwt.go | 2 +- 3 files changed, 38 insertions(+), 31 deletions(-) diff --git a/server/events.go b/server/events.go index 70d60472..069bcdda 100644 --- a/server/events.go +++ b/server/events.go @@ -721,37 +721,37 @@ func (s *Server) initEventTracking() { } monSrvc := map[string]msgHandler{ "STATSZ": s.statszReq, - "VARZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "VARZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &VarzEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Varz(&optz.VarzOptions) }) + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Varz(&optz.VarzOptions) }) }, - "SUBSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "SUBSZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &SubszEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Subsz(&optz.SubszOptions) }) + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Subsz(&optz.SubszOptions) }) }, - "CONNZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "CONNZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &ConnzEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Connz(&optz.ConnzOptions) }) + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Connz(&optz.ConnzOptions) }) }, - "ROUTEZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "ROUTEZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &RoutezEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Routez(&optz.RoutezOptions) }) + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Routez(&optz.RoutezOptions) }) }, - "GATEWAYZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "GATEWAYZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &GatewayzEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Gatewayz(&optz.GatewayzOptions) }) + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Gatewayz(&optz.GatewayzOptions) }) }, - "LEAFZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "LEAFZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &LeafzEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Leafz(&optz.LeafzOptions) }) + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Leafz(&optz.LeafzOptions) }) }, - "ACCOUNTZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "ACCOUNTZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &AccountzEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Accountz(&optz.AccountzOptions) }) + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Accountz(&optz.AccountzOptions) }) }, - "JSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "JSZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &JszEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Jsz(&optz.JSzOptions) }) + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Jsz(&optz.JSzOptions) }) }, } for name, req := range monSrvc { @@ -772,9 +772,9 @@ func (s *Server) initEventTracking() { } } monAccSrvc := map[string]msgHandler{ - "SUBSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "SUBSZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &SubszEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if acc, err := extractAccount(subject); err != nil { return nil, err } else { @@ -784,9 +784,9 @@ func (s *Server) initEventTracking() { } }) }, - "CONNZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "CONNZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &ConnzEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if acc, err := extractAccount(subject); err != nil { return nil, err } else { @@ -795,9 +795,9 @@ func (s *Server) initEventTracking() { } }) }, - "LEAFZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "LEAFZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &LeafzEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if acc, err := extractAccount(subject); err != nil { return nil, err } else { @@ -806,9 +806,9 @@ func (s *Server) initEventTracking() { } }) }, - "JSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "JSZ": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &JszEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if acc, err := extractAccount(subject); err != nil { return nil, err } else { @@ -817,9 +817,9 @@ func (s *Server) initEventTracking() { } }) }, - "INFO": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + "INFO": func(sub *subscription, c *client, subject, reply string, msg []byte) { optz := &AccInfoEventOptions{} - s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if acc, err := extractAccount(subject); err != nil { return nil, err } else { @@ -1227,7 +1227,7 @@ func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, var errSkipZreq = errors.New("filtered response") -func (s *Server) zReq(reply string, msg []byte, fOpts *EventFilterOptions, optz interface{}, respf func() (interface{}, error)) { +func (s *Server) zReq(c *client, reply string, rmsg []byte, fOpts *EventFilterOptions, optz interface{}, respf func() (interface{}, error)) { if !s.EventsEnabled() || reply == _EMPTY_ { return } @@ -1235,6 +1235,7 @@ func (s *Server) zReq(reply string, msg []byte, fOpts *EventFilterOptions, optz response := map[string]interface{}{"server": server} var err error status := 0 + _, msg := c.msgParts(rmsg) if len(msg) != 0 { if err = json.Unmarshal(msg, optz); err != nil { status = http.StatusBadRequest // status is only included on error, so record how far execution got @@ -1568,7 +1569,8 @@ func (s *Server) sendAuthErrorEvent(c *client) { // Internal message callback. If the msg is needed past the callback it is // required to be copied. -type msgHandler func(sub *subscription, client *client, subject, reply string, msg []byte) +// rmsg contains header and the message. use client.msgParts(rmsg) to split them apart +type msgHandler func(sub *subscription, client *client, subject, reply string, rmsg []byte) // Create an internal subscription. sysSubscribeQ for queue groups. func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) { diff --git a/server/events_test.go b/server/events_test.go index 11c859e8..69da9a16 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -2014,8 +2014,13 @@ func TestServerEventsPingMonitorz(t *testing.T) { reply := nc.NewRespInbox() replySubj, _ := nc.SubscribeSync(reply) - destSubj := fmt.Sprintf("%s.%s", serverStatsPingReqSubj, test.endpoint) - nc.PublishRequest(destSubj, reply, opt) + // set a header to make sure request parsing knows to ignore them + nc.PublishMsg(&nats.Msg{ + Subject: fmt.Sprintf("%s.%s", serverStatsPingReqSubj, test.endpoint), + Reply: reply, + Header: nats.Header{"header": []string{"for header sake"}}, + Data: opt, + }) // Receive both manually. msg, err := replySubj.NextMsg(time.Second) diff --git a/server/jwt.go b/server/jwt.go index 538af9b9..478cf4f5 100644 --- a/server/jwt.go +++ b/server/jwt.go @@ -108,7 +108,7 @@ func validateTrustedOperators(o *Options) error { return fmt.Errorf("using nats based account resolver - the system account needs to be specified in configuration or the operator jwt") } } - ver := strings.Split(strings.Split(VERSION, "-")[0], ".RC")[0] + ver := strings.Split(strings.Split(strings.Split(VERSION, "-")[0], ".RC")[0], ".beta")[0] srvMajor, srvMinor, srvUpdate, _ := jwt.ParseServerVersion(ver) for _, opc := range o.TrustedOperators { if major, minor, update, err := jwt.ParseServerVersion(opc.AssertServerVersion); err != nil {