Merge pull request #3031 from nats-io/fix_3024

[FIXED] LeafNode interest propagation with imports/exports
This commit is contained in:
Ivan Kozlovic
2022-04-13 13:00:51 -06:00
committed by GitHub
3 changed files with 119 additions and 13 deletions

View File

@@ -7361,11 +7361,13 @@ func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) {
nc.Flush()
state = mset.state()
usage = gacc.JetStreamUsage()
if usage.Memory != 0 {
t.Fatalf("Expected usage memeory to be 0, got %d", usage.Memory)
}
checkFor(t, time.Second, 15*time.Millisecond, func() error {
usage = gacc.JetStreamUsage()
if usage.Memory != 0 {
return fmt.Errorf("Expected usage memory to be 0, got %d", usage.Memory)
}
return nil
})
// Now send twice the number of messages. Should receive an error at some point, and we will check usage against limits.
var errSeen string
@@ -7382,12 +7384,15 @@ func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) {
}
state = mset.state()
usage = gacc.JetStreamUsage()
lim := al[_EMPTY_]
if usage.Memory > uint64(lim.MaxMemory) {
t.Fatalf("Expected memory to not exceed limit of %d, got %d", lim.MaxMemory, usage.Memory)
}
var lim JetStreamAccountLimits
checkFor(t, time.Second, 15*time.Millisecond, func() error {
usage = gacc.JetStreamUsage()
lim = al[_EMPTY_]
if usage.Memory > uint64(lim.MaxMemory) {
return fmt.Errorf("Expected memory to not exceed limit of %d, got %d", lim.MaxMemory, usage.Memory)
}
return nil
})
// make sure that unlimited accounts work
lim.MaxMemory = -1

View File

@@ -1369,7 +1369,7 @@ func matchLiteral(literal, subject string) bool {
}
func addLocalSub(sub *subscription, subs *[]*subscription, includeLeafHubs bool) {
if sub != nil && sub.client != nil && sub.im == nil {
if sub != nil && sub.client != nil {
kind := sub.client.kind
if kind == CLIENT || kind == SYSTEM || kind == JETSTREAM || kind == ACCOUNT ||
(includeLeafHubs && sub.client.isHubLeafNode() /* implied kind==LEAF */) {

View File

@@ -4303,7 +4303,9 @@ func TestLeafNodeAdvertiseInCluster(t *testing.T) {
expectNothing(t, lc)
}
func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
func TestLeafNodeAndGatewaysStreamAndShadowSubs(t *testing.T) {
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
defer server.ResetGatewaysSolicitDelay()
conf1 := createConfFile(t, []byte(`
port: -1
system_account: SYS
@@ -4420,6 +4422,105 @@ func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
}
}
func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
hubConf := createConfFile(t, []byte(`
port: -1
leafnodes {
port: -1
authorization: {
user: leaf
password: leaf
account: B
}
}
accounts: {
A: {
users = [{user: usrA, password: usrA}]
exports: [{stream: foo.*.>}]
}
B: {
imports: [{stream: {account: A, subject: foo.*.>}}]
}
}
`))
defer removeFile(t, hubConf)
hub, hubo := RunServerWithConfig(hubConf)
defer hub.Shutdown()
leafConf := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
leafnodes {
remotes = [
{
url: "nats-leaf://leaf:leaf@127.0.0.1:%d"
account: B
}
]
}
accounts: {
B: {
exports: [{stream: foo.*.>}]
}
C: {
users: [{user: usrC, password: usrC}]
imports: [{stream: {account: B, subject: foo.bar.>}}]
}
}
`, hubo.LeafNode.Port)))
defer removeFile(t, leafConf)
leafo := LoadConfig(leafConf)
leafo.LeafNode.ReconnectInterval = 50 * time.Millisecond
leaf := RunServer(leafo)
defer leaf.Shutdown()
checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)
ncl, err := nats.Connect(leaf.ClientURL(), nats.UserInfo("usrC", "usrC"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncl.Close()
// This will send an LS+ to the "hub" server.
sub, err := ncl.SubscribeSync("foo.*.baz")
if err != nil {
t.Fatalf("Error subscribing: %v", err)
}
ncl.Flush()
pubAndCheck := func() {
t.Helper()
ncm, err := nats.Connect(hub.ClientURL(), nats.UserInfo("usrA", "usrA"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncm.Close()
// Try a few times in case subject interest has not propagated yet
for i := 0; i < 5; i++ {
ncm.Publish("foo.bar.baz", []byte("msg"))
if _, err := sub.NextMsg(time.Second); err == nil {
// OK, done!
return
}
}
t.Fatal("Message was not received")
}
pubAndCheck()
// Now cause a restart of the accepting side so that the leaf connection
// is recreated.
hub.Shutdown()
hub = RunServer(hubo)
defer hub.Shutdown()
checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)
pubAndCheck()
}
func TestLeafnodeHeaders(t *testing.T) {
srv, opts := runLeafServer()
defer srv.Shutdown()