From 2ee868ba188ad313532abf052149bf2ef5e1853f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 5 Sep 2018 16:15:31 -0700 Subject: [PATCH 1/2] Propogate route imports and exports to other connected servers Signed-off-by: Derek Collison --- server/client.go | 24 +++++++++++----------- server/const.go | 4 ++-- server/monitor.go | 41 ++++++++++++++++++++------------------ server/monitor_test.go | 41 ++++++++++++++++++++++++++------------ server/reload_test.go | 6 +++--- server/route.go | 45 +++++++++++++++++++++++++++++++++++------- server/server.go | 32 +++++++++++++++++------------- 7 files changed, 124 insertions(+), 69 deletions(-) diff --git a/server/client.go b/server/client.go index 23a507ea..cba38914 100644 --- a/server/client.go +++ b/server/client.go @@ -242,17 +242,19 @@ type subscription struct { } type clientOpts struct { - Echo bool `json:"echo"` - Verbose bool `json:"verbose"` - Pedantic bool `json:"pedantic"` - TLSRequired bool `json:"tls_required"` - Authorization string `json:"auth_token"` - Username string `json:"user"` - Password string `json:"pass"` - Name string `json:"name"` - Lang string `json:"lang"` - Version string `json:"version"` - Protocol int `json:"protocol"` + Echo bool `json:"echo"` + Verbose bool `json:"verbose"` + Pedantic bool `json:"pedantic"` + TLSRequired bool `json:"tls_required"` + Authorization string `json:"auth_token"` + Username string `json:"user"` + Password string `json:"pass"` + Name string `json:"name"` + Lang string `json:"lang"` + Version string `json:"version"` + Protocol int `json:"protocol"` + Import *SubjectPermission `json:"import,omitempty"` // Used for routes only + Export *SubjectPermission `json:"export,omitempty"` // Used for routes only } var defaultOpts = clientOpts{Verbose: true, Pedantic: true, Echo: true} diff --git a/server/const.go b/server/const.go index ffbcfec2..24092ae0 100644 --- a/server/const.go +++ b/server/const.go @@ -55,8 +55,8 @@ const ( DEFAULT_HOST = "0.0.0.0" // MAX_CONTROL_LINE_SIZE is the maximum allowed protocol control line size. - // 1k should be plenty since payloads sans connect string are separate - MAX_CONTROL_LINE_SIZE = 1024 + // 4k should be plenty since payloads sans connect/info string are separate. + MAX_CONTROL_LINE_SIZE = 4096 // MAX_PAYLOAD_SIZE is the maximum allowed payload size. Should be using // something different if > 1MB payloads are needed. diff --git a/server/monitor.go b/server/monitor.go index 5a723a5f..9379b736 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -532,8 +532,8 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { type Routez struct { ID string `json:"server_id"` Now time.Time `json:"now"` - Imports *SubjectPermission `json:"imports,omitempty"` - Exports *SubjectPermission `json:"exports,omitempty"` + Import *SubjectPermission `json:"import,omitempty"` + Export *SubjectPermission `json:"export,omitempty"` NumRoutes int `json:"num_routes"` Routes []*RouteInfo `json:"routes"` } @@ -546,19 +546,21 @@ type RoutezOptions struct { // RouteInfo has detailed information on a per connection basis. type RouteInfo struct { - Rid uint64 `json:"rid"` - RemoteID string `json:"remote_id"` - DidSolicit bool `json:"did_solicit"` - IsConfigured bool `json:"is_configured"` - IP string `json:"ip"` - Port int `json:"port"` - Pending int `json:"pending_size"` - InMsgs int64 `json:"in_msgs"` - OutMsgs int64 `json:"out_msgs"` - InBytes int64 `json:"in_bytes"` - OutBytes int64 `json:"out_bytes"` - NumSubs uint32 `json:"subscriptions"` - Subs []string `json:"subscriptions_list,omitempty"` + Rid uint64 `json:"rid"` + RemoteID string `json:"remote_id"` + DidSolicit bool `json:"did_solicit"` + IsConfigured bool `json:"is_configured"` + IP string `json:"ip"` + Port int `json:"port"` + Import *SubjectPermission `json:"import,omitempty"` + Export *SubjectPermission `json:"export,omitempty"` + Pending int `json:"pending_size"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + NumSubs uint32 `json:"subscriptions"` + Subs []string `json:"subscriptions_list,omitempty"` } // Routez returns a Routez struct containing inormation about routes. @@ -575,10 +577,9 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { rs.ID = s.info.ID // Check for defined permissions for all connected routes. - perms := s.getOpts().Cluster.Permissions - if perms != nil { - rs.Imports = perms.Import - rs.Exports = perms.Export + if perms := s.getOpts().Cluster.Permissions; perms != nil { + rs.Import = perms.Import + rs.Export = perms.Export } // Walk the list @@ -594,6 +595,8 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { InBytes: atomic.LoadInt64(&r.inBytes), OutBytes: r.outBytes, NumSubs: uint32(len(r.subs)), + Import: r.opts.Import, + Export: r.opts.Export, } if subs && len(r.subs) > 0 { diff --git a/server/monitor_test.go b/server/monitor_test.go index e2930391..276e59e6 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -1936,23 +1936,38 @@ func TestRoutezPermissions(t *testing.T) { rz := pollRoutez(t, servers[i], mode, url, nil) // For server 1, we expect to see imports and exports if i == 0 { - if rz.Imports == nil || rz.Imports.Allow == nil || - len(rz.Imports.Allow) != 1 || rz.Imports.Allow[0] != "foo" || - rz.Imports.Deny != nil { - t.Fatalf("Unexpected Imports %v", rz.Imports) + if rz.Import == nil || rz.Import.Allow == nil || + len(rz.Import.Allow) != 1 || rz.Import.Allow[0] != "foo" || + rz.Import.Deny != nil { + t.Fatalf("Unexpected Import %v", rz.Import) } - if rz.Exports == nil || rz.Exports.Allow == nil || rz.Exports.Deny == nil || - len(rz.Exports.Allow) != 1 || rz.Exports.Allow[0] != "*" || - len(rz.Exports.Deny) != 2 || rz.Exports.Deny[0] != "foo" || rz.Exports.Deny[1] != "nats" { - t.Fatalf("Unexpected Exports %v", rz.Exports) + if rz.Export == nil || rz.Export.Allow == nil || rz.Export.Deny == nil || + len(rz.Export.Allow) != 1 || rz.Export.Allow[0] != "*" || + len(rz.Export.Deny) != 2 || rz.Export.Deny[0] != "foo" || rz.Export.Deny[1] != "nats" { + t.Fatalf("Unexpected Export %v", rz.Export) } } else { - // We expect to see NO imports and exports for server B. - if rz.Imports != nil { - t.Fatal("Routez body should NOT contain \"imports\" information.") + // We expect to see NO imports and exports for server B by default. + if rz.Import != nil { + t.Fatal("Routez body should NOT contain \"import\" information.") } - if rz.Exports != nil { - t.Fatal("Routez body should NOT contain \"exports\" information.") + if rz.Export != nil { + t.Fatal("Routez body should NOT contain \"export\" information.") + } + // We do expect to see them show up for the information we have on Server A though. + if len(rz.Routes) != 1 { + t.Fatalf("Expected route array of 1, got %v\n", len(rz.Routes)) + } + route := rz.Routes[0] + if route.Import == nil || route.Import.Allow == nil || + len(route.Import.Allow) != 1 || route.Import.Allow[0] != "foo" || + route.Import.Deny != nil { + t.Fatalf("Unexpected Import %v", route.Import) + } + if route.Export == nil || route.Export.Allow == nil || route.Export.Deny == nil || + len(route.Export.Allow) != 1 || route.Export.Allow[0] != "*" || + len(route.Export.Deny) != 2 || route.Export.Deny[0] != "foo" || route.Export.Deny[1] != "nats" { + t.Fatalf("Unexpected Export %v", route.Export) } } } diff --git a/server/reload_test.go b/server/reload_test.go index 4072e31e..15414568 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -134,7 +134,7 @@ func TestConfigReloadUnsupported(t *testing.T) { Debug: false, Trace: false, Logtime: false, - MaxControlLine: 1024, + MaxControlLine: 4096, MaxPayload: 1048576, MaxConn: 65536, PingInterval: 2 * time.Minute, @@ -213,7 +213,7 @@ func TestConfigReloadInvalidConfig(t *testing.T) { Debug: false, Trace: false, Logtime: false, - MaxControlLine: 1024, + MaxControlLine: 4096, MaxPayload: 1048576, MaxConn: 65536, PingInterval: 2 * time.Minute, @@ -284,7 +284,7 @@ func TestConfigReload(t *testing.T) { Trace: false, NoLog: true, Logtime: false, - MaxControlLine: 1024, + MaxControlLine: 4096, MaxPayload: 1048576, MaxConn: 65536, PingInterval: 2 * time.Minute, diff --git a/server/route.go b/server/route.go index e7afa97d..bf07ec2b 100644 --- a/server/route.go +++ b/server/route.go @@ -21,6 +21,7 @@ import ( "math/rand" "net" "net/url" + "runtime" "strconv" "strings" "sync/atomic" @@ -53,13 +54,15 @@ type route struct { } type connectInfo struct { - Echo bool `json:"echo"` - Verbose bool `json:"verbose"` - Pedantic bool `json:"pedantic"` - User string `json:"user,omitempty"` - Pass string `json:"pass,omitempty"` - TLS bool `json:"tls_required"` - Name string `json:"name"` + Echo bool `json:"echo"` + Verbose bool `json:"verbose"` + Pedantic bool `json:"pedantic"` + User string `json:"user,omitempty"` + Pass string `json:"pass,omitempty"` + TLS bool `json:"tls_required"` + Name string `json:"name"` + Import *SubjectPermission `json:"import,omitempty"` + Export *SubjectPermission `json:"export,omitempty"` } // Used to hold onto mappings for unsubscribed @@ -268,6 +271,13 @@ func (c *client) sendConnect(tlsRequired bool) { TLS: tlsRequired, Name: c.srv.info.ID, } + + // Check for any permissions to send across. + if perms := c.srv.opts.Cluster.Permissions; perms != nil { + cinfo.Import = perms.Import + cinfo.Export = perms.Export + } + b, err := json.Marshal(cinfo) if err != nil { c.Errorf("Error marshaling CONNECT to route: %v\n", err) @@ -316,6 +326,9 @@ func (c *client) processRouteInfo(info *Info) { c.route.authRequired = info.AuthRequired c.route.tlsRequired = info.TLSRequired + c.opts.Import = info.Import + c.opts.Export = info.Export + // If we do not know this route's URL, construct one on the fly // from the information provided. if c.route.url == nil { @@ -805,6 +818,10 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { rs := *c.route r = &rs } + + // Snapshot permissions if they exist. + imports := c.opts.Import + exports := c.opts.Export c.mu.Unlock() remote.mu.Lock() @@ -819,6 +836,14 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { // on the opposite connection, and therefore end-up with both // connections being dropped. remote.route.retry = true + + // If we had permissions we want to move these to the remote. + if imports != nil { + remote.opts.Import = imports + } + if exports != nil { + remote.opts.Export = exports + } remote.mu.Unlock() } @@ -911,6 +936,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { info := Info{ ID: s.info.ID, Version: s.info.Version, + GoVersion: runtime.Version(), AuthRequired: false, TLSRequired: tlsReq, TLSVerify: tlsReq, @@ -932,6 +958,11 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { if opts.Cluster.Username != "" { info.AuthRequired = true } + // Check for permissions. + if opts.Cluster.Permissions != nil { + info.Import = opts.Cluster.Permissions.Import + info.Export = opts.Cluster.Permissions.Export + } s.routeInfo = info // Possibly override Host/Port and set IP based on Cluster.Advertise if err := s.setRouteInfoHostPortAndIP(); err != nil { diff --git a/server/server.go b/server/server.go index ad45067f..67dc63f0 100644 --- a/server/server.go +++ b/server/server.go @@ -41,20 +41,24 @@ import ( // Info is the information sent to clients to help them understand information // about this server. type Info struct { - ID string `json:"server_id"` - Version string `json:"version"` - Proto int `json:"proto"` - GitCommit string `json:"git_commit,omitempty"` - GoVersion string `json:"go"` - Host string `json:"host"` - Port int `json:"port"` - AuthRequired bool `json:"auth_required,omitempty"` - TLSRequired bool `json:"tls_required,omitempty"` - TLSVerify bool `json:"tls_verify,omitempty"` - MaxPayload int `json:"max_payload"` - IP string `json:"ip,omitempty"` - CID uint64 `json:"client_id,omitempty"` - ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to. + ID string `json:"server_id"` + Version string `json:"version"` + Proto int `json:"proto"` + GitCommit string `json:"git_commit,omitempty"` + GoVersion string `json:"go"` + Host string `json:"host"` + Port int `json:"port"` + AuthRequired bool `json:"auth_required,omitempty"` + TLSRequired bool `json:"tls_required,omitempty"` + TLSVerify bool `json:"tls_verify,omitempty"` + MaxPayload int `json:"max_payload"` + IP string `json:"ip,omitempty"` + CID uint64 `json:"client_id,omitempty"` + + // Route Specific + ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to. + Import *SubjectPermission `json:"import,omitempty"` + Export *SubjectPermission `json:"export,omitempty"` } // Server is our main struct. From f032dc4529ac1dcd37b72d7d060975271e8f664f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 5 Sep 2018 17:03:45 -0700 Subject: [PATCH 2/2] Simplify, don't need connect processing Signed-off-by: Derek Collison --- server/client.go | 28 +++++++++++++++------------- server/route.go | 35 ++++++++--------------------------- 2 files changed, 23 insertions(+), 40 deletions(-) diff --git a/server/client.go b/server/client.go index cba38914..7e7ae933 100644 --- a/server/client.go +++ b/server/client.go @@ -242,19 +242,21 @@ type subscription struct { } type clientOpts struct { - Echo bool `json:"echo"` - Verbose bool `json:"verbose"` - Pedantic bool `json:"pedantic"` - TLSRequired bool `json:"tls_required"` - Authorization string `json:"auth_token"` - Username string `json:"user"` - Password string `json:"pass"` - Name string `json:"name"` - Lang string `json:"lang"` - Version string `json:"version"` - Protocol int `json:"protocol"` - Import *SubjectPermission `json:"import,omitempty"` // Used for routes only - Export *SubjectPermission `json:"export,omitempty"` // Used for routes only + Echo bool `json:"echo"` + Verbose bool `json:"verbose"` + Pedantic bool `json:"pedantic"` + TLSRequired bool `json:"tls_required"` + Authorization string `json:"auth_token"` + Username string `json:"user"` + Password string `json:"pass"` + Name string `json:"name"` + Lang string `json:"lang"` + Version string `json:"version"` + Protocol int `json:"protocol"` + + // Routes only + Import *SubjectPermission `json:"import,omitempty"` + Export *SubjectPermission `json:"export,omitempty"` } var defaultOpts = clientOpts{Verbose: true, Pedantic: true, Echo: true} diff --git a/server/route.go b/server/route.go index bf07ec2b..26ee2415 100644 --- a/server/route.go +++ b/server/route.go @@ -54,15 +54,13 @@ type route struct { } type connectInfo struct { - Echo bool `json:"echo"` - Verbose bool `json:"verbose"` - Pedantic bool `json:"pedantic"` - User string `json:"user,omitempty"` - Pass string `json:"pass,omitempty"` - TLS bool `json:"tls_required"` - Name string `json:"name"` - Import *SubjectPermission `json:"import,omitempty"` - Export *SubjectPermission `json:"export,omitempty"` + Echo bool `json:"echo"` + Verbose bool `json:"verbose"` + Pedantic bool `json:"pedantic"` + User string `json:"user,omitempty"` + Pass string `json:"pass,omitempty"` + TLS bool `json:"tls_required"` + Name string `json:"name"` } // Used to hold onto mappings for unsubscribed @@ -272,12 +270,6 @@ func (c *client) sendConnect(tlsRequired bool) { Name: c.srv.info.ID, } - // Check for any permissions to send across. - if perms := c.srv.opts.Cluster.Permissions; perms != nil { - cinfo.Import = perms.Import - cinfo.Export = perms.Export - } - b, err := json.Marshal(cinfo) if err != nil { c.Errorf("Error marshaling CONNECT to route: %v\n", err) @@ -326,6 +318,7 @@ func (c *client) processRouteInfo(info *Info) { c.route.authRequired = info.AuthRequired c.route.tlsRequired = info.TLSRequired + // Copy over permissions as well. c.opts.Import = info.Import c.opts.Export = info.Export @@ -818,10 +811,6 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { rs := *c.route r = &rs } - - // Snapshot permissions if they exist. - imports := c.opts.Import - exports := c.opts.Export c.mu.Unlock() remote.mu.Lock() @@ -836,14 +825,6 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { // on the opposite connection, and therefore end-up with both // connections being dropped. remote.route.retry = true - - // If we had permissions we want to move these to the remote. - if imports != nil { - remote.opts.Import = imports - } - if exports != nil { - remote.opts.Export = exports - } remote.mu.Unlock() }