When we were optimizing for single cluster but large number of leafnodes we inadvertently broke a daisy chained scenarion where a server was a spoke and a hub with a single hub cluster.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-06-02 15:00:15 -07:00
parent 25ad3cd4af
commit 1bce79750e
3 changed files with 161 additions and 3 deletions

View File

@@ -4122,3 +4122,161 @@ func TestJetStreamClusterStaleDirectGetOnRestart(t *testing.T) {
t.Fatalf("Expected no errors but got %v", <-errCh)
}
}
// This test mimics a user's setup where there is a cloud cluster/domain, and one for eu and ap that are leafnoded into the
// cloud cluster, and one for cn that is leafnoded into the ap cluster.
// We broke basic connectivity in 2.9.17 from publishing in eu for delivery in cn on same account which is daisy chained through ap.
// We will also test cross account delivery in this test as well.
func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) {
var cloudTmpl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: CLOUD, store_dir: '%s'}
leaf { listen: 127.0.0.1:-1 }
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts {
F {
jetstream: enabled
users = [ { user: "F", pass: "pass" } ]
exports [ { stream: "F.>" } ]
}
T {
jetstream: enabled
users = [ { user: "T", pass: "pass" } ]
imports [ { stream: { account: F, subject: "F.>"} } ]
}
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}`
// Now create the cloud and make sure we are connected.
// Cloud
c := createJetStreamCluster(t, cloudTmpl, "CLOUD", _EMPTY_, 3, 22020, false)
defer c.shutdown()
var lnTmpl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
{{leaf}}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts {
F {
jetstream: enabled
users = [ { user: "F", pass: "pass" } ]
exports [ { stream: "F.>" } ]
}
T {
jetstream: enabled
users = [ { user: "T", pass: "pass" } ]
imports [ { stream: { account: F, subject: "F.>"} } ]
}
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}`
var leafFrag = `
leaf {
listen: 127.0.0.1:-1
remotes [ { urls: [ %s ], account: "T" }, { urls: [ %s ], account: "F" } ]
}`
genLeafTmpl := func(tmpl string, c *cluster) string {
t.Helper()
// Create our leafnode cluster template first.
var lnt, lnf []string
for _, s := range c.servers {
if s.ClusterName() != c.name {
continue
}
ln := s.getOpts().LeafNode
lnt = append(lnt, fmt.Sprintf("nats://T:pass@%s:%d", ln.Host, ln.Port))
lnf = append(lnf, fmt.Sprintf("nats://F:pass@%s:%d", ln.Host, ln.Port))
}
lntc := strings.Join(lnt, ", ")
lnfc := strings.Join(lnf, ", ")
return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, lntc, lnfc), 1)
}
// Cluster EU
// Domain is "EU'
tmpl := strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "EU"), 1)
tmpl = genLeafTmpl(tmpl, c)
lceu := createJetStreamCluster(t, tmpl, "EU", "EU-", 3, 22110, false)
lceu.waitOnClusterReady()
defer lceu.shutdown()
for _, s := range lceu.servers {
checkLeafNodeConnectedCount(t, s, 2)
}
// Cluster AP
// Domain is "AP'
tmpl = strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "AP"), 1)
tmpl = genLeafTmpl(tmpl, c)
lcap := createJetStreamCluster(t, tmpl, "AP", "AP-", 3, 22180, false)
lcap.waitOnClusterReady()
defer lcap.shutdown()
for _, s := range lcap.servers {
checkLeafNodeConnectedCount(t, s, 2)
}
// Cluster CN
// Domain is "CN'
// This one connects to AP, not the cloud hub.
tmpl = strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "CN"), 1)
tmpl = genLeafTmpl(tmpl, lcap)
lccn := createJetStreamCluster(t, tmpl, "CN", "CN-", 3, 22280, false)
lccn.waitOnClusterReady()
defer lccn.shutdown()
for _, s := range lccn.servers {
checkLeafNodeConnectedCount(t, s, 2)
}
// Now connect to CN on account F and subscribe to data.
nc, _ := jsClientConnect(t, lccn.randomServer(), nats.UserInfo("F", "pass"))
defer nc.Close()
fsub, err := nc.SubscribeSync("F.EU.>")
require_NoError(t, err)
// Same for account T where the import is.
nc, _ = jsClientConnect(t, lccn.randomServer(), nats.UserInfo("T", "pass"))
defer nc.Close()
tsub, err := nc.SubscribeSync("F.EU.>")
require_NoError(t, err)
// Let sub propagate.
time.Sleep(500 * time.Millisecond)
// Now connect to EU on account F and generate data.
nc, _ = jsClientConnect(t, lceu.randomServer(), nats.UserInfo("F", "pass"))
defer nc.Close()
num := 10
for i := 0; i < num; i++ {
err := nc.Publish("F.EU.DATA", []byte(fmt.Sprintf("MSG-%d", i)))
require_NoError(t, err)
}
checkSubsPending(t, fsub, num)
// Since we export and import in each cluster, we will receive 4x.
// First hop from EU -> CLOUD is 1F and 1T
// Second hop from CLOUD -> AP is 1F, 1T and another 1T
// Third hop from AP -> CN is 1F, 1T, 1T and 1T
// Each cluster hop that has the export/import mapping will add another T message copy.
checkSubsPending(t, tsub, num*4)
}