From 758b733d4333268b33c205366e1f19bc3c487526 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 8 Aug 2022 06:27:50 -0700 Subject: [PATCH] Attempt to improve long RTT catchup time during stream moves. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 14 +- server/jetstream_helpers_test.go | 113 +++++++-- server/jetstream_jwt_test.go | 4 +- server/jetstream_super_cluster_test.go | 317 ++++++++++++++++++++++++- server/jetstream_test.go | 239 ------------------- server/norace_test.go | 4 +- 6 files changed, 416 insertions(+), 275 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c578b103..4ef6df54 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6711,6 +6711,15 @@ RETRY: notActive.Reset(getActivityInterval()) mrecs := msgsQ.pop() + + // Send acks first for longer RTT situations. + for _, mreci := range mrecs { + mrec := mreci.(*im) + if mrec.reply != _EMPTY_ { + s.sendInternalMsgLocked(mrec.reply, _EMPTY_, nil, nil) + } + } + for _, mreci := range mrecs { mrec := mreci.(*im) msg := mrec.msg @@ -6754,9 +6763,6 @@ RETRY: } goto RETRY } - if mrec.reply != _EMPTY_ { - s.sendInternalMsgLocked(mrec.reply, _EMPTY_, nil, nil) - } } msgsQ.recycle(&mrecs) case <-notActive.C: @@ -7164,6 +7170,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { } var smv StoreMsg + for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbTotal() <= maxTotalCatchupOutBytes; seq++ { sm, err := mset.store.LoadMsg(seq, &smv) // if this is not a deleted msg, bail out. @@ -7197,6 +7204,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // Skip record for deleted msg. em = encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq, 0) } + // Place size in reply subject for flow control. l := int64(len(em)) reply := fmt.Sprintf(ackReplyT, l) diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 6d480223..d3881ee8 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -22,6 +22,7 @@ import ( "fmt" "math/rand" "net" + "net/url" "os" "strings" "testing" @@ -46,6 +47,7 @@ func init() { type supercluster struct { t *testing.T clusters []*cluster + nproxies []*netProxy } func (sc *supercluster) shutdown() { @@ -55,6 +57,9 @@ func (sc *supercluster) shutdown() { for _, c := range sc.clusters { shutdownCluster(c) } + for _, np := range sc.nproxies { + np.stop() + } } func (sc *supercluster) randomServer() *Server { @@ -260,7 +265,11 @@ var jsMixedModeGlobalAccountTempl = ` var jsGWTempl = `%s{name: %s, urls: [%s]}` func createJetStreamTaggedSuperCluster(t *testing.T) *supercluster { - sc := createJetStreamSuperCluster(t, 3, 3) + return createJetStreamTaggedSuperClusterWithGWProxy(t, nil) +} + +func createJetStreamTaggedSuperClusterWithGWProxy(t *testing.T, gwm gwProxyMap) *supercluster { + sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 3, 3, nil, gwm) sc.waitOnPeerCount(9) reset := func(s *Server) { @@ -332,10 +341,20 @@ func createJetStreamSuperCluster(t *testing.T, numServersPer, numClusters int) * } func createJetStreamSuperClusterWithTemplate(t *testing.T, tmpl string, numServersPer, numClusters int) *supercluster { - return createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, numServersPer, numClusters, nil) + return createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, numServersPer, numClusters, nil, nil) } -func createJetStreamSuperClusterWithTemplateAndModHook(t *testing.T, tmpl string, numServersPer, numClusters int, modify modifyCb) *supercluster { +// For doing proxyies in GWs. +type gwProxy struct { + rtt time.Duration + up int + down int +} + +// Maps cluster names to proxy settings. +type gwProxyMap map[string]*gwProxy + +func createJetStreamSuperClusterWithTemplateAndModHook(t *testing.T, tmpl string, numServersPer, numClusters int, modify modifyCb, gwm gwProxyMap) *supercluster { t.Helper() if numServersPer < 1 { t.Fatalf("Number of servers must be >= 1") @@ -355,19 +374,30 @@ func createJetStreamSuperClusterWithTemplateAndModHook(t *testing.T, tmpl string cp, gp := startClusterPort, startGWPort var clusters []*cluster - + var nproxies []*netProxy var gws []string + // Build GWs first, will be same for all servers. for i, port := 1, gp; i <= numClusters; i++ { cn := fmt.Sprintf("C%d", i) + var gwp *gwProxy + if len(gwm) > 0 { + gwp = gwm[cn] + } var urls []string for n := 0; n < numServersPer; n++ { - urls = append(urls, fmt.Sprintf("nats-route://127.0.0.1:%d", port)) + routeURL := fmt.Sprintf("nats-route://127.0.0.1:%d", port) + if gwp != nil { + np := createNetProxy(gwp.rtt, gwp.up, gwp.down, routeURL, false) + nproxies = append(nproxies, np) + routeURL = np.routeURL() + } + urls = append(urls, routeURL) port++ } gws = append(gws, fmt.Sprintf(jsGWTempl, "\n\t\t\t", cn, strings.Join(urls, ","))) } - gwconf := strings.Join(gws, "") + gwconf := strings.Join(gws, _EMPTY_) for i := 1; i <= numClusters; i++ { cn := fmt.Sprintf("C%d", i) @@ -400,15 +430,20 @@ func createJetStreamSuperClusterWithTemplateAndModHook(t *testing.T, tmpl string c.t = t } + // Start any proxies. + for _, np := range nproxies { + np.start() + } + // Wait for the supercluster to be formed. egws := numClusters - 1 for _, c := range clusters { for _, s := range c.servers { - waitForOutboundGateways(t, s, egws, 2*time.Second) + waitForOutboundGateways(t, s, egws, 10*time.Second) } } - sc := &supercluster{t, clusters} + sc := &supercluster{t, clusters, nproxies} sc.waitOnLeader() sc.waitOnAllCurrent() @@ -1452,38 +1487,68 @@ func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error { type netProxy struct { listener net.Listener conns []net.Conn + rtt time.Duration + up int + down int url string + surl string } func newNetProxy(rtt time.Duration, upRate, downRate int, serverURL string) *netProxy { + return createNetProxy(rtt, upRate, downRate, serverURL, true) +} + +func createNetProxy(rtt time.Duration, upRate, downRate int, serverURL string, start bool) *netProxy { hp := net.JoinHostPort("127.0.0.1", "0") l, e := net.Listen("tcp", hp) if e != nil { panic(fmt.Sprintf("Error listening on port: %s, %q", hp, e)) } port := l.Addr().(*net.TCPAddr).Port - proxy := &netProxy{listener: l} - go func() { - client, err := l.Accept() - if err != nil { - return - } - server, err := net.DialTimeout("tcp", serverURL[7:], time.Second) - if err != nil { - panic("Can't connect to NATS server") - } - proxy.conns = append(proxy.conns, client, server) - go proxy.loop(rtt, upRate, client, server) - go proxy.loop(rtt, downRate, server, client) - }() - proxy.url = fmt.Sprintf("nats://127.0.0.1:%d", port) + proxy := &netProxy{ + listener: l, + rtt: rtt, + up: upRate, + down: downRate, + url: fmt.Sprintf("nats://127.0.0.1:%d", port), + surl: serverURL, + } + if start { + proxy.start() + } return proxy } +func (np *netProxy) start() { + go func() { + for { + client, err := np.listener.Accept() + if err != nil { + return + } + u, err := url.Parse(np.surl) + if err != nil { + panic(fmt.Sprintf("Could not parse server URL: %v", err)) + } + server, err := net.DialTimeout("tcp", u.Host, time.Second) + if err != nil { + panic("Can't connect proxy to NATS server") + } + np.conns = append(np.conns, client, server) + go np.loop(np.rtt, np.up, client, server) + go np.loop(np.rtt, np.down, server, client) + } + }() +} + func (np *netProxy) clientURL() string { return np.url } +func (np *netProxy) routeURL() string { + return strings.Replace(np.url, "nats", "nats-route", 1) +} + func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) { delay := rtt / 2 const rbl = 8192 @@ -1499,7 +1564,7 @@ func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) { return } // RTT delays - if fr || time.Since(sr) > 2*time.Millisecond { + if fr || time.Since(sr) > 250*time.Millisecond { fr = false if delay > 0 { time.Sleep(delay) diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go index 82bca608..af30e02e 100644 --- a/server/jetstream_jwt_test.go +++ b/server/jetstream_jwt_test.go @@ -326,7 +326,7 @@ func TestJetStreamJWTMove(t *testing.T) { %s : %s } `, clustername, ojwt, syspub, storeDir, syspub, sysJwt) - }) + }, nil) defer sc.shutdown() s := sc.serverByName("C1-S1") @@ -813,7 +813,7 @@ func TestJetStreamJWTSysAccUpdateMixedMode(t *testing.T) { operator: %s system_account: %s resolver: URL("%s%s")`, conf, ojwt, spub, ts.URL, basePath) - }) + }, nil) defer sc.shutdown() disconnectChan := make(chan struct{}, 100) defer close(disconnectChan) diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index a4612620..f6ada776 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -17,6 +17,7 @@ package server import ( + "bytes" "encoding/json" "errors" "fmt" @@ -122,7 +123,7 @@ func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) { "C2-S5": "az:1", } return conf + fmt.Sprintf("\nserver_tags: [cloud:%s-tag, %s]\n", clustername, azTag[serverName]) - }) + }, nil) defer s.shutdown() inDifferentAz := func(ci *nats.ClusterInfo) (bool, error) { @@ -2328,7 +2329,7 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) { `, 3, 2, func(serverName, clusterName, storeDir, conf string) string { return conf - }) + }, nil) defer sc.shutdown() // speed up statsz reporting @@ -2657,7 +2658,7 @@ func TestJetStreamSuperClusterMoveCancel(t *testing.T) { func(serverName, clusterName, storeDir, conf string) string { server[serverName] = struct{}{} return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName) - }) + }, nil) defer sc.shutdown() // Client based API @@ -2794,7 +2795,7 @@ func TestJetStreamSuperClusterDoubleStreamMove(t *testing.T) { func(serverName, clusterName, storeDir, conf string) string { server[serverName] = struct{}{} return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName) - }) + }, nil) defer sc.shutdown() // Client based API @@ -2992,7 +2993,7 @@ func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T) s := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2, func(serverName, clusterName, storeDir, conf string) string { return fmt.Sprintf("%s\nserver_tags: [cluster:%s, server:%s]", conf, clusterName, serverName) - }) + }, nil) defer s.shutdown() c := s.clusterForName("C1") @@ -3223,3 +3224,309 @@ func TestJetStreamSuperClusterMirrorInheritsAllowDirect(t *testing.T) { t.Fatalf("Expected MirrorDirect to be inherited as true") } } + +func TestJetStreamSuperClusterSystemLimitsPlacement(t *testing.T) { + const largeSystemLimit = 1024 + const smallSystemLimit = 512 + + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: { + max_mem_store: _MAXMEM_ + max_file_store: _MAXFILE_ + store_dir: '%s', + } + server_tags: [ + _TAG_ + ] + leaf { + listen: 127.0.0.1:-1 + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } + ` + storeCnf := func(serverName, clusterName, storeDir, conf string) string { + switch { + case strings.HasPrefix(serverName, "C1"): + conf = strings.Replace(conf, "_MAXMEM_", fmt.Sprint(largeSystemLimit), 1) + conf = strings.Replace(conf, "_MAXFILE_", fmt.Sprint(largeSystemLimit), 1) + return strings.Replace(conf, "_TAG_", serverName, 1) + case strings.HasPrefix(serverName, "C2"): + conf = strings.Replace(conf, "_MAXMEM_", fmt.Sprint(smallSystemLimit), 1) + conf = strings.Replace(conf, "_MAXFILE_", fmt.Sprint(smallSystemLimit), 1) + return strings.Replace(conf, "_TAG_", serverName, 1) + default: + return conf + } + } + + sCluster := createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, 3, 2, storeCnf, nil) + defer sCluster.shutdown() + + requestLeaderStepDown := func(clientURL string) error { + nc, err := nats.Connect(clientURL) + if err != nil { + return err + } + defer nc.Close() + + ncResp, err := nc.Request(JSApiLeaderStepDown, nil, 3*time.Second) + if err != nil { + return err + } + + var resp JSApiLeaderStepDownResponse + if err := json.Unmarshal(ncResp.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return resp.Error + } + if !resp.Success { + return fmt.Errorf("leader step down request not successful") + } + + return nil + } + + // Force large cluster to be leader + var largeLeader *Server + err := checkForErr(15*time.Second, 500*time.Millisecond, func() error { + // Range over cluster A, which is the large cluster. + servers := sCluster.clusters[0].servers + for _, s := range servers { + if s.JetStreamIsLeader() { + largeLeader = s + return nil + } + } + + if err := requestLeaderStepDown(servers[0].ClientURL()); err != nil { + return fmt.Errorf("failed to request leader step down: %s", err) + } + return fmt.Errorf("leader is not in large cluster") + }) + if err != nil { + t.Skipf("failed to get desired layout: %s", err) + } + + getStreams := func(jsm nats.JetStreamManager) []string { + var streams []string + for s := range jsm.StreamNames() { + streams = append(streams, s) + } + return streams + } + nc, js := jsClientConnect(t, largeLeader) + defer nc.Close() + + cases := []struct { + name string + storage nats.StorageType + createMaxBytes int64 + serverTag string + wantErr bool + }{ + { + name: "file create large stream on small cluster b0", + storage: nats.FileStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C2-S1", + wantErr: true, + }, + { + name: "memory create large stream on small cluster b0", + storage: nats.MemoryStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C2-S1", + wantErr: true, + }, + { + name: "file create large stream on small cluster b1", + storage: nats.FileStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C2-S2", + wantErr: true, + }, + { + name: "memory create large stream on small cluster b1", + storage: nats.MemoryStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C2-S2", + wantErr: true, + }, + { + name: "file create large stream on small cluster b2", + storage: nats.FileStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C2-S3", + wantErr: true, + }, + { + name: "memory create large stream on small cluster b2", + storage: nats.MemoryStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C2-S3", + wantErr: true, + }, + { + name: "file create large stream on large cluster a0", + storage: nats.FileStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C1-S1", + }, + { + name: "memory create large stream on large cluster a0", + storage: nats.MemoryStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C1-S1", + }, + { + name: "file create large stream on large cluster a1", + storage: nats.FileStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C1-S2", + }, + { + name: "memory create large stream on large cluster a1", + storage: nats.MemoryStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C1-S2", + }, + { + name: "file create large stream on large cluster a2", + storage: nats.FileStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C1-S3", + }, + { + name: "memory create large stream on large cluster a2", + storage: nats.MemoryStorage, + createMaxBytes: smallSystemLimit + 1, + serverTag: "C1-S3", + }, + } + for i := 0; i < len(cases) && !t.Failed(); i++ { + c := cases[i] + t.Run(c.name, func(st *testing.T) { + var clusterName string + if strings.HasPrefix(c.serverTag, "a") { + clusterName = "cluster-a" + } else if strings.HasPrefix(c.serverTag, "b") { + clusterName = "cluster-b" + } + + if s := getStreams(js); len(s) != 0 { + st.Fatalf("unexpected stream count, got=%d, want=0", len(s)) + } + + streamName := fmt.Sprintf("TEST-%s", c.serverTag) + si, err := js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: []string{"foo"}, + Storage: c.storage, + MaxBytes: c.createMaxBytes, + Placement: &nats.Placement{ + Cluster: clusterName, + Tags: []string{c.serverTag}, + }, + }) + if c.wantErr && err == nil { + if s := getStreams(js); len(s) != 1 { + st.Logf("unexpected stream count, got=%d, want=1, streams=%v", len(s), s) + } + + cfg := si.Config + st.Fatalf("unexpected success, maxBytes=%d, cluster=%s, tags=%v", + cfg.MaxBytes, cfg.Placement.Cluster, cfg.Placement.Tags) + } else if !c.wantErr && err != nil { + if s := getStreams(js); len(s) != 0 { + st.Logf("unexpected stream count, got=%d, want=0, streams=%v", len(s), s) + } + + require_NoError(st, err) + } + + if err == nil { + if s := getStreams(js); len(s) != 1 { + st.Fatalf("unexpected stream count, got=%d, want=1", len(s)) + } + } + // Delete regardless. + js.DeleteStream(streamName) + }) + } +} + +func TestJetStreamSuperClusterStreamCathupLongRTT(t *testing.T) { + skip(t) + + // Make C2 far away. + gwm := gwProxyMap{ + "C2": &gwProxy{ + rtt: 300 * time.Millisecond, + up: 1 * 1024 * 1024 * 1024, // 1gbit + down: 1 * 1024 * 1024 * 1024, // 1gbit + }, + } + sc := createJetStreamTaggedSuperClusterWithGWProxy(t, gwm) + defer sc.shutdown() + + nc, js := jsClientConnect(t, sc.randomServer()) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"chunk.*"}, + Placement: &nats.Placement{Tags: []string{"cloud:aws", "country:us"}}, + Replicas: 3, + MaxMsgsPerSubject: 1, + } + + // Place a stream in C1. + _, err := js.AddStream(cfg) + require_NoError(t, err) + + chunk := bytes.Repeat([]byte("Z"), 1000*1024) // ~1MB + // 256 MB + for i := 0; i < 256; i++ { + subj := fmt.Sprintf("chunk.%d", i) + js.PublishAsync(subj, chunk) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // C2, slow RTT. + cfg.Placement = &nats.Placement{Tags: []string{"cloud:gcp", "country:uk"}} + _, err = js.UpdateStream(cfg) + require_NoError(t, err) + + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST") + if err != nil { + return err + } + if si.Cluster.Name != "C2" { + return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name) + } + if si.Cluster.Leader == _EMPTY_ { + return fmt.Errorf("No leader yet") + } else if !strings.HasPrefix(si.Cluster.Leader, "C2-") { + return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader) + } + // Now we want to see that we shrink back to original. + if len(si.Cluster.Replicas) != cfg.Replicas-1 { + return fmt.Errorf("Expected %d replicas, got %d", cfg.Replicas-1, len(si.Cluster.Replicas)) + } + return nil + }) +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index b5116ca8..9331862f 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -7031,245 +7031,6 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) { } } -func TestJetStreamSuperClusterSystemLimitsPlacement(t *testing.T) { - const largeSystemLimit = 1024 - const smallSystemLimit = 512 - - tmpl := ` - listen: 127.0.0.1:-1 - server_name: %s - jetstream: { - max_mem_store: _MAXMEM_ - max_file_store: _MAXFILE_ - store_dir: '%s', - } - server_tags: [ - _TAG_ - ] - leaf { - listen: 127.0.0.1:-1 - } - cluster { - name: %s - listen: 127.0.0.1:%d - routes = [%s] - } - - accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } - ` - storeCnf := func(serverName, clusterName, storeDir, conf string) string { - switch { - case strings.HasPrefix(serverName, "C1"): - conf = strings.Replace(conf, "_MAXMEM_", fmt.Sprint(largeSystemLimit), 1) - conf = strings.Replace(conf, "_MAXFILE_", fmt.Sprint(largeSystemLimit), 1) - return strings.Replace(conf, "_TAG_", serverName, 1) - case strings.HasPrefix(serverName, "C2"): - conf = strings.Replace(conf, "_MAXMEM_", fmt.Sprint(smallSystemLimit), 1) - conf = strings.Replace(conf, "_MAXFILE_", fmt.Sprint(smallSystemLimit), 1) - return strings.Replace(conf, "_TAG_", serverName, 1) - default: - return conf - } - } - - sCluster := createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, 3, 2, storeCnf) - defer sCluster.shutdown() - - requestLeaderStepDown := func(clientURL string) error { - nc, err := nats.Connect(clientURL) - if err != nil { - return err - } - defer nc.Close() - - ncResp, err := nc.Request(JSApiLeaderStepDown, nil, 3*time.Second) - if err != nil { - return err - } - - var resp JSApiLeaderStepDownResponse - if err := json.Unmarshal(ncResp.Data, &resp); err != nil { - return err - } - if resp.Error != nil { - return resp.Error - } - if !resp.Success { - return fmt.Errorf("leader step down request not successful") - } - - return nil - } - - // Force large cluster to be leader - var largeLeader *Server - err := checkForErr(15*time.Second, 500*time.Millisecond, func() error { - // Range over cluster A, which is the large cluster. - servers := sCluster.clusters[0].servers - for _, s := range servers { - if s.JetStreamIsLeader() { - largeLeader = s - return nil - } - } - - if err := requestLeaderStepDown(servers[0].ClientURL()); err != nil { - return fmt.Errorf("failed to request leader step down: %s", err) - } - return fmt.Errorf("leader is not in large cluster") - }) - if err != nil { - t.Skipf("failed to get desired layout: %s", err) - } - - getStreams := func(jsm nats.JetStreamManager) []string { - var streams []string - for s := range jsm.StreamNames() { - streams = append(streams, s) - } - return streams - } - nc, js := jsClientConnect(t, largeLeader) - defer nc.Close() - - cases := []struct { - name string - storage nats.StorageType - createMaxBytes int64 - serverTag string - wantErr bool - }{ - { - name: "file create large stream on small cluster b0", - storage: nats.FileStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C2-S1", - wantErr: true, - }, - { - name: "memory create large stream on small cluster b0", - storage: nats.MemoryStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C2-S1", - wantErr: true, - }, - { - name: "file create large stream on small cluster b1", - storage: nats.FileStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C2-S2", - wantErr: true, - }, - { - name: "memory create large stream on small cluster b1", - storage: nats.MemoryStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C2-S2", - wantErr: true, - }, - { - name: "file create large stream on small cluster b2", - storage: nats.FileStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C2-S3", - wantErr: true, - }, - { - name: "memory create large stream on small cluster b2", - storage: nats.MemoryStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C2-S3", - wantErr: true, - }, - { - name: "file create large stream on large cluster a0", - storage: nats.FileStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C1-S1", - }, - { - name: "memory create large stream on large cluster a0", - storage: nats.MemoryStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C1-S1", - }, - { - name: "file create large stream on large cluster a1", - storage: nats.FileStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C1-S2", - }, - { - name: "memory create large stream on large cluster a1", - storage: nats.MemoryStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C1-S2", - }, - { - name: "file create large stream on large cluster a2", - storage: nats.FileStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C1-S3", - }, - { - name: "memory create large stream on large cluster a2", - storage: nats.MemoryStorage, - createMaxBytes: smallSystemLimit + 1, - serverTag: "C1-S3", - }, - } - for i := 0; i < len(cases) && !t.Failed(); i++ { - c := cases[i] - t.Run(c.name, func(st *testing.T) { - var clusterName string - if strings.HasPrefix(c.serverTag, "a") { - clusterName = "cluster-a" - } else if strings.HasPrefix(c.serverTag, "b") { - clusterName = "cluster-b" - } - - if s := getStreams(js); len(s) != 0 { - st.Fatalf("unexpected stream count, got=%d, want=0", len(s)) - } - - streamName := fmt.Sprintf("TEST-%s", c.serverTag) - si, err := js.AddStream(&nats.StreamConfig{ - Name: streamName, - Subjects: []string{"foo"}, - Storage: c.storage, - MaxBytes: c.createMaxBytes, - Placement: &nats.Placement{ - Cluster: clusterName, - Tags: []string{c.serverTag}, - }, - }) - if c.wantErr && err == nil { - if s := getStreams(js); len(s) != 1 { - st.Logf("unexpected stream count, got=%d, want=1, streams=%v", len(s), s) - } - - cfg := si.Config - st.Fatalf("unexpected success, maxBytes=%d, cluster=%s, tags=%v", - cfg.MaxBytes, cfg.Placement.Cluster, cfg.Placement.Tags) - } else if !c.wantErr && err != nil { - if s := getStreams(js); len(s) != 0 { - st.Logf("unexpected stream count, got=%d, want=0, streams=%v", len(s), s) - } - - require_NoError(st, err) - } - - if err == nil { - if s := getStreams(js); len(s) != 1 { - st.Fatalf("unexpected stream count, got=%d, want=1", len(s)) - } - } - // Delete regardless. - js.DeleteStream(streamName) - }) - } -} - func TestJetStreamStreamLimitUpdate(t *testing.T) { s := RunBasicJetStreamServer() if config := s.JetStreamConfig(); config != nil { diff --git a/server/norace_test.go b/server/norace_test.go index a20ca45b..d33b3159 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1674,7 +1674,7 @@ func TestNoRaceJetStreamSuperClusterMixedModeMirrors(t *testing.T) { conf = strings.ReplaceAll(conf, "leaf: { ", "#leaf: { ") } return conf - }) + }, nil) defer sc.shutdown() // Connect our client to a non JS server @@ -1978,7 +1978,7 @@ func TestNoRaceJetStreamSuperClusterMixedModeSources(t *testing.T) { conf = strings.ReplaceAll(conf, "leaf: { ", "#leaf: { ") } return conf - }) + }, nil) defer sc.shutdown() // Connect our client to a non JS server