mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 10:10:42 -07:00
Updated Leafnode Test (#4283)
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -5093,19 +5093,22 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t
|
||||
{ stream: { account: STL, subject: "REQUEST"} }
|
||||
{ stream: { account: KSC, subject: "REQUEST"} }
|
||||
]
|
||||
exports [ { stream: "RESPONSE" } ]
|
||||
}
|
||||
STL {
|
||||
users = [ { user: "stl", pass: "p" } ]
|
||||
exports [ { stream: "REQUEST" } ]
|
||||
imports [ { stream: { account: EFG, subject: "RESPONSE"} } ]
|
||||
}
|
||||
KSC {
|
||||
users = [ { user: "ksc", pass: "p" } ]
|
||||
exports [ { stream: "REQUEST" } ]
|
||||
imports [ { stream: { account: EFG, subject: "RESPONSE"} } ]
|
||||
}
|
||||
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
||||
}`
|
||||
|
||||
sc := createJetStreamSuperClusterWithTemplate(t, tmpl, 3, 2)
|
||||
sc := createJetStreamSuperClusterWithTemplate(t, tmpl, 5, 2)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO.
|
||||
@@ -5131,7 +5134,7 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t
|
||||
remotes [
|
||||
{ urls: [ %s ] }
|
||||
{ urls: [ %s ] }
|
||||
{ urls: [ %s ] ; deny_export: REQUEST}
|
||||
{ urls: [ %s ] ; deny_export: [REQUEST, RESPONSE], deny_import: RESPONSE }
|
||||
]
|
||||
}`
|
||||
|
||||
@@ -5339,4 +5342,42 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t
|
||||
return fmt.Errorf("Not all received: %d vs %d", total, num)
|
||||
})
|
||||
require_True(t, r2.Load() > r1.Load())
|
||||
|
||||
// Now check opposite flow for responses.
|
||||
|
||||
// Create 10 subscribers.
|
||||
var rsubs []*nats.Subscription
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
nc, _ := jsClientConnect(t, ln.randomServer())
|
||||
defer nc.Close()
|
||||
sub, err := nc.QueueSubscribeSync("RESPONSE", "SA")
|
||||
require_NoError(t, err)
|
||||
nc.Flush()
|
||||
rsubs = append(rsubs, sub)
|
||||
}
|
||||
|
||||
nc, _ = jsClientConnect(t, ln.randomServer())
|
||||
defer nc.Close()
|
||||
_, err := nc.SubscribeSync("RESPONSE")
|
||||
require_NoError(t, err)
|
||||
nc.Flush()
|
||||
|
||||
// Now connect and send responses from EFG in cloud.
|
||||
nc, _ = jsClientConnect(t, sc.randomServer(), nats.UserInfo("efg", "p"))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
require_NoError(t, nc.Publish("RESPONSE", []byte("OK")))
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
checkAllRespReceived := func() error {
|
||||
p := pending(rsubs)
|
||||
if p == 100 {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Not all responses received: %d vs %d", p, 100)
|
||||
}
|
||||
|
||||
checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user