From c92dc0dc5b65e067f6e1ae27a6c37516ae72c7c3 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 13 Apr 2022 09:31:10 -0600 Subject: [PATCH 1/2] [FIXED] LeafNode interest propagation with imports/exports When using subscriptions through import/exports, the server with a leafnode connection would properly send the interest over, but if the connection is recreated, this would not happen. In case of JetStream where that happens under the cover, message flow would stop after the leafnode restart because the subscriptions would be created on recovery of the JetStream assets but *before* the LeafNode connection could be established. Resolves #3024 Resolves #3027 Resolves #3009 Signed-off-by: Ivan Kozlovic --- server/sublist.go | 2 +- test/leafnode_test.go | 103 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 103 insertions(+), 2 deletions(-) diff --git a/server/sublist.go b/server/sublist.go index a9873e95..2ab89943 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -1369,7 +1369,7 @@ func matchLiteral(literal, subject string) bool { } func addLocalSub(sub *subscription, subs *[]*subscription, includeLeafHubs bool) { - if sub != nil && sub.client != nil && sub.im == nil { + if sub != nil && sub.client != nil { kind := sub.client.kind if kind == CLIENT || kind == SYSTEM || kind == JETSTREAM || kind == ACCOUNT || (includeLeafHubs && sub.client.isHubLeafNode() /* implied kind==LEAF */) { diff --git a/test/leafnode_test.go b/test/leafnode_test.go index cb29db33..b00331d2 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -4303,7 +4303,9 @@ func TestLeafNodeAdvertiseInCluster(t *testing.T) { expectNothing(t, lc) } -func TestLeafNodeStreamAndShadowSubs(t *testing.T) { +func TestLeafNodeAndGatewaysStreamAndShadowSubs(t *testing.T) { + server.SetGatewaysSolicitDelay(10 * time.Millisecond) + defer server.ResetGatewaysSolicitDelay() conf1 := createConfFile(t, []byte(` port: -1 system_account: SYS @@ -4420,6 +4422,105 @@ func TestLeafNodeStreamAndShadowSubs(t *testing.T) { } } +func TestLeafNodeStreamAndShadowSubs(t *testing.T) { + hubConf := createConfFile(t, []byte(` + port: -1 + leafnodes { + port: -1 + authorization: { + user: leaf + password: leaf + account: B + } + } + accounts: { + A: { + users = [{user: usrA, password: usrA}] + exports: [{stream: foo.*.>}] + } + B: { + imports: [{stream: {account: A, subject: foo.*.>}}] + } + } + `)) + defer removeFile(t, hubConf) + hub, hubo := RunServerWithConfig(hubConf) + defer hub.Shutdown() + + leafConf := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + leafnodes { + remotes = [ + { + url: "nats-leaf://leaf:leaf@127.0.0.1:%d" + account: B + } + ] + } + accounts: { + B: { + exports: [{stream: foo.*.>}] + } + C: { + users: [{user: usrC, password: usrC}] + imports: [{stream: {account: B, subject: foo.bar.>}}] + } + } + `, hubo.LeafNode.Port))) + defer removeFile(t, leafConf) + leafo := LoadConfig(leafConf) + leafo.LeafNode.ReconnectInterval = 50 * time.Millisecond + leaf := RunServer(leafo) + defer leaf.Shutdown() + + checkLeafNodeConnected(t, hub) + checkLeafNodeConnected(t, leaf) + + ncl, err := nats.Connect(leaf.ClientURL(), nats.UserInfo("usrC", "usrC")) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + defer ncl.Close() + + // This will send an LS+ to the "hub" server. + sub, err := ncl.SubscribeSync("foo.*.baz") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + ncl.Flush() + + pubAndCheck := func() { + t.Helper() + ncm, err := nats.Connect(hub.ClientURL(), nats.UserInfo("usrA", "usrA")) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + defer ncm.Close() + + // Try a few times in case subject interest has not propagated yet + for i := 0; i < 5; i++ { + ncm.Publish("foo.bar.baz", []byte("msg")) + if _, err := sub.NextMsg(time.Second); err == nil { + // OK, done! + return + } + } + t.Fatal("Message was not received") + } + pubAndCheck() + + // Now cause a restart of the accepting side so that the leaf connection + // is recreated. + hub.Shutdown() + hub = RunServer(hubo) + defer hub.Shutdown() + + checkLeafNodeConnected(t, hub) + checkLeafNodeConnected(t, leaf) + + pubAndCheck() +} + func TestLeafnodeHeaders(t *testing.T) { srv, opts := runLeafServer() defer srv.Shutdown() From c1a17e890a38845f5927d7c04592616af0bb9515 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 13 Apr 2022 09:55:03 -0600 Subject: [PATCH 2/2] Fixed JetStream flapper Signed-off-by: Ivan Kozlovic --- server/jetstream_test.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 84a3b1d2..2b001fce 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -7361,11 +7361,13 @@ func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) { nc.Flush() state = mset.state() - usage = gacc.JetStreamUsage() - - if usage.Memory != 0 { - t.Fatalf("Expected usage memeory to be 0, got %d", usage.Memory) - } + checkFor(t, time.Second, 15*time.Millisecond, func() error { + usage = gacc.JetStreamUsage() + if usage.Memory != 0 { + return fmt.Errorf("Expected usage memory to be 0, got %d", usage.Memory) + } + return nil + }) // Now send twice the number of messages. Should receive an error at some point, and we will check usage against limits. var errSeen string @@ -7382,12 +7384,15 @@ func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) { } state = mset.state() - usage = gacc.JetStreamUsage() - - lim := al[_EMPTY_] - if usage.Memory > uint64(lim.MaxMemory) { - t.Fatalf("Expected memory to not exceed limit of %d, got %d", lim.MaxMemory, usage.Memory) - } + var lim JetStreamAccountLimits + checkFor(t, time.Second, 15*time.Millisecond, func() error { + usage = gacc.JetStreamUsage() + lim = al[_EMPTY_] + if usage.Memory > uint64(lim.MaxMemory) { + return fmt.Errorf("Expected memory to not exceed limit of %d, got %d", lim.MaxMemory, usage.Memory) + } + return nil + }) // make sure that unlimited accounts work lim.MaxMemory = -1