Merge pull request #753 from nats-io/route_perms_reload

[ADDED] Support for route permissions config reload
This commit is contained in:
Ivan Kozlovic
2018-09-27 10:08:55 -06:00
committed by GitHub
6 changed files with 865 additions and 55 deletions

View File

@@ -69,6 +69,7 @@ type clientFlag byte
// Some client state represented as flags
const (
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
infoReceived // The INFO protocol has been received
firstPongSent // The first PONG has been sent
handshakeComplete // For TLS clients, indicate that the handshake is complete
clearConnection // Marks that clearConnection has already been called.
@@ -856,9 +857,12 @@ func (c *client) maxPayloadViolation(sz int, max int64) {
}
// queueOutbound queues data for client/route connections.
// Return pending length.
// Return if the data is referenced or not. If referenced, the caller
// should not reuse the `data` array.
// Lock should be held.
func (c *client) queueOutbound(data []byte) {
func (c *client) queueOutbound(data []byte) bool {
// Assume data will not be referenced
referenced := false
// Add to pending bytes total.
c.out.pb += int64(len(data))
@@ -868,7 +872,7 @@ func (c *client) queueOutbound(data []byte) {
c.clearConnection(SlowConsumerPendingBytes)
atomic.AddInt64(&c.srv.slowConsumers, 1)
c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp)
return
return referenced
}
if c.out.p == nil && len(data) < maxBufSize {
@@ -901,6 +905,7 @@ func (c *client) queueOutbound(data []byte) {
// FIXME(dlc) - do we need signaling of ownership here if we want len(data) <
if len(data) > maxBufSize {
c.out.nb = append(c.out.nb, data)
referenced = true
} else {
// We will copy to primary.
if c.out.p == nil {
@@ -924,6 +929,7 @@ func (c *client) queueOutbound(data []byte) {
} else {
c.out.p = append(c.out.p, data...)
}
return referenced
}
// Assume the lock is held upon entry.
@@ -993,6 +999,12 @@ func (c *client) processPing() {
}
c.sendPong()
// If not a CLIENT, we are done
if c.typ != CLIENT {
c.mu.Unlock()
return
}
// The CONNECT should have been received, but make sure it
// is so before proceeding
if !c.flags.isSet(connectReceived) {
@@ -1667,19 +1679,18 @@ func (c *client) processPingTimer() {
c.Debugf("%s Ping Timer", c.typeString())
// Check for violation
if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
c.Debugf("Stale Client Connection - Closing")
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true)
c.clearConnection(StaleConnection)
return
}
// If we have had activity within the PingInterval no
// need to send a ping.
if delta := time.Since(c.last); delta < c.srv.getOpts().PingInterval {
c.Debugf("Delaying PING due to activity %v ago", delta.Round(time.Second))
} else {
// Check for violation
if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
c.Debugf("Stale Client Connection - Closing")
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true)
c.clearConnection(StaleConnection)
return
}
// Send PING
c.sendPing()
}
@@ -1824,12 +1835,9 @@ func (c *client) closeConnection(reason ClosedState) {
// Remove clients subscriptions.
srv.sl.RemoveBatch(subs)
if c.typ != ROUTER {
for _, sub := range subs {
// Forward on unsubscribes if we are not
// a router ourselves.
srv.broadcastUnSubscribe(sub)
}
if c.typ == CLIENT {
// Forward UNSUBs protocols to all routes
srv.broadcastUnSubscribeBatch(subs)
}
}

View File

@@ -38,20 +38,37 @@ type option interface {
// IsAuthChange indicates if this option requires reloading authorization.
IsAuthChange() bool
// IsClusterPermsChange indicates if this option requires reloading
// cluster permissions.
IsClusterPermsChange() bool
}
// noopOption can be embedded by option implementations to inherit
// default answers for the option interface: it reports that neither
// logging, nor authorization, nor cluster permissions need reloading.
type noopOption struct{}

// IsLoggingChange reports whether this option affects logging; always false.
func (noopOption) IsLoggingChange() bool { return false }

// IsAuthChange reports whether this option affects authorization; always false.
func (noopOption) IsAuthChange() bool { return false }

// IsClusterPermsChange reports whether this option affects cluster
// permissions; always false.
func (noopOption) IsClusterPermsChange() bool { return false }
// loggingOption is a base struct that provides default option behaviors for
// logging-related options.
type loggingOption struct{}
type loggingOption struct {
noopOption
}
func (l loggingOption) IsLoggingChange() bool {
return true
}
func (l loggingOption) IsAuthChange() bool {
return false
}
// traceOption implements the option interface for the `trace` setting.
type traceOption struct {
loggingOption
@@ -119,17 +136,6 @@ func (r *remoteSyslogOption) Apply(server *Server) {
server.Noticef("Reloaded: remote_syslog = %v", r.newValue)
}
// noopOption is a base struct that provides default no-op behaviors.
type noopOption struct{}
func (n noopOption) IsLoggingChange() bool {
return false
}
func (n noopOption) IsAuthChange() bool {
return false
}
// tlsOption implements the option interface for the `tls` setting.
type tlsOption struct {
noopOption
@@ -164,10 +170,8 @@ func (t *tlsTimeoutOption) Apply(server *Server) {
}
// authOption is a base struct that provides default option behaviors.
type authOption struct{}
func (o authOption) IsLoggingChange() bool {
return false
type authOption struct {
noopOption
}
func (o authOption) IsAuthChange() bool {
@@ -235,7 +239,8 @@ func (u *usersOption) Apply(server *Server) {
// clusterOption implements the option interface for the `cluster` setting.
type clusterOption struct {
authOption
newValue ClusterOpts
newValue ClusterOpts
permsChanged bool
}
// Apply the cluster change.
@@ -256,6 +261,10 @@ func (c *clusterOption) Apply(server *Server) {
server.Noticef("Reloaded: cluster")
}
// IsClusterPermsChange reports whether this cluster option carries a
// change to the cluster's route permissions, in which case the server
// must propagate new permissions to existing routes on reload.
func (c *clusterOption) IsClusterPermsChange() bool {
	return c.permsChanged
}
// routesOption implements the option interface for the cluster `routes`
// setting.
type routesOption struct {
@@ -503,6 +512,10 @@ func (s *Server) reloadOptions(newOpts *Options) error {
if err != nil {
return err
}
// Need to save off previous cluster permissions
s.mu.Lock()
s.oldClusterPerms = s.opts.Cluster.Permissions
s.mu.Unlock()
s.setOpts(newOpts)
s.applyOptions(changed)
return nil
@@ -557,10 +570,12 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &usersOption{newValue: newValue.([]*User)})
case "cluster":
newClusterOpts := newValue.(ClusterOpts)
if err := validateClusterOpts(oldValue.(ClusterOpts), newClusterOpts); err != nil {
oldClusterOpts := oldValue.(ClusterOpts)
if err := validateClusterOpts(oldClusterOpts, newClusterOpts); err != nil {
return nil, err
}
diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts})
permsChanged := !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions)
diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts, permsChanged: permsChanged})
case "routes":
add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL))
diffOpts = append(diffOpts, &routesOption{add: add, remove: remove})
@@ -612,8 +627,9 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
func (s *Server) applyOptions(opts []option) {
var (
reloadLogging = false
reloadAuth = false
reloadLogging = false
reloadAuth = false
reloadClusterPerms = false
)
for _, opt := range opts {
opt.Apply(s)
@@ -623,6 +639,9 @@ func (s *Server) applyOptions(opts []option) {
if opt.IsAuthChange() {
reloadAuth = true
}
if opt.IsClusterPermsChange() {
reloadClusterPerms = true
}
}
if reloadLogging {
@@ -631,6 +650,9 @@ func (s *Server) applyOptions(opts []option) {
if reloadAuth {
s.reloadAuthorization()
}
if reloadClusterPerms {
s.reloadClusterPermissions()
}
s.Noticef("Reloaded server configuration")
}
@@ -674,6 +696,123 @@ func (s *Server) reloadAuthorization() {
}
}
// reloadClusterPermissions reconfigures the cluster's permissions
// and sets the permissions on all existing routes, sending an
// update INFO protocol so that remotes can resend their local
// subs if needed, and sending local subs matching the cluster's
// import subjects.
func (s *Server) reloadClusterPermissions() {
	s.mu.Lock()
	var (
		infoJSON     []byte
		oldPerms     = s.oldClusterPerms
		newPerms     = s.opts.Cluster.Permissions
		routes       = make(map[uint64]*client, len(s.routes))
		withNewProto int
	)
	// We can clear this now that we have captured it with oldPerms.
	s.oldClusterPerms = nil
	// Get all connected routes
	for i, route := range s.routes {
		// Count the number of routes that can understand receiving INFO updates.
		// NOTE(review): route lock taken while holding the server lock —
		// this ordering appears to be the convention elsewhere in the file.
		route.mu.Lock()
		if route.opts.Protocol >= routeProtoInfo {
			withNewProto++
		}
		route.mu.Unlock()
		routes[i] = route
	}
	// If new permissions is nil, then clear routeInfo import/export
	if newPerms == nil {
		s.routeInfo.Import = nil
		s.routeInfo.Export = nil
	} else {
		s.routeInfo.Import = newPerms.Import
		s.routeInfo.Export = newPerms.Export
	}
	// Regenerate route INFO
	s.generateRouteInfoJSON()
	infoJSON = s.routeInfoJSON
	// Release the server lock before touching individual routes below;
	// closeConnection/send calls must not run under it.
	s.mu.Unlock()
	// If there were no route, we are done
	if len(routes) == 0 {
		return
	}
	// If only older servers, simply close all routes and they will do the right
	// thing on reconnect.
	if withNewProto == 0 {
		for _, route := range routes {
			route.closeConnection(RouteRemoved)
		}
		return
	}
	// Fake clients to test cluster permissions
	oldPermsTester := &client{}
	oldPermsTester.setRoutePermissions(oldPerms)
	newPermsTester := &client{}
	newPermsTester.setRoutePermissions(newPerms)
	var (
		_localSubs       [4096]*subscription
		localSubs        = _localSubs[:0]
		subsNeedSUB      []*subscription
		subsNeedUNSUB    []*subscription
		deleteRoutedSubs []*subscription
	)
	s.sl.localSubs(&localSubs)
	// Go through all local subscriptions
	for _, sub := range localSubs {
		// Get all subs that can now be imported
		couldImportThen := oldPermsTester.canImport(sub.subject)
		canImportNow := newPermsTester.canImport(sub.subject)
		if canImportNow {
			// If we could not before, then will need to send a SUB protocol.
			if !couldImportThen {
				subsNeedSUB = append(subsNeedSUB, sub)
			}
		} else if couldImportThen {
			// We were previously able to import this sub, but now
			// we can't so we need to send an UNSUB protocol
			subsNeedUNSUB = append(subsNeedUNSUB, sub)
		}
	}
	for _, route := range routes {
		route.mu.Lock()
		// If route is to older server, simply close connection.
		if route.opts.Protocol < routeProtoInfo {
			route.mu.Unlock()
			route.closeConnection(RouteRemoved)
			continue
		}
		route.setRoutePermissions(newPerms)
		for _, sub := range route.subs {
			// If we can't export, we need to drop the subscriptions that
			// we have on behalf of this route.
			if !route.canExport(sub.subject) {
				delete(route.subs, string(sub.sid))
				deleteRoutedSubs = append(deleteRoutedSubs, sub)
			}
		}
		// Send an update INFO, which will allow remote server to show
		// our current route config in monitoring and resend subscriptions
		// that we now possibly allow with a change of Export permissions.
		route.sendInfo(infoJSON)
		// Now send SUB and UNSUB protocols as needed.
		// sendRouteSubProtos may flush and release the route lock;
		// a true return means the connection was closed meanwhile.
		closed := route.sendRouteSubProtos(subsNeedSUB, nil)
		if !closed {
			route.sendRouteUnSubProtos(subsNeedUNSUB, nil)
		}
		route.mu.Unlock()
	}
	// Remove as a batch all the subs that we have removed from each route.
	s.sl.RemoveBatch(deleteRoutedSubs)
}
// validateClusterOpts ensures the new ClusterOpts does not change host or
// port, which do not support reload.
func validateClusterOpts(old, new ClusterOpts) error {

View File

@@ -1972,3 +1972,462 @@ func TestConfigReloadClusterWorks(t *testing.T) {
t.Fatalf("Expected server B route ID to be %v, got %v", bcid, newbcid)
}
}
// TestConfigReloadClusterPerms exercises config reload of cluster
// import/export permissions across a two-server cluster, verifying that
// routed subscriptions are added/removed (SUB/UNSUB forwarded) as
// permissions are widened, narrowed, and finally removed entirely.
func TestConfigReloadClusterPerms(t *testing.T) {
	confATemplate := `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			permissions {
				import {
					allow: %s
				}
				export {
					allow: %s
				}
			}
		}
	`
	confA := createConfFile(t, []byte(fmt.Sprintf(confATemplate, `"foo"`, `"foo"`)))
	defer os.Remove(confA)
	srva, _ := RunServerWithConfig(confA)
	defer srva.Shutdown()

	confBTemplate := `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			permissions {
				import {
					allow: %s
				}
				export {
					allow: %s
				}
			}
			routes = [
				"nats://127.0.0.1:%d"
			]
		}
	`
	confB := createConfFile(t, []byte(fmt.Sprintf(confBTemplate, `"foo"`, `"foo"`, srva.ClusterAddr().Port)))
	defer os.Remove(confB)
	srvb, _ := RunServerWithConfig(confB)
	defer srvb.Shutdown()

	checkClusterFormed(t, srva, srvb)

	// Create a connection on A
	nca, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srva.Addr().(*net.TCPAddr).Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nca.Close()
	// Create a subscription on "foo" and "bar", only "foo" will be also on server B.
	subFooOnA, err := nca.SubscribeSync("foo")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	subBarOnA, err := nca.SubscribeSync("bar")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Connect on B and do the same
	ncb, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvb.Addr().(*net.TCPAddr).Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncb.Close()
	// Create a subscription on "foo" and "bar", only "foo" will be also on server B.
	subFooOnB, err := ncb.SubscribeSync("foo")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	subBarOnB, err := ncb.SubscribeSync("bar")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Check subscriptions on each server. There should be 3 on each server,
	// foo and bar locally and foo from remote server.
	checkExpectedSubs(t, 3, srva, srvb)

	// sendMsg publishes a message on subj through the given connection.
	sendMsg := func(t *testing.T, subj string, nc *nats.Conn) {
		t.Helper()
		if err := nc.Publish(subj, []byte("msg")); err != nil {
			t.Fatalf("Error on publish: %v", err)
		}
	}

	// checkSub asserts that sub does (or does not) receive a message
	// within a short window.
	checkSub := func(t *testing.T, sub *nats.Subscription, shouldReceive bool) {
		t.Helper()
		_, err := sub.NextMsg(100 * time.Millisecond)
		if shouldReceive && err != nil {
			t.Fatalf("Expected message on %q, got %v", sub.Subject, err)
		} else if !shouldReceive && err == nil {
			t.Fatalf("Expected no message on %q, got one", sub.Subject)
		}
	}

	// Produce from A and check received on both sides
	sendMsg(t, "foo", nca)
	checkSub(t, subFooOnA, true)
	checkSub(t, subFooOnB, true)
	// Now from B:
	sendMsg(t, "foo", ncb)
	checkSub(t, subFooOnA, true)
	checkSub(t, subFooOnB, true)

	// Publish on bar from A and make sure only local sub receives
	sendMsg(t, "bar", nca)
	checkSub(t, subBarOnA, true)
	checkSub(t, subBarOnB, false)

	// Publish on bar from B and make sure only local sub receives
	sendMsg(t, "bar", ncb)
	checkSub(t, subBarOnA, false)
	checkSub(t, subBarOnB, true)

	// We will now both import/export foo and bar. Start with reloading A.
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`, `["foo", "bar"]`))

	// Since B has not been updated yet, the state should remain the same,
	// that is 3 subs on each server.
	checkExpectedSubs(t, 3, srva, srvb)

	// Now update and reload B. Add "baz" for another test down below
	reloadUpdateConfig(t, srvb, confB, fmt.Sprintf(confBTemplate, `["foo", "bar", "baz"]`, `["foo", "bar", "baz"]`, srva.ClusterAddr().Port))

	// Now 4 on each server
	checkExpectedSubs(t, 4, srva, srvb)

	// Make sure that we can receive all messages
	sendMsg(t, "foo", nca)
	checkSub(t, subFooOnA, true)
	checkSub(t, subFooOnB, true)
	sendMsg(t, "foo", ncb)
	checkSub(t, subFooOnA, true)
	checkSub(t, subFooOnB, true)

	sendMsg(t, "bar", nca)
	checkSub(t, subBarOnA, true)
	checkSub(t, subBarOnB, true)
	sendMsg(t, "bar", ncb)
	checkSub(t, subBarOnA, true)
	checkSub(t, subBarOnB, true)

	// Create subscription on baz on server B.
	subBazOnB, err := ncb.SubscribeSync("baz")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	// Check subscriptions count
	checkExpectedSubs(t, 5, srvb)
	checkExpectedSubs(t, 4, srva)

	// baz is not imported by A yet, so a publish from A must not reach B.
	sendMsg(t, "baz", nca)
	checkSub(t, subBazOnB, false)
	sendMsg(t, "baz", ncb)
	checkSub(t, subBazOnB, true)

	// Test UNSUB by denying something that was previously imported
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"foo"`, `["foo", "bar"]`))
	// Since A no longer imports "bar", we should have one less subscription
	// on B (B will have received an UNSUB for bar)
	checkExpectedSubs(t, 4, srvb)
	// A, however, should still have same number of subs.
	checkExpectedSubs(t, 4, srva)

	// Remove all permissions from A.
	reloadUpdateConfig(t, srva, confA, `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
		}
	`)
	// Server A should now have baz sub
	checkExpectedSubs(t, 5, srvb)
	checkExpectedSubs(t, 5, srva)

	sendMsg(t, "baz", nca)
	checkSub(t, subBazOnB, true)
	sendMsg(t, "baz", ncb)
	checkSub(t, subBazOnB, true)

	// Finally, remove permissions from B
	reloadUpdateConfig(t, srvb, confB, fmt.Sprintf(`
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			routes = [
				"nats://127.0.0.1:%d"
			]
		}
	`, srva.ClusterAddr().Port))
	// Check expected subscriptions count.
	checkExpectedSubs(t, 5, srvb)
	checkExpectedSubs(t, 5, srva)
}
// TestConfigReloadClusterPermsImport verifies reload behavior when only
// the cluster *import* permissions change: narrowing the allow list,
// restoring it, dropping the import section entirely (implicit allow),
// and using a deny list.
func TestConfigReloadClusterPermsImport(t *testing.T) {
	confATemplate := `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			permissions {
				import: {
					allow: %s
				}
			}
		}
	`
	confA := createConfFile(t, []byte(fmt.Sprintf(confATemplate, `["foo", "bar"]`)))
	defer os.Remove(confA)
	srva, _ := RunServerWithConfig(confA)
	defer srva.Shutdown()

	confBTemplate := `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			routes = [
				"nats://127.0.0.1:%d"
			]
		}
	`
	confB := createConfFile(t, []byte(fmt.Sprintf(confBTemplate, srva.ClusterAddr().Port)))
	defer os.Remove(confB)
	srvb, _ := RunServerWithConfig(confB)
	defer srvb.Shutdown()

	checkClusterFormed(t, srva, srvb)

	// Create a connection on A
	nca, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srva.Addr().(*net.TCPAddr).Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nca.Close()
	// Create a subscription on "foo" and "bar"
	if _, err := nca.SubscribeSync("foo"); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	if _, err := nca.SubscribeSync("bar"); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	checkExpectedSubs(t, 2, srva, srvb)

	// Drop foo
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"bar"`))
	checkExpectedSubs(t, 2, srva)
	checkExpectedSubs(t, 1, srvb)

	// Add it back
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`))
	checkExpectedSubs(t, 2, srva, srvb)

	// Empty Import means implicit allow
	reloadUpdateConfig(t, srva, confA, `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			permissions {
				export: ">"
			}
		}
	`)
	checkExpectedSubs(t, 2, srva, srvb)

	confATemplate = `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			permissions {
				import: {
					allow: ["foo", "bar"]
					deny: %s
				}
			}
		}
	`
	// Now deny all:
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`))
	checkExpectedSubs(t, 2, srva)
	checkExpectedSubs(t, 0, srvb)

	// Drop foo from the deny list
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"bar"`))
	checkExpectedSubs(t, 2, srva)
	checkExpectedSubs(t, 1, srvb)
}
// TestConfigReloadClusterPermsExport verifies reload behavior when only
// the cluster *export* permissions change: narrowing the allow list,
// restoring it, dropping the export section entirely (implicit allow),
// and using a deny list. Subscriptions live on B; A's export perms
// control which of them A stores on behalf of the route.
func TestConfigReloadClusterPermsExport(t *testing.T) {
	confATemplate := `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			permissions {
				export: {
					allow: %s
				}
			}
		}
	`
	confA := createConfFile(t, []byte(fmt.Sprintf(confATemplate, `["foo", "bar"]`)))
	defer os.Remove(confA)
	srva, _ := RunServerWithConfig(confA)
	defer srva.Shutdown()

	confBTemplate := `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			routes = [
				"nats://127.0.0.1:%d"
			]
		}
	`
	confB := createConfFile(t, []byte(fmt.Sprintf(confBTemplate, srva.ClusterAddr().Port)))
	defer os.Remove(confB)
	srvb, _ := RunServerWithConfig(confB)
	defer srvb.Shutdown()

	checkClusterFormed(t, srva, srvb)

	// Create a connection on B
	ncb, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvb.Addr().(*net.TCPAddr).Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncb.Close()
	// Create a subscription on "foo" and "bar"
	if _, err := ncb.SubscribeSync("foo"); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	if _, err := ncb.SubscribeSync("bar"); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	checkExpectedSubs(t, 2, srva, srvb)

	// Drop foo
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"bar"`))
	checkExpectedSubs(t, 2, srvb)
	checkExpectedSubs(t, 1, srva)

	// Add it back
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`))
	checkExpectedSubs(t, 2, srva, srvb)

	// Empty Export means implicit allow
	reloadUpdateConfig(t, srva, confA, `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			permissions {
				import: ">"
			}
		}
	`)
	checkExpectedSubs(t, 2, srva, srvb)

	confATemplate = `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			permissions {
				export: {
					allow: ["foo", "bar"]
					deny: %s
				}
			}
		}
	`
	// Now deny all:
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `["foo", "bar"]`))
	checkExpectedSubs(t, 0, srva)
	checkExpectedSubs(t, 2, srvb)

	// Drop foo from the deny list
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"bar"`))
	checkExpectedSubs(t, 1, srva)
	checkExpectedSubs(t, 2, srvb)
}
// TestConfigReloadClusterPermsOldServer checks that when a route peer
// speaks the pre-INFO-update protocol (routeProtoZero), a permissions
// reload closes the route so the old server reconnects and picks up the
// new permissions, and that this also happens in a mixed cluster where
// another peer does support INFO updates.
func TestConfigReloadClusterPermsOldServer(t *testing.T) {
	confATemplate := `
		port: -1
		cluster {
			listen: 127.0.0.1:-1
			permissions {
				export: {
					allow: %s
				}
			}
		}
	`
	confA := createConfFile(t, []byte(fmt.Sprintf(confATemplate, `["foo", "bar"]`)))
	defer os.Remove(confA)
	srva, _ := RunServerWithConfig(confA)
	defer srva.Shutdown()

	optsB := DefaultOptions()
	optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srva.ClusterAddr().Port))
	// Make server B behave like an old server
	testRouteProto = routeProtoZero
	defer func() { testRouteProto = routeProtoInfo }()
	srvb := RunServer(optsB)
	defer srvb.Shutdown()
	// Restore the modern proto for any server started after B.
	testRouteProto = routeProtoInfo

	checkClusterFormed(t, srva, srvb)

	// Get the route's connection ID (B has a single route; grab its cid).
	getRouteRID := func() uint64 {
		rid := uint64(0)
		srvb.mu.Lock()
		for _, r := range srvb.routes {
			r.mu.Lock()
			rid = r.cid
			r.mu.Unlock()
			break
		}
		srvb.mu.Unlock()
		return rid
	}
	orgRID := getRouteRID()

	// Cause a config reload on A
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"bar"`))

	// Check that new route gets created (a changed cid means the old
	// route was closed and re-established).
	check := func(t *testing.T) {
		t.Helper()
		checkFor(t, 3*time.Second, 15*time.Millisecond, func() error {
			if rid := getRouteRID(); rid == orgRID {
				return fmt.Errorf("Route does not seem to have been recreated")
			}
			return nil
		})
	}
	check(t)

	// Save the current value
	orgRID = getRouteRID()

	// Add another server that supports INFO updates
	optsC := DefaultOptions()
	optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srva.ClusterAddr().Port))
	srvc := RunServer(optsC)
	defer srvc.Shutdown()

	checkClusterFormed(t, srva, srvb, srvc)

	// Cause a config reload on A
	reloadUpdateConfig(t, srva, confA, fmt.Sprintf(confATemplate, `"foo"`))
	// Check that new route gets created
	check(t)
}

View File

@@ -39,6 +39,18 @@ const (
Explicit
)
const (
	// routeProtoZero is the original Route protocol from 2009.
	// http://nats.io/documentation/internals/nats-protocol/
	routeProtoZero = iota
	// routeProtoInfo signals a route can receive more than the original INFO block.
	// This can be used to update remote cluster permissions, etc...
	routeProtoInfo
)

// testRouteProto is the protocol version advertised in the route INFO.
// Used by tests to make this server behave like an older release
// (set to routeProtoZero).
var testRouteProto = routeProtoInfo
type route struct {
remoteID string
didSolicit bool
@@ -305,6 +317,9 @@ func (c *client) processRouteInfo(info *Info) {
// in closeConnection().
c.route.remoteID = info.ID
// Get the route's proto version
c.opts.Protocol = info.Proto
// Detect route to self.
if c.route.remoteID == s.info.ID {
c.mu.Unlock()
@@ -316,6 +331,14 @@ func (c *client) processRouteInfo(info *Info) {
c.route.authRequired = info.AuthRequired
c.route.tlsRequired = info.TLSRequired
// If this is an update due to config reload on the remote server,
// need to possibly send local subs to the remote server.
if c.flags.isSet(infoReceived) {
s.updateRemoteRoutePerms(c, info)
c.mu.Unlock()
return
}
// Copy over permissions as well.
c.opts.Import = info.Import
c.opts.Export = info.Export
@@ -335,6 +358,10 @@ func (c *client) processRouteInfo(info *Info) {
c.route.url = url
}
// Mark that the INFO protocol has been received. Will allow
// to detect INFO updates.
c.flags.set(infoReceived)
// Check to see if we have this remote already registered.
// This can happen when both servers have routes to each other.
c.mu.Unlock()
@@ -376,6 +403,41 @@ func (c *client) processRouteInfo(info *Info) {
}
}
// updateRemoteRoutePerms possibly sends local subscription interest to
// this route based on changes in the remote's Export permissions.
// Lock assumed held on entry.
func (s *Server) updateRemoteRoutePerms(route *client, info *Info) {
	// Only the remote's Export permissions matter here. Build throwaway
	// clients configured with the previous and the updated permission
	// sets so that the two can be compared subject by subject.
	prevTester := &client{}
	prevTester.setRoutePermissions(&RoutePermissions{Export: route.opts.Export})
	nextTester := &client{}
	nextTester.setRoutePermissions(&RoutePermissions{Export: info.Export})

	// Record the updated permissions on the route itself.
	route.opts.Import = info.Import
	route.opts.Export = info.Export

	// Collect this server's local subscriptions.
	var (
		stackSubs [4096]*subscription
		locals    = stackSubs[:0]
	)
	s.sl.localSubs(&locals)

	// Forward a SUB only for subjects that the remote may now export
	// but could not before, and that this server is allowed to import.
	route.sendRouteSubProtos(locals, func(sub *subscription) bool {
		subject := sub.subject
		return nextTester.canExport(subject) &&
			!prevTester.canExport(subject) &&
			route.canImport(subject)
	})
}
// sendAsyncInfoToClients sends an INFO protocol to all
// connected clients that accept async INFO updates.
// The server lock is held on entry.
@@ -519,21 +581,102 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
s.sl.localSubs(&subs)
route.mu.Lock()
closed := route.sendRouteSubProtos(subs, func(sub *subscription) bool {
return route.canImport(sub.subject)
})
route.mu.Unlock()
if !closed {
route.Debugf("Sent local subscriptions to route")
}
}
// sendRouteSubProtos sends SUB protocols for the given subscriptions.
// If a filter is specified, it is invoked for each subscription and a
// false return skips that subscription. This function may release the
// route's lock due to flushing of outbound data; the returned boolean
// indicates if the connection has been closed during this call.
// Lock is held on entry.
func (c *client) sendRouteSubProtos(subs []*subscription, filter func(sub *subscription) bool) bool {
	return c.sendRouteSubOrUnSubProtos(subs, true, filter)
}
// sendRouteUnSubProtos sends UNSUB protocols for the given subscriptions.
// If a filter is specified, it is invoked for each subscription and a
// false return skips that subscription. This function may release the
// route's lock due to flushing of outbound data; the returned boolean
// indicates if the connection has been closed during this call.
// Lock is held on entry.
func (c *client) sendRouteUnSubProtos(subs []*subscription, filter func(sub *subscription) bool) bool {
	return c.sendRouteSubOrUnSubProtos(subs, false, filter)
}
// Low-level function that sends SUBs or UNSUBs protcols for the given subscriptions.
// Use sendRouteSubProtos or sendRouteUnSubProtos instead for clarity.
// Lock is held on entry.
func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto bool, filter func(sub *subscription) bool) bool {
const staticBufSize = maxBufSize * 2
var (
_buf [staticBufSize]byte // array on stack
buf = _buf[:0] // our buffer will initially point to the stack buffer
mbs = staticBufSize // max size of the buffer
mpMax = int(c.out.mp / 2) // 50% of max_pending
closed bool
)
// We need to make sure that we stay below the user defined max pending bytes.
if mbs > mpMax {
mbs = mpMax
}
for _, sub := range subs {
// Send SUB interest only if subject has a match in import permissions
if !route.canImport(sub.subject) {
if filter != nil && !filter(sub) {
continue
}
proto := fmt.Sprintf(subProto, sub.subject, sub.queue, routeSid(sub))
route.queueOutbound([]byte(proto))
if route.out.pb > int64(route.out.sz*2) {
route.flushSignal()
rsid := routeSid(sub)
// Check if proto is going to fit.
curSize := len(buf)
if isSubProto {
// "SUB " + subject + " " + queue + " " + ...
curSize += 4 + len(sub.subject) + 1 + len(sub.queue) + 1
} else {
// "UNSUB " + ...
curSize += 6
}
// rsid + "\r\n"
curSize += len(rsid) + 2
if curSize >= mbs {
if c.queueOutbound(buf) {
// Need to allocate new array
buf = make([]byte, 0, mbs)
} else {
// We can reuse previous buffer
buf = buf[:0]
}
// Update last activity because flushOutbound() will release
// the lock, which could cause pingTimer to think that this
// connection is stale otherwise.
c.last = time.Now()
c.flushOutbound()
if closed = c.flags.isSet(clearConnection); closed {
break
}
}
if isSubProto {
buf = append(buf, []byte("SUB ")...)
buf = append(buf, sub.subject...)
buf = append(buf, ' ')
if len(sub.queue) > 0 {
buf = append(buf, sub.queue...)
}
} else {
buf = append(buf, []byte("UNSUB ")...)
}
buf = append(buf, ' ')
buf = append(buf, rsid...)
buf = append(buf, []byte(CR_LF)...)
}
route.flushSignal()
route.mu.Unlock()
route.Debugf("Sent local subscriptions to route")
if !closed && len(buf) > 0 {
c.queueOutbound(buf)
c.flushOutbound()
closed = c.flags.isSet(clearConnection)
}
return closed
}
func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
@@ -835,6 +978,7 @@ func (s *Server) broadcastInterestToRoutes(sub *subscription, proto string) {
arg = []byte(proto[:len(proto)-LEN_CR_LF])
}
protoAsBytes := []byte(proto)
checkPerms := true
s.mu.Lock()
for _, route := range s.routes {
// FIXME(dlc) - Make same logic as deliverMsg
@@ -843,9 +987,12 @@ func (s *Server) broadcastInterestToRoutes(sub *subscription, proto string) {
// route will have the same `perms`, so check with the first route
// and send SUB interest only if subject has a match in import permissions.
// If there is no match, we stop here.
if !route.canImport(sub.subject) {
route.mu.Unlock()
break
if checkPerms {
checkPerms = false
if !route.canImport(sub.subject) {
route.mu.Unlock()
break
}
}
route.sendProto(protoAsBytes, true)
route.mu.Unlock()
@@ -883,6 +1030,27 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
s.broadcastInterestToRoutes(sub, proto)
}
// broadcastUnSubscribeBatch sends UNSUB protocols for each of the given
// subscriptions to all connected routes. Used when a client connection
// is closed. Note that when that happens, the subscriptions' MAX have
// been cleared (force unsub).
func (s *Server) broadcastUnSubscribeBatch(subs []*subscription) {
	// Snapshot the route connections under the server lock so that the
	// protocol sends below happen without holding it.
	var stack [32]*client
	snapshot := stack[:0]
	s.mu.Lock()
	for _, r := range s.routes {
		snapshot = append(snapshot, r)
	}
	s.mu.Unlock()
	// Deliver the batched UNSUBs to each route under its own lock.
	for _, r := range snapshot {
		r.mu.Lock()
		r.sendRouteUnSubProtos(subs, nil)
		r.mu.Unlock()
	}
}
func (s *Server) routeAcceptLoop(ch chan struct{}) {
defer func() {
if ch != nil {
@@ -910,6 +1078,9 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
net.JoinHostPort(opts.Cluster.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
s.mu.Lock()
// For tests, we want to be able to make this server behave
// as an older server.
proto := testRouteProto
// Check for TLSConfig
tlsReq := opts.Cluster.TLSConfig != nil
info := Info{
@@ -920,6 +1091,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
TLSRequired: tlsReq,
TLSVerify: tlsReq,
MaxPayload: s.info.MaxPayload,
Proto: proto,
}
// Set this if only if advertise is not disabled
if !opts.Cluster.NoAdvertise {

View File

@@ -1092,3 +1092,32 @@ func TestRoutePermsAppliedOnInboundAndOutboundRoute(t *testing.T) {
// Now check for permissions set on server initiating the route connection
check(t, srvb)
}
// TestRouteSendLocalSubsWithLowMaxPending creates many local
// subscriptions on a server with a very small MaxPending, then forms a
// route and verifies that all SUB protocols still reach the peer (the
// batched sub-proto sender must flush instead of tripping the
// slow-consumer limit).
func TestRouteSendLocalSubsWithLowMaxPending(t *testing.T) {
	optsA := DefaultOptions()
	// Deliberately tiny outbound pending limit.
	optsA.MaxPending = 1024
	srvA := RunServer(optsA)
	defer srvA.Shutdown()

	nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()
	numSubs := 1000
	for i := 0; i < numSubs; i++ {
		nc.Subscribe("foo.bar", func(_ *nats.Msg) {})
	}
	checkExpectedSubs(t, numSubs, srvA)

	// Now create a route between B and A
	optsB := DefaultOptions()
	optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
	srvB := RunServer(optsB)
	defer srvB.Shutdown()
	checkClusterFormed(t, srvA, srvB)

	// Check that all subs have been sent ok
	checkExpectedSubs(t, numSubs, srvA, srvB)
}

View File

@@ -128,6 +128,9 @@ type Server struct {
clientActualPort int
clusterActualPort int
// Use during reload
oldClusterPerms *RoutePermissions
// Used by tests to check that http.Servers do
// not set any timeout.
monitoringServer *http.Server