diff --git a/server/leafnode_test.go b/server/leafnode_test.go index b58e2df7..ade5ce85 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -5093,19 +5093,22 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t { stream: { account: STL, subject: "REQUEST"} } { stream: { account: KSC, subject: "REQUEST"} } ] + exports [ { stream: "RESPONSE" } ] } STL { users = [ { user: "stl", pass: "p" } ] exports [ { stream: "REQUEST" } ] + imports [ { stream: { account: EFG, subject: "RESPONSE"} } ] } KSC { users = [ { user: "ksc", pass: "p" } ] exports [ { stream: "REQUEST" } ] + imports [ { stream: { account: EFG, subject: "RESPONSE"} } ] } $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }` - sc := createJetStreamSuperClusterWithTemplate(t, tmpl, 3, 2) + sc := createJetStreamSuperClusterWithTemplate(t, tmpl, 5, 2) defer sc.shutdown() // Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO. @@ -5131,7 +5134,7 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t remotes [ { urls: [ %s ] } { urls: [ %s ] } - { urls: [ %s ] ; deny_export: REQUEST} + { urls: [ %s ] ; deny_export: [REQUEST, RESPONSE], deny_import: RESPONSE } ] }` @@ -5339,4 +5342,42 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t return fmt.Errorf("Not all received: %d vs %d", total, num) }) require_True(t, r2.Load() > r1.Load()) + + // Now check opposite flow for responses. + + // Create 10 subscribers. + var rsubs []*nats.Subscription + + for i := 0; i < 10; i++ { + nc, _ := jsClientConnect(t, ln.randomServer()) + defer nc.Close() + sub, err := nc.QueueSubscribeSync("RESPONSE", "SA") + require_NoError(t, err) + nc.Flush() + rsubs = append(rsubs, sub) + } + + nc, _ = jsClientConnect(t, ln.randomServer()) + defer nc.Close() + _, err := nc.SubscribeSync("RESPONSE") + require_NoError(t, err) + nc.Flush() + + // Now connect and send responses from EFG in cloud. + nc, _ = jsClientConnect(t, sc.randomServer(), nats.UserInfo("efg", "p")) + + for i := 0; i < 100; i++ { + require_NoError(t, nc.Publish("RESPONSE", []byte("OK"))) + } + nc.Flush() + + checkAllRespReceived := func() error { + p := pending(rsubs) + if p == 100 { + return nil + } + return fmt.Errorf("Not all responses received: %d vs %d", p, 100) + } + + checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived) }