diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 84a3b1d2..2b001fce 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -7361,11 +7361,13 @@ func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) { nc.Flush() state = mset.state() - usage = gacc.JetStreamUsage() - - if usage.Memory != 0 { - t.Fatalf("Expected usage memeory to be 0, got %d", usage.Memory) - } + checkFor(t, time.Second, 15*time.Millisecond, func() error { + usage = gacc.JetStreamUsage() + if usage.Memory != 0 { + return fmt.Errorf("Expected usage memory to be 0, got %d", usage.Memory) + } + return nil + }) // Now send twice the number of messages. Should receive an error at some point, and we will check usage against limits. var errSeen string @@ -7382,12 +7384,15 @@ func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) { } state = mset.state() - usage = gacc.JetStreamUsage() - - lim := al[_EMPTY_] - if usage.Memory > uint64(lim.MaxMemory) { - t.Fatalf("Expected memory to not exceed limit of %d, got %d", lim.MaxMemory, usage.Memory) - } + var lim JetStreamAccountLimits + checkFor(t, time.Second, 15*time.Millisecond, func() error { + usage = gacc.JetStreamUsage() + lim = al[_EMPTY_] + if usage.Memory > uint64(lim.MaxMemory) { + return fmt.Errorf("Expected memory to not exceed limit of %d, got %d", lim.MaxMemory, usage.Memory) + } + return nil + }) // make sure that unlimited accounts work lim.MaxMemory = -1 diff --git a/server/sublist.go b/server/sublist.go index a9873e95..2ab89943 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -1369,7 +1369,7 @@ func matchLiteral(literal, subject string) bool { } func addLocalSub(sub *subscription, subs *[]*subscription, includeLeafHubs bool) { - if sub != nil && sub.client != nil && sub.im == nil { + if sub != nil && sub.client != nil { kind := sub.client.kind if kind == CLIENT || kind == SYSTEM || kind == JETSTREAM || kind == ACCOUNT || (includeLeafHubs && sub.client.isHubLeafNode() /* implied kind==LEAF */) { diff --git a/test/leafnode_test.go b/test/leafnode_test.go index cb29db33..b00331d2 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -4303,7 +4303,9 @@ func TestLeafNodeAdvertiseInCluster(t *testing.T) { expectNothing(t, lc) } -func TestLeafNodeStreamAndShadowSubs(t *testing.T) { +func TestLeafNodeAndGatewaysStreamAndShadowSubs(t *testing.T) { + server.SetGatewaysSolicitDelay(10 * time.Millisecond) + defer server.ResetGatewaysSolicitDelay() conf1 := createConfFile(t, []byte(` port: -1 system_account: SYS @@ -4420,6 +4422,105 @@ func TestLeafNodeStreamAndShadowSubs(t *testing.T) { } } +func TestLeafNodeStreamAndShadowSubs(t *testing.T) { + hubConf := createConfFile(t, []byte(` + port: -1 + leafnodes { + port: -1 + authorization: { + user: leaf + password: leaf + account: B + } + } + accounts: { + A: { + users = [{user: usrA, password: usrA}] + exports: [{stream: foo.*.>}] + } + B: { + imports: [{stream: {account: A, subject: foo.*.>}}] + } + } + `)) + defer removeFile(t, hubConf) + hub, hubo := RunServerWithConfig(hubConf) + defer hub.Shutdown() + + leafConf := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + leafnodes { + remotes = [ + { + url: "nats-leaf://leaf:leaf@127.0.0.1:%d" + account: B + } + ] + } + accounts: { + B: { + exports: [{stream: foo.*.>}] + } + C: { + users: [{user: usrC, password: usrC}] + imports: [{stream: {account: B, subject: foo.bar.>}}] + } + } + `, hubo.LeafNode.Port))) + defer removeFile(t, leafConf) + leafo := LoadConfig(leafConf) + leafo.LeafNode.ReconnectInterval = 50 * time.Millisecond + leaf := RunServer(leafo) + defer leaf.Shutdown() + + checkLeafNodeConnected(t, hub) + checkLeafNodeConnected(t, leaf) + + ncl, err := nats.Connect(leaf.ClientURL(), nats.UserInfo("usrC", "usrC")) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + defer ncl.Close() + + // This will send an LS+ to the "hub" server. + sub, err := ncl.SubscribeSync("foo.*.baz") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + ncl.Flush() + + pubAndCheck := func() { + t.Helper() + ncm, err := nats.Connect(hub.ClientURL(), nats.UserInfo("usrA", "usrA")) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + defer ncm.Close() + + // Try a few times in case subject interest has not propagated yet + for i := 0; i < 5; i++ { + ncm.Publish("foo.bar.baz", []byte("msg")) + if _, err := sub.NextMsg(time.Second); err == nil { + // OK, done! + return + } + } + t.Fatal("Message was not received") + } + pubAndCheck() + + // Now cause a restart of the accepting side so that the leaf connection + // is recreated. + hub.Shutdown() + hub = RunServer(hubo) + defer hub.Shutdown() + + checkLeafNodeConnected(t, hub) + checkLeafNodeConnected(t, leaf) + + pubAndCheck() +} + func TestLeafnodeHeaders(t *testing.T) { srv, opts := runLeafServer() defer srv.Shutdown()