diff --git a/server/client.go b/server/client.go index 9a09cef3..73d4f453 100644 --- a/server/client.go +++ b/server/client.go @@ -69,6 +69,7 @@ type clientFlag byte // Some client state represented as flags const ( connectReceived clientFlag = 1 << iota // The CONNECT proto has been received + infoReceived // The INFO protocol has been received firstPongSent // The first PONG has been sent handshakeComplete // For TLS clients, indicate that the handshake is complete clearConnection // Marks that clearConnection has already been called. @@ -856,9 +857,12 @@ func (c *client) maxPayloadViolation(sz int, max int64) { } // queueOutbound queues data for client/route connections. -// Return pending length. +// Return if the data is referenced or not. If referenced, the caller +// should not reuse the `data` array. // Lock should be held. -func (c *client) queueOutbound(data []byte) { +func (c *client) queueOutbound(data []byte) bool { + // Assume data will not be referenced + referenced := false // Add to pending bytes total. c.out.pb += int64(len(data)) @@ -868,7 +872,7 @@ func (c *client) queueOutbound(data []byte) { c.clearConnection(SlowConsumerPendingBytes) atomic.AddInt64(&c.srv.slowConsumers, 1) c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp) - return + return referenced } if c.out.p == nil && len(data) < maxBufSize { @@ -901,6 +905,7 @@ func (c *client) queueOutbound(data []byte) { // FIXME(dlc) - do we need signaling of ownership here if we want len(data) < if len(data) > maxBufSize { c.out.nb = append(c.out.nb, data) + referenced = true } else { // We will copy to primary. if c.out.p == nil { @@ -924,6 +929,7 @@ func (c *client) queueOutbound(data []byte) { } else { c.out.p = append(c.out.p, data...) } + return referenced } // Assume the lock is held upon entry. 
@@ -993,6 +999,12 @@ func (c *client) processPing() { } c.sendPong() + // If not a CLIENT, we are done + if c.typ != CLIENT { + c.mu.Unlock() + return + } + // The CONNECT should have been received, but make sure it // is so before proceeding if !c.flags.isSet(connectReceived) { @@ -1667,19 +1679,18 @@ func (c *client) processPingTimer() { c.Debugf("%s Ping Timer", c.typeString()) - // Check for violation - if c.ping.out+1 > c.srv.getOpts().MaxPingsOut { - c.Debugf("Stale Client Connection - Closing") - c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true) - c.clearConnection(StaleConnection) - return - } - // If we have had activity within the PingInterval no // need to send a ping. if delta := time.Since(c.last); delta < c.srv.getOpts().PingInterval { c.Debugf("Delaying PING due to activity %v ago", delta.Round(time.Second)) } else { + // Check for violation + if c.ping.out+1 > c.srv.getOpts().MaxPingsOut { + c.Debugf("Stale Client Connection - Closing") + c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true) + c.clearConnection(StaleConnection) + return + } // Send PING c.sendPing() } @@ -1824,12 +1835,9 @@ func (c *client) closeConnection(reason ClosedState) { // Remove clients subscriptions. srv.sl.RemoveBatch(subs) - if c.typ != ROUTER { - for _, sub := range subs { - // Forward on unsubscribes if we are not - // a router ourselves. - srv.broadcastUnSubscribe(sub) - } + if c.typ == CLIENT { + // Forward UNSUBs protocols to all routes + srv.broadcastUnSubscribeBatch(subs) } } diff --git a/server/reload.go b/server/reload.go index aa938ff0..410ede1d 100644 --- a/server/reload.go +++ b/server/reload.go @@ -38,20 +38,37 @@ type option interface { // IsAuthChange indicates if this option requires reloading authorization. IsAuthChange() bool + + // IsClusterPermsChange indicates if this option requires reloading + // cluster permissions. 
+ IsClusterPermsChange() bool +} + +// noopOption is a base struct that provides default no-op behaviors. +type noopOption struct{} + +func (n noopOption) IsLoggingChange() bool { + return false +} + +func (n noopOption) IsAuthChange() bool { + return false +} + +func (n noopOption) IsClusterPermsChange() bool { + return false } // loggingOption is a base struct that provides default option behaviors for // logging-related options. -type loggingOption struct{} +type loggingOption struct { + noopOption +} func (l loggingOption) IsLoggingChange() bool { return true } -func (l loggingOption) IsAuthChange() bool { - return false -} - // traceOption implements the option interface for the `trace` setting. type traceOption struct { loggingOption @@ -119,17 +136,6 @@ func (r *remoteSyslogOption) Apply(server *Server) { server.Noticef("Reloaded: remote_syslog = %v", r.newValue) } -// noopOption is a base struct that provides default no-op behaviors. -type noopOption struct{} - -func (n noopOption) IsLoggingChange() bool { - return false -} - -func (n noopOption) IsAuthChange() bool { - return false -} - // tlsOption implements the option interface for the `tls` setting. type tlsOption struct { noopOption @@ -164,10 +170,8 @@ func (t *tlsTimeoutOption) Apply(server *Server) { } // authOption is a base struct that provides default option behaviors. -type authOption struct{} - -func (o authOption) IsLoggingChange() bool { - return false +type authOption struct { + noopOption } func (o authOption) IsAuthChange() bool { @@ -235,7 +239,8 @@ func (u *usersOption) Apply(server *Server) { // clusterOption implements the option interface for the `cluster` setting. type clusterOption struct { authOption - newValue ClusterOpts + newValue ClusterOpts + permsChanged bool } // Apply the cluster change. 
@@ -256,6 +261,10 @@ func (c *clusterOption) Apply(server *Server) { server.Noticef("Reloaded: cluster") } +func (c *clusterOption) IsClusterPermsChange() bool { + return c.permsChanged +} + // routesOption implements the option interface for the cluster `routes` // setting. type routesOption struct { @@ -503,6 +512,10 @@ func (s *Server) reloadOptions(newOpts *Options) error { if err != nil { return err } + // Need to save off previous cluster permissions + s.mu.Lock() + s.oldClusterPerms = s.opts.Cluster.Permissions + s.mu.Unlock() s.setOpts(newOpts) s.applyOptions(changed) return nil @@ -557,10 +570,12 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { diffOpts = append(diffOpts, &usersOption{newValue: newValue.([]*User)}) case "cluster": newClusterOpts := newValue.(ClusterOpts) - if err := validateClusterOpts(oldValue.(ClusterOpts), newClusterOpts); err != nil { + oldClusterOpts := oldValue.(ClusterOpts) + if err := validateClusterOpts(oldClusterOpts, newClusterOpts); err != nil { return nil, err } - diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts}) + permsChanged := !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions) + diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts, permsChanged: permsChanged}) case "routes": add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL)) diffOpts = append(diffOpts, &routesOption{add: add, remove: remove}) @@ -612,8 +627,9 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { func (s *Server) applyOptions(opts []option) { var ( - reloadLogging = false - reloadAuth = false + reloadLogging = false + reloadAuth = false + reloadClusterPerms = false ) for _, opt := range opts { opt.Apply(s) @@ -623,6 +639,9 @@ func (s *Server) applyOptions(opts []option) { if opt.IsAuthChange() { reloadAuth = true } + if opt.IsClusterPermsChange() { + reloadClusterPerms = true + } } if reloadLogging { @@ -631,6 +650,9 @@ func (s *Server) 
applyOptions(opts []option) { if reloadAuth { s.reloadAuthorization() } + if reloadClusterPerms { + s.reloadClusterPermissions() + } s.Noticef("Reloaded server configuration") } @@ -674,6 +696,123 @@ func (s *Server) reloadAuthorization() { } } +// reloadClusterPermissions reconfigures the cluster's permissions +// and sets the permissions to all existing routes, sending an +// update INFO protocol so that remotes can resend their local +// subs if needed, and sending local subs matching cluster's +// import subjects. +func (s *Server) reloadClusterPermissions() { + s.mu.Lock() + var ( + infoJSON []byte + oldPerms = s.oldClusterPerms + newPerms = s.opts.Cluster.Permissions + routes = make(map[uint64]*client, len(s.routes)) + withNewProto int + ) + // We can clear this now that we have captured it with oldPerms. + s.oldClusterPerms = nil + // Get all connected routes + for i, route := range s.routes { + // Count the number of routes that can understand receiving INFO updates. + route.mu.Lock() + if route.opts.Protocol >= routeProtoInfo { + withNewProto++ + } + route.mu.Unlock() + routes[i] = route + } + // If new permissions is nil, then clear routeInfo import/export + if newPerms == nil { + s.routeInfo.Import = nil + s.routeInfo.Export = nil + } else { + s.routeInfo.Import = newPerms.Import + s.routeInfo.Export = newPerms.Export + } + // Regenerate route INFO + s.generateRouteInfoJSON() + infoJSON = s.routeInfoJSON + s.mu.Unlock() + + // If there are no routes, we are done + if len(routes) == 0 { + return + } + + // If only older servers, simply close all routes and they will do the right + // thing on reconnect.
+ if withNewProto == 0 { + for _, route := range routes { + route.closeConnection(RouteRemoved) + } + return + } + + // Fake clients to test cluster permissions + oldPermsTester := &client{} + oldPermsTester.setRoutePermissions(oldPerms) + newPermsTester := &client{} + newPermsTester.setRoutePermissions(newPerms) + + var ( + _localSubs [4096]*subscription + localSubs = _localSubs[:0] + subsNeedSUB []*subscription + subsNeedUNSUB []*subscription + deleteRoutedSubs []*subscription + ) + s.sl.localSubs(&localSubs) + + // Go through all local subscriptions + for _, sub := range localSubs { + // Get all subs that can now be imported + couldImportThen := oldPermsTester.canImport(sub.subject) + canImportNow := newPermsTester.canImport(sub.subject) + if canImportNow { + // If we could not before, then will need to send a SUB protocol. + if !couldImportThen { + subsNeedSUB = append(subsNeedSUB, sub) + } + } else if couldImportThen { + // We were previously able to import this sub, but now + // we can't so we need to send an UNSUB protocol + subsNeedUNSUB = append(subsNeedUNSUB, sub) + } + } + + for _, route := range routes { + route.mu.Lock() + // If route is to older server, simply close connection. + if route.opts.Protocol < routeProtoInfo { + route.mu.Unlock() + route.closeConnection(RouteRemoved) + continue + } + route.setRoutePermissions(newPerms) + for _, sub := range route.subs { + // If we can't export, we need to drop the subscriptions that + // we have on behalf of this route. + if !route.canExport(sub.subject) { + delete(route.subs, string(sub.sid)) + deleteRoutedSubs = append(deleteRoutedSubs, sub) + } + } + // Send an update INFO, which will allow remote server to show + // our current route config in monitoring and resend subscriptions + // that we now possibly allow with a change of Export permissions. + route.sendInfo(infoJSON) + // Now send SUB and UNSUB protocols as needed. 
+ closed := route.sendRouteSubProtos(subsNeedSUB, nil) + if !closed { + route.sendRouteUnSubProtos(subsNeedUNSUB, nil) + } + route.mu.Unlock() + } + // Remove as a batch all the subs that we have removed from each route. + s.sl.RemoveBatch(deleteRoutedSubs) +} + // validateClusterOpts ensures the new ClusterOpts does not change host or // port, which do not support reload. func validateClusterOpts(old, new ClusterOpts) error { diff --git a/server/reload_test.go b/server/reload_test.go index 5fa68f86..fa064fb3 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -1972,3 +1972,462 @@ func TestConfigReloadClusterWorks(t *testing.T) { t.Fatalf("Expected server B route ID to be %v, got %v", bcid, newbcid) } } + +func TestConfigReloadClusterPerms(t *testing.T) { + confATemplate := ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + permissions { + import { + allow: %s + } + export { + allow: %s + } + } + } + ` + confA := createConfFile(t, []byte(fmt.Sprintf(confATemplate, `"foo"`, `"foo"`))) + defer os.Remove(confA) + srva, _ := RunServerWithConfig(confA) + defer srva.Shutdown() + + confBTemplate := ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + permissions { + import { + allow: %s + } + export { + allow: %s + } + } + routes = [ + "nats://127.0.0.1:%d" + ] + } + ` + confB := createConfFile(t, []byte(fmt.Sprintf(confBTemplate, `"foo"`, `"foo"`, srva.ClusterAddr().Port))) + defer os.Remove(confB) + srvb, _ := RunServerWithConfig(confB) + defer srvb.Shutdown() + + checkClusterFormed(t, srva, srvb) + + // Create a connection on A + nca, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srva.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nca.Close() + // Create a subscription on "foo" and "bar", only "foo" will be also on server B. 
+ subFooOnA, err := nca.SubscribeSync("foo") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + subBarOnA, err := nca.SubscribeSync("bar") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + // Connect on B and do the same + ncb, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvb.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncb.Close() + // Create a subscription on "foo" and "bar", only "foo" will be also on server B. + subFooOnB, err := ncb.SubscribeSync("foo") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + subBarOnB, err := ncb.SubscribeSync("bar") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + // Check subscriptions on each server. There should be 3 on each server, + // foo and bar locally and foo from remote server. + checkExpectedSubs(t, 3, srva, srvb) + + sendMsg := func(t *testing.T, subj string, nc *nats.Conn) { + t.Helper() + if err := nc.Publish(subj, []byte("msg")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + + checkSub := func(t *testing.T, sub *nats.Subscription, shouldReceive bool) { + t.Helper() + _, err := sub.NextMsg(100 * time.Millisecond) + if shouldReceive && err != nil { + t.Fatalf("Expected message on %q, got %v", sub.Subject, err) + } else if !shouldReceive && err == nil { + t.Fatalf("Expected no message on %q, got one", sub.Subject) + } + } + + // Produce from A and check received on both sides + sendMsg(t, "foo", nca) + checkSub(t, subFooOnA, true) + checkSub(t, subFooOnB, true) + // Now from B: + sendMsg(t, "foo", ncb) + checkSub(t, subFooOnA, true) + checkSub(t, subFooOnB, true) + + // Publish on bar from A and make sure only local sub receives + sendMsg(t, "bar", nca) + checkSub(t, subBarOnA, true) + checkSub(t, subBarOnB, false) + + // Publish on bar from B and make sure only local sub receives + sendMsg(t, "bar", ncb) + checkSub(t, subBarOnA, false) + checkSub(t, subBarOnB, true) + + 
// We will now both import/export foo and bar. Start with reloading A. + reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`, `["foo", "bar"]`)) + + // Since B has not been updated yet, the state should remain the same, + // that is 3 subs on each server. + checkExpectedSubs(t, 3, srva, srvb) + + // Now update and reload B. Add "baz" for another test down below + reloadUpdateConfig(t, srvb, confB, fmt.Sprintf(confBTemplate, `["foo", "bar", "baz"]`, `["foo", "bar", "baz"]`, srva.ClusterAddr().Port)) + + // Now 4 on each server + checkExpectedSubs(t, 4, srva, srvb) + + // Make sure that we can receive all messages + sendMsg(t, "foo", nca) + checkSub(t, subFooOnA, true) + checkSub(t, subFooOnB, true) + sendMsg(t, "foo", ncb) + checkSub(t, subFooOnA, true) + checkSub(t, subFooOnB, true) + + sendMsg(t, "bar", nca) + checkSub(t, subBarOnA, true) + checkSub(t, subBarOnB, true) + sendMsg(t, "bar", ncb) + checkSub(t, subBarOnA, true) + checkSub(t, subBarOnB, true) + + // Create subscription on baz on server B. + subBazOnB, err := ncb.SubscribeSync("baz") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + // Check subscriptions count + checkExpectedSubs(t, 5, srvb) + checkExpectedSubs(t, 4, srva) + + sendMsg(t, "baz", nca) + checkSub(t, subBazOnB, false) + sendMsg(t, "baz", ncb) + checkSub(t, subBazOnB, true) + + // Test UNSUB by denying something that was previously imported + reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"foo"`, `["foo", "bar"]`)) + // Since A no longer imports "bar", we should have one less subscription + // on B (B will have received an UNSUB for bar) + checkExpectedSubs(t, 4, srvb) + // A, however, should still have same number of subs. + checkExpectedSubs(t, 4, srva) + + // Remove all permissions from A. 
+ reloadUpdateConfig(t, srva, confA, ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + } + `) + // Server A should now have baz sub + checkExpectedSubs(t, 5, srvb) + checkExpectedSubs(t, 5, srva) + + sendMsg(t, "baz", nca) + checkSub(t, subBazOnB, true) + sendMsg(t, "baz", ncb) + checkSub(t, subBazOnB, true) + + // Finally, remove permissions from B + reloadUpdateConfig(t, srvb, confB, fmt.Sprintf(` + port: -1 + cluster { + listen: 127.0.0.1:-1 + routes = [ + "nats://127.0.0.1:%d" + ] + } + `, srva.ClusterAddr().Port)) + // Check expected subscriptions count. + checkExpectedSubs(t, 5, srvb) + checkExpectedSubs(t, 5, srva) +} + +func TestConfigReloadClusterPermsImport(t *testing.T) { + confATemplate := ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + permissions { + import: { + allow: %s + } + } + } + ` + confA := createConfFile(t, []byte(fmt.Sprintf(confATemplate, `["foo", "bar"]`))) + defer os.Remove(confA) + srva, _ := RunServerWithConfig(confA) + defer srva.Shutdown() + + confBTemplate := ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + routes = [ + "nats://127.0.0.1:%d" + ] + } + ` + confB := createConfFile(t, []byte(fmt.Sprintf(confBTemplate, srva.ClusterAddr().Port))) + defer os.Remove(confB) + srvb, _ := RunServerWithConfig(confB) + defer srvb.Shutdown() + + checkClusterFormed(t, srva, srvb) + + // Create a connection on A + nca, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srva.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nca.Close() + // Create a subscription on "foo" and "bar" + if _, err := nca.SubscribeSync("foo"); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + if _, err := nca.SubscribeSync("bar"); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + checkExpectedSubs(t, 2, srva, srvb) + + // Drop foo + reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"bar"`)) + checkExpectedSubs(t, 2, srva) + checkExpectedSubs(t, 1, srvb) + + // Add it back + 
reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`)) + checkExpectedSubs(t, 2, srva, srvb) + + // Empty Import means implicit allow + reloadUpdateConfig(t, srva, confA, ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + permissions { + export: ">" + } + } + `) + checkExpectedSubs(t, 2, srva, srvb) + + confATemplate = ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + permissions { + import: { + allow: ["foo", "bar"] + deny: %s + } + } + } + ` + // Now deny all: + reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`)) + checkExpectedSubs(t, 2, srva) + checkExpectedSubs(t, 0, srvb) + + // Drop foo from the deny list + reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"bar"`)) + checkExpectedSubs(t, 2, srva) + checkExpectedSubs(t, 1, srvb) +} + +func TestConfigReloadClusterPermsExport(t *testing.T) { + confATemplate := ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + permissions { + export: { + allow: %s + } + } + } + ` + confA := createConfFile(t, []byte(fmt.Sprintf(confATemplate, `["foo", "bar"]`))) + defer os.Remove(confA) + srva, _ := RunServerWithConfig(confA) + defer srva.Shutdown() + + confBTemplate := ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + routes = [ + "nats://127.0.0.1:%d" + ] + } + ` + confB := createConfFile(t, []byte(fmt.Sprintf(confBTemplate, srva.ClusterAddr().Port))) + defer os.Remove(confB) + srvb, _ := RunServerWithConfig(confB) + defer srvb.Shutdown() + + checkClusterFormed(t, srva, srvb) + + // Create a connection on B + ncb, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvb.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncb.Close() + // Create a subscription on "foo" and "bar" + if _, err := ncb.SubscribeSync("foo"); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + if _, err := ncb.SubscribeSync("bar"); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + checkExpectedSubs(t, 2, srva, srvb) 
+ + // Drop foo + reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"bar"`)) + checkExpectedSubs(t, 2, srvb) + checkExpectedSubs(t, 1, srva) + + // Add it back + reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`)) + checkExpectedSubs(t, 2, srva, srvb) + + // Empty Export means implicit allow + reloadUpdateConfig(t, srva, confA, ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + permissions { + import: ">" + } + } + `) + checkExpectedSubs(t, 2, srva, srvb) + + confATemplate = ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + permissions { + export: { + allow: ["foo", "bar"] + deny: %s + } + } + } + ` + // Now deny all: + reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`)) + checkExpectedSubs(t, 0, srva) + checkExpectedSubs(t, 2, srvb) + + // Drop foo from the deny list + reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"bar"`)) + checkExpectedSubs(t, 1, srva) + checkExpectedSubs(t, 2, srvb) +} + +func TestConfigReloadClusterPermsOldServer(t *testing.T) { + confATemplate := ` + port: -1 + cluster { + listen: 127.0.0.1:-1 + permissions { + export: { + allow: %s + } + } + } + ` + confA := createConfFile(t, []byte(fmt.Sprintf(confATemplate, `["foo", "bar"]`))) + defer os.Remove(confA) + srva, _ := RunServerWithConfig(confA) + defer srva.Shutdown() + + optsB := DefaultOptions() + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srva.ClusterAddr().Port)) + // Make server B behave like an old server + testRouteProto = routeProtoZero + defer func() { testRouteProto = routeProtoInfo }() + srvb := RunServer(optsB) + defer srvb.Shutdown() + testRouteProto = routeProtoInfo + + checkClusterFormed(t, srva, srvb) + + // Get the route's connection ID + getRouteRID := func() uint64 { + rid := uint64(0) + srvb.mu.Lock() + for _, r := range srvb.routes { + r.mu.Lock() + rid = r.cid + r.mu.Unlock() + break + } + srvb.mu.Unlock() + return rid + } + orgRID := getRouteRID() + + // Cause a 
config reload on A + reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"bar"`)) + + // Check that new route gets created + check := func(t *testing.T) { + t.Helper() + checkFor(t, 3*time.Second, 15*time.Millisecond, func() error { + if rid := getRouteRID(); rid == orgRID { + return fmt.Errorf("Route does not seem to have been recreated") + } + return nil + }) + } + check(t) + + // Save the current value + orgRID = getRouteRID() + + // Add another server that supports INFO updates + + optsC := DefaultOptions() + optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srva.ClusterAddr().Port)) + srvc := RunServer(optsC) + defer srvc.Shutdown() + + checkClusterFormed(t, srva, srvb, srvc) + + // Cause a config reload on A + reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"foo"`)) + // Check that new route gets created + check(t) +} diff --git a/server/route.go b/server/route.go index 6f95c0eb..55d5fc0f 100644 --- a/server/route.go +++ b/server/route.go @@ -39,6 +39,18 @@ const ( Explicit ) +const ( + // routeProtoZero is the original Route protocol from 2009. + // http://nats.io/documentation/internals/nats-protocol/ + routeProtoZero = iota + // routeProtoInfo signals a route can receive more than the original INFO block. + // This can be used to update remote cluster permissions, etc... + routeProtoInfo +) + +// Used by tests +var testRouteProto = routeProtoInfo + type route struct { remoteID string didSolicit bool @@ -305,6 +317,9 @@ func (c *client) processRouteInfo(info *Info) { // in closeConnection(). c.route.remoteID = info.ID + // Get the route's proto version + c.opts.Protocol = info.Proto + // Detect route to self.
if c.route.remoteID == s.info.ID { c.mu.Unlock() @@ -316,6 +331,14 @@ func (c *client) processRouteInfo(info *Info) { c.route.authRequired = info.AuthRequired c.route.tlsRequired = info.TLSRequired + // If this is an update due to config reload on the remote server, + // need to possibly send local subs to the remote server. + if c.flags.isSet(infoReceived) { + s.updateRemoteRoutePerms(c, info) + c.mu.Unlock() + return + } + // Copy over permissions as well. c.opts.Import = info.Import c.opts.Export = info.Export @@ -335,6 +358,10 @@ func (c *client) processRouteInfo(info *Info) { c.route.url = url } + // Mark that the INFO protocol has been received. Will allow + // to detect INFO updates. + c.flags.set(infoReceived) + // Check to see if we have this remote already registered. // This can happen when both servers have routes to each other. c.mu.Unlock() @@ -376,6 +403,41 @@ func (c *client) processRouteInfo(info *Info) { } } +// Possibly sends local subscriptions interest to this route +// based on changes in the remote's Export permissions. +// Lock assumed held on entry +func (s *Server) updateRemoteRoutePerms(route *client, info *Info) { + // Interested only on Export permissions for the remote server. + // Create "fake" clients that we will use to check permissions + // using the old permissions... + oldPerms := &RoutePermissions{Export: route.opts.Export} + oldPermsTester := &client{} + oldPermsTester.setRoutePermissions(oldPerms) + // and the new ones. 
+ newPerms := &RoutePermissions{Export: info.Export} + newPermsTester := &client{} + newPermsTester.setRoutePermissions(newPerms) + + route.opts.Import = info.Import + route.opts.Export = info.Export + + var ( + _localSubs [4096]*subscription + localSubs = _localSubs[:0] + ) + s.sl.localSubs(&localSubs) + + route.sendRouteSubProtos(localSubs, func(sub *subscription) bool { + subj := sub.subject + // If the remote can now export but could not before, and this server can import this + // subject, then send SUB protocol. + if newPermsTester.canExport(subj) && !oldPermsTester.canExport(subj) && route.canImport(subj) { + return true + } + return false + }) +} + // sendAsyncInfoToClients sends an INFO protocol to all // connected clients that accept async INFO updates. // The server lock is held on entry. @@ -519,21 +581,102 @@ func (s *Server) sendLocalSubsToRoute(route *client) { s.sl.localSubs(&subs) route.mu.Lock() + closed := route.sendRouteSubProtos(subs, func(sub *subscription) bool { + return route.canImport(sub.subject) + }) + route.mu.Unlock() + if !closed { + route.Debugf("Sent local subscriptions to route") + } +} + +// Sends SUBs protocols for the given subscriptions. If a filter is specified, it is +// invoked for each subscription. If the filter returns false, the subscription is skipped. +// This function may release the route's lock due to flushing of outbound data. A boolean +// is returned to indicate if the connection has been closed during this call. +// Lock is held on entry. +func (c *client) sendRouteSubProtos(subs []*subscription, filter func(sub *subscription) bool) bool { + return c.sendRouteSubOrUnSubProtos(subs, true, filter) +} + +// Sends UNSUBs protocols for the given subscriptions. If a filter is specified, it is +// invoked for each subscription. If the filter returns false, the subscription is skipped. +// This function may release the route's lock due to flushing of outbound data. 
A boolean +// is returned to indicate if the connection has been closed during this call. +// Lock is held on entry. +func (c *client) sendRouteUnSubProtos(subs []*subscription, filter func(sub *subscription) bool) bool { + return c.sendRouteSubOrUnSubProtos(subs, false, filter) +} + +// Low-level function that sends SUBs or UNSUBs protocols for the given subscriptions. +// Use sendRouteSubProtos or sendRouteUnSubProtos instead for clarity. +// Lock is held on entry. +func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto bool, filter func(sub *subscription) bool) bool { + const staticBufSize = maxBufSize * 2 + var ( + _buf [staticBufSize]byte // array on stack + buf = _buf[:0] // our buffer will initially point to the stack buffer + mbs = staticBufSize // max size of the buffer + mpMax = int(c.out.mp / 2) // 50% of max_pending + closed bool + ) + // We need to make sure that we stay below the user defined max pending bytes. + if mbs > mpMax { + mbs = mpMax + } for _, sub := range subs { - // Send SUB interest only if subject has a match in import permissions - if !route.canImport(sub.subject) { + if filter != nil && !filter(sub) { continue } - proto := fmt.Sprintf(subProto, sub.subject, sub.queue, routeSid(sub)) - route.queueOutbound([]byte(proto)) - if route.out.pb > int64(route.out.sz*2) { - route.flushSignal() + rsid := routeSid(sub) + // Check if proto is going to fit. + curSize := len(buf) + if isSubProto { + // "SUB " + subject + " " + queue + " " + ... + curSize += 4 + len(sub.subject) + 1 + len(sub.queue) + 1 + } else { + // "UNSUB " + ... + curSize += 6 } + // rsid + "\r\n" + curSize += len(rsid) + 2 + if curSize >= mbs { + if c.queueOutbound(buf) { + // Need to allocate new array + buf = make([]byte, 0, mbs) + } else { + // We can reuse previous buffer + buf = buf[:0] + } + // Update last activity because flushOutbound() will release + // the lock, which could cause pingTimer to think that this + // connection is stale otherwise.
+ c.last = time.Now() + c.flushOutbound() + if closed = c.flags.isSet(clearConnection); closed { + break + } + } + if isSubProto { + buf = append(buf, []byte("SUB ")...) + buf = append(buf, sub.subject...) + buf = append(buf, ' ') + if len(sub.queue) > 0 { + buf = append(buf, sub.queue...) + } + } else { + buf = append(buf, []byte("UNSUB ")...) + } + buf = append(buf, ' ') + buf = append(buf, rsid...) + buf = append(buf, []byte(CR_LF)...) } - route.flushSignal() - route.mu.Unlock() - - route.Debugf("Sent local subscriptions to route") + if !closed && len(buf) > 0 { + c.queueOutbound(buf) + c.flushOutbound() + closed = c.flags.isSet(clearConnection) + } + return closed } func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { @@ -835,6 +978,7 @@ func (s *Server) broadcastInterestToRoutes(sub *subscription, proto string) { arg = []byte(proto[:len(proto)-LEN_CR_LF]) } protoAsBytes := []byte(proto) + checkPerms := true s.mu.Lock() for _, route := range s.routes { // FIXME(dlc) - Make same logic as deliverMsg @@ -843,9 +987,12 @@ func (s *Server) broadcastInterestToRoutes(sub *subscription, proto string) { // route will have the same `perms`, so check with the first route // and send SUB interest only if subject has a match in import permissions. // If there is no match, we stop here. - if !route.canImport(sub.subject) { - route.mu.Unlock() - break + if checkPerms { + checkPerms = false + if !route.canImport(sub.subject) { + route.mu.Unlock() + break + } } route.sendProto(protoAsBytes, true) route.mu.Unlock() @@ -883,6 +1030,27 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { s.broadcastInterestToRoutes(sub, proto) } +// Sends UNSUB protocols for each of the subscriptions in the given array +// to all connected routes. Used when a client connection is closed. Note +// that when that happens, the subscriptions' MAX have been cleared (force unsub). 
+func (s *Server) broadcastUnSubscribeBatch(subs []*subscription) { + var ( + _routes [32]*client + routes = _routes[:0] + ) + s.mu.Lock() + for _, route := range s.routes { + routes = append(routes, route) + } + s.mu.Unlock() + + for _, route := range routes { + route.mu.Lock() + route.sendRouteUnSubProtos(subs, nil) + route.mu.Unlock() + } +} + func (s *Server) routeAcceptLoop(ch chan struct{}) { defer func() { if ch != nil { @@ -910,6 +1078,9 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { net.JoinHostPort(opts.Cluster.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port))) s.mu.Lock() + // For tests, we want to be able to make this server behave + // as an older server. + proto := testRouteProto // Check for TLSConfig tlsReq := opts.Cluster.TLSConfig != nil info := Info{ @@ -920,6 +1091,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { TLSRequired: tlsReq, TLSVerify: tlsReq, MaxPayload: s.info.MaxPayload, + Proto: proto, } // Set this if only if advertise is not disabled if !opts.Cluster.NoAdvertise { diff --git a/server/routes_test.go b/server/routes_test.go index bc26ca4a..e84a54cf 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1092,3 +1092,32 @@ func TestRoutePermsAppliedOnInboundAndOutboundRoute(t *testing.T) { // Now check for permissions set on server initiating the route connection check(t, srvb) } + +func TestRouteSendLocalSubsWithLowMaxPending(t *testing.T) { + optsA := DefaultOptions() + optsA.MaxPending = 1024 + srvA := RunServer(optsA) + defer srvA.Shutdown() + + nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + numSubs := 1000 + for i := 0; i < numSubs; i++ { + nc.Subscribe("foo.bar", func(_ *nats.Msg) {}) + } + checkExpectedSubs(t, numSubs, srvA) + + // Now create a route between B and A + optsB := DefaultOptions() + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, 
optsA.Cluster.Port)) + srvB := RunServer(optsB) + defer srvB.Shutdown() + + checkClusterFormed(t, srvA, srvB) + + // Check that all subs have been sent ok + checkExpectedSubs(t, numSubs, srvA, srvB) +} diff --git a/server/server.go b/server/server.go index cc2c2e33..1f890d33 100644 --- a/server/server.go +++ b/server/server.go @@ -128,6 +128,9 @@ type Server struct { clientActualPort int clusterActualPort int + // Use during reload + oldClusterPerms *RoutePermissions + // Used by tests to check that http.Servers do // not set any timeout. monitoringServer *http.Server