diff --git a/server/accounts.go b/server/accounts.go index 8cd21e12..c3bb0376 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -958,7 +958,7 @@ func (a *Account) isLeafNodeClusterIsolated(cluster string) bool { if len(a.leafClusters) > 1 { return false } - return a.leafClusters[cluster] > 0 + return a.leafClusters[cluster] == uint64(a.nleafs) } // Helper function to remove leaf nodes. If number of leafnodes gets large diff --git a/server/events_test.go b/server/events_test.go index 1c416570..7e572a39 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -436,8 +436,8 @@ func checkLeafNodeConnectedCount(t testing.TB, s *Server, lnCons int) { t.Helper() checkFor(t, 5*time.Second, 15*time.Millisecond, func() error { if nln := s.NumLeafNodes(); nln != lnCons { - return fmt.Errorf("Expected %d connected leafnode(s) for server %q, got %d", - lnCons, s.ID(), nln) + return fmt.Errorf("Expected %d connected leafnode(s) for server %v, got %d", + lnCons, s, nln) } return nil }) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index ec9ac2fa..dfe75bb1 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4122,3 +4122,161 @@ func TestJetStreamClusterStaleDirectGetOnRestart(t *testing.T) { t.Fatalf("Expected no errors but got %v", <-errCh) } } + +// This test mimics a user's setup where there is a cloud cluster/domain, and one for eu and ap that are leafnoded into the +// cloud cluster, and one for cn that is leafnoded into the ap cluster. +// We broke basic connectivity in 2.9.17 from publishing in eu for delivery in cn on same account which is daisy chained through ap. +// We will also test cross account delivery in this test as well. +func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) { + var cloudTmpl = ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: CLOUD, store_dir: '%s'} + + leaf { listen: 127.0.0.1:-1 } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { + F { + jetstream: enabled + users = [ { user: "F", pass: "pass" } ] + exports [ { stream: "F.>" } ] + } + T { + jetstream: enabled + users = [ { user: "T", pass: "pass" } ] + imports [ { stream: { account: F, subject: "F.>"} } ] + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + }` + + // Now create the cloud and make sure we are connected. + // Cloud + c := createJetStreamCluster(t, cloudTmpl, "CLOUD", _EMPTY_, 3, 22020, false) + defer c.shutdown() + + var lnTmpl = ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + {{leaf}} + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { + F { + jetstream: enabled + users = [ { user: "F", pass: "pass" } ] + exports [ { stream: "F.>" } ] + } + T { + jetstream: enabled + users = [ { user: "T", pass: "pass" } ] + imports [ { stream: { account: F, subject: "F.>"} } ] + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + }` + + var leafFrag = ` + leaf { + listen: 127.0.0.1:-1 + remotes [ { urls: [ %s ], account: "T" }, { urls: [ %s ], account: "F" } ] + }` + + genLeafTmpl := func(tmpl string, c *cluster) string { + t.Helper() + // Create our leafnode cluster template first. + var lnt, lnf []string + for _, s := range c.servers { + if s.ClusterName() != c.name { + continue + } + ln := s.getOpts().LeafNode + lnt = append(lnt, fmt.Sprintf("nats://T:pass@%s:%d", ln.Host, ln.Port)) + lnf = append(lnf, fmt.Sprintf("nats://F:pass@%s:%d", ln.Host, ln.Port)) + } + lntc := strings.Join(lnt, ", ") + lnfc := strings.Join(lnf, ", ") + return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, lntc, lnfc), 1) + } + + // Cluster EU + // Domain is "EU' + tmpl := strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "EU"), 1) + tmpl = genLeafTmpl(tmpl, c) + lceu := createJetStreamCluster(t, tmpl, "EU", "EU-", 3, 22110, false) + lceu.waitOnClusterReady() + defer lceu.shutdown() + + for _, s := range lceu.servers { + checkLeafNodeConnectedCount(t, s, 2) + } + + // Cluster AP + // Domain is "AP' + tmpl = strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "AP"), 1) + tmpl = genLeafTmpl(tmpl, c) + lcap := createJetStreamCluster(t, tmpl, "AP", "AP-", 3, 22180, false) + lcap.waitOnClusterReady() + defer lcap.shutdown() + + for _, s := range lcap.servers { + checkLeafNodeConnectedCount(t, s, 2) + } + + // Cluster CN + // Domain is "CN' + // This one connects to AP, not the cloud hub. + tmpl = strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "CN"), 1) + tmpl = genLeafTmpl(tmpl, lcap) + lccn := createJetStreamCluster(t, tmpl, "CN", "CN-", 3, 22280, false) + lccn.waitOnClusterReady() + defer lccn.shutdown() + + for _, s := range lccn.servers { + checkLeafNodeConnectedCount(t, s, 2) + } + + // Now connect to CN on account F and subscribe to data. + nc, _ := jsClientConnect(t, lccn.randomServer(), nats.UserInfo("F", "pass")) + defer nc.Close() + fsub, err := nc.SubscribeSync("F.EU.>") + require_NoError(t, err) + + // Same for account T where the import is. + nc, _ = jsClientConnect(t, lccn.randomServer(), nats.UserInfo("T", "pass")) + defer nc.Close() + tsub, err := nc.SubscribeSync("F.EU.>") + require_NoError(t, err) + + // Let sub propagate. + time.Sleep(500 * time.Millisecond) + + // Now connect to EU on account F and generate data. + nc, _ = jsClientConnect(t, lceu.randomServer(), nats.UserInfo("F", "pass")) + defer nc.Close() + + num := 10 + for i := 0; i < num; i++ { + err := nc.Publish("F.EU.DATA", []byte(fmt.Sprintf("MSG-%d", i))) + require_NoError(t, err) + } + + checkSubsPending(t, fsub, num) + // Since we export and import in each cluster, we will receive 4x. + // First hop from EU -> CLOUD is 1F and 1T + // Second hop from CLOUD -> AP is 1F, 1T and another 1T + // Third hop from AP -> CN is 1F, 1T, 1T and 1T + // Each cluster hop that has the export/import mapping will add another T message copy. + checkSubsPending(t, tsub, num*4) +}