[FIXED] LeafNode: propagation interest issue after a config reload

When a configuration reload is done, the account's leaf node connections
were not transfered to the new instance of the account, causing the
interest to not be propagated until a leafnode reconnect or a server
restart.

Resolves #3009

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2022-04-20 08:03:34 -06:00
parent 69ea1ab5f4
commit 730d8921e4
3 changed files with 113 additions and 99 deletions

View File

@@ -5419,3 +5419,114 @@ func TestLeafNodeMinVersion(t *testing.T) {
// OK
}
}
func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
hubConf := createConfFile(t, []byte(`
port: -1
leafnodes {
port: -1
authorization: {
user: leaf
password: leaf
account: B
}
}
accounts: {
A: {
users = [{user: usrA, password: usrA}]
exports: [{stream: foo.*.>}]
}
B: {
imports: [{stream: {account: A, subject: foo.*.>}}]
}
}
`))
defer removeFile(t, hubConf)
hub, hubo := RunServerWithConfig(hubConf)
defer hub.Shutdown()
leafConfContet := fmt.Sprintf(`
port: -1
leafnodes {
remotes = [
{
url: "nats-leaf://leaf:leaf@127.0.0.1:%d"
account: B
}
]
}
accounts: {
B: {
exports: [{stream: foo.*.>}]
}
C: {
users: [{user: usrC, password: usrC}]
imports: [{stream: {account: B, subject: foo.bar.>}}]
}
}
`, hubo.LeafNode.Port)
leafConf := createConfFile(t, []byte(leafConfContet))
defer removeFile(t, leafConf)
leafo := LoadConfig(leafConf)
leafo.LeafNode.ReconnectInterval = 50 * time.Millisecond
leaf := RunServer(leafo)
defer leaf.Shutdown()
checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)
subPubAndCheck := func() {
t.Helper()
ncl, err := nats.Connect(leaf.ClientURL(), nats.UserInfo("usrC", "usrC"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncl.Close()
fmt.Printf("@@IK: ------ sub to foo.*.baz -----\n")
// This will send an LS+ to the "hub" server.
sub, err := ncl.SubscribeSync("foo.*.baz")
if err != nil {
t.Fatalf("Error subscribing: %v", err)
}
ncl.Flush()
ncm, err := nats.Connect(hub.ClientURL(), nats.UserInfo("usrA", "usrA"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncm.Close()
// Try a few times in case subject interest has not propagated yet
for i := 0; i < 5; i++ {
ncm.Publish("foo.bar.baz", []byte("msg"))
if _, err := sub.NextMsg(time.Second); err == nil {
// OK, done!
return
}
}
t.Fatal("Message was not received")
}
subPubAndCheck()
// Now cause a restart of the accepting side so that the leaf connection
// is recreated.
hub.Shutdown()
hub = RunServer(hubo)
defer hub.Shutdown()
checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)
subPubAndCheck()
// Issue a config reload even though we make no modification. There was
// a defect that caused the interest propagation to break.
// Set the ReconnectInterval to the default value so that reload does not complain.
leaf.getOpts().LeafNode.ReconnectInterval = DEFAULT_LEAF_NODE_RECONNECT
reloadUpdateConfig(t, leaf, leafConf, leafConfContet)
// Check again
subPubAndCheck()
}

View File

@@ -1482,6 +1482,8 @@ func (s *Server) reloadAuthorization() {
newAcc.clients[c] = struct{}{}
}
}
// Same for leafnodes
newAcc.lleafs = append([]*client(nil), acc.lleafs...)
newAcc.sl = acc.sl
newAcc.rm = acc.rm

View File

@@ -4422,105 +4422,6 @@ func TestLeafNodeAndGatewaysStreamAndShadowSubs(t *testing.T) {
}
}
func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
hubConf := createConfFile(t, []byte(`
port: -1
leafnodes {
port: -1
authorization: {
user: leaf
password: leaf
account: B
}
}
accounts: {
A: {
users = [{user: usrA, password: usrA}]
exports: [{stream: foo.*.>}]
}
B: {
imports: [{stream: {account: A, subject: foo.*.>}}]
}
}
`))
defer removeFile(t, hubConf)
hub, hubo := RunServerWithConfig(hubConf)
defer hub.Shutdown()
leafConf := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
leafnodes {
remotes = [
{
url: "nats-leaf://leaf:leaf@127.0.0.1:%d"
account: B
}
]
}
accounts: {
B: {
exports: [{stream: foo.*.>}]
}
C: {
users: [{user: usrC, password: usrC}]
imports: [{stream: {account: B, subject: foo.bar.>}}]
}
}
`, hubo.LeafNode.Port)))
defer removeFile(t, leafConf)
leafo := LoadConfig(leafConf)
leafo.LeafNode.ReconnectInterval = 50 * time.Millisecond
leaf := RunServer(leafo)
defer leaf.Shutdown()
checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)
ncl, err := nats.Connect(leaf.ClientURL(), nats.UserInfo("usrC", "usrC"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncl.Close()
// This will send an LS+ to the "hub" server.
sub, err := ncl.SubscribeSync("foo.*.baz")
if err != nil {
t.Fatalf("Error subscribing: %v", err)
}
ncl.Flush()
pubAndCheck := func() {
t.Helper()
ncm, err := nats.Connect(hub.ClientURL(), nats.UserInfo("usrA", "usrA"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncm.Close()
// Try a few times in case subject interest has not propagated yet
for i := 0; i < 5; i++ {
ncm.Publish("foo.bar.baz", []byte("msg"))
if _, err := sub.NextMsg(time.Second); err == nil {
// OK, done!
return
}
}
t.Fatal("Message was not received")
}
pubAndCheck()
// Now cause a restart of the accepting side so that the leaf connection
// is recreated.
hub.Shutdown()
hub = RunServer(hubo)
defer hub.Shutdown()
checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)
pubAndCheck()
}
func TestLeafnodeHeaders(t *testing.T) {
srv, opts := runLeafServer()
defer srv.Shutdown()