diff --git a/server/norace_test.go b/server/norace_test.go index fba30873..48c45f13 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1755,3 +1755,161 @@ func TestNoRaceJetStreamClusterSourcesMuxd(t *testing.T) { }) } + +func TestNoRaceJetStreamClusterSuperClusterRIPStress(t *testing.T) { + // Uncomment to run. Needs to be on a big machine. + skip(t) + + sc := createJetStreamSuperCluster(t, 3, 3) + defer sc.shutdown() + + // Client based API + s := sc.clusterForName("C2").randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + fmt.Printf("CONNECT is %v\n", s.ClientURL()) + + scm := make(map[string][]string) + + // Create 50 streams per cluster. + for _, cn := range []string{"C1", "C2", "C3"} { + var streams []string + for i := 0; i < 50; i++ { + sn := fmt.Sprintf("%s-S%d", cn, i+1) + streams = append(streams, sn) + _, err := js.AddStream(&nats.StreamConfig{ + Name: sn, + Replicas: 3, + Placement: &nats.Placement{Cluster: cn}, + MaxAge: 2 * time.Minute, + MaxMsgs: 50_000, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + scm[cn] = streams + } + + sourceForCluster := func(cn string) []*nats.StreamSource { + var sns []string + switch cn { + case "C1": + sns = scm["C2"] + case "C2": + sns = scm["C3"] + case "C3": + sns = scm["C1"] + default: + t.Fatalf("Unknown cluster %q", cn) + } + var ss []*nats.StreamSource + for _, sn := range sns { + ss = append(ss, &nats.StreamSource{Name: sn}) + } + return ss + } + + // Mux all 50 streams from one cluster to a single stream across a GW connection to another cluster. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "C1-S-MUX", + Replicas: 2, + Placement: &nats.Placement{Cluster: "C1"}, + Sources: sourceForCluster("C2"), + MaxAge: time.Minute, + MaxMsgs: 20_000, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "C2-S-MUX", + Replicas: 2, + Placement: &nats.Placement{Cluster: "C2"}, + Sources: sourceForCluster("C3"), + MaxAge: time.Minute, + MaxMsgs: 20_000, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "C3-S-MUX", + Replicas: 2, + Placement: &nats.Placement{Cluster: "C3"}, + Sources: sourceForCluster("C1"), + MaxAge: time.Minute, + MaxMsgs: 20_000, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Now create mirrors for our mux'd streams. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "C1-MIRROR", + Replicas: 3, + Placement: &nats.Placement{Cluster: "C1"}, + Mirror: &nats.StreamSource{Name: "C3-S-MUX"}, + MaxMsgs: 10_000, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "C2-MIRROR", + Replicas: 3, + Placement: &nats.Placement{Cluster: "C2"}, + Mirror: &nats.StreamSource{Name: "C2-S-MUX"}, + MaxMsgs: 10_000, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "C3-MIRROR", + Replicas: 3, + Placement: &nats.Placement{Cluster: "C3"}, + Mirror: &nats.StreamSource{Name: "C1-S-MUX"}, + MaxMsgs: 10_000, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var jsc []nats.JetStream + + // Create 16 clients. + for i := 0; i < 16; i++ { + s := sc.randomCluster().randomServer() + nc, _ := jsClientConnect(t, s) + defer nc.Close() + js, err := nc.JetStream(nats.PublishAsyncMaxPending(8 * 1024)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + jsc = append(jsc, js) + } + + msg := make([]byte, 1024) + rand.Read(msg) + + expires := time.Now().Add(300 * time.Second) + for time.Now().Before(expires) { + for _, sns := range scm { + rand.Shuffle(len(sns), func(i, j int) { sns[i], sns[j] = sns[j], sns[i] }) + for _, sn := range sns { + js := jsc[rand.Intn(len(jsc))] + if _, err = js.PublishAsync(sn, msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + } + time.Sleep(50 * time.Millisecond) + } +}