mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Add large stress test, skipped by default
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1755,3 +1755,161 @@ func TestNoRaceJetStreamClusterSourcesMuxd(t *testing.T) {
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestNoRaceJetStreamClusterSuperClusterRIPStress(t *testing.T) {
|
||||
// Uncomment to run. Needs to be on a big machine.
|
||||
skip(t)
|
||||
|
||||
sc := createJetStreamSuperCluster(t, 3, 3)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Client based API
|
||||
s := sc.clusterForName("C2").randomServer()
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
fmt.Printf("CONNECT is %v\n", s.ClientURL())
|
||||
|
||||
scm := make(map[string][]string)
|
||||
|
||||
// Create 50 streams per cluster.
|
||||
for _, cn := range []string{"C1", "C2", "C3"} {
|
||||
var streams []string
|
||||
for i := 0; i < 50; i++ {
|
||||
sn := fmt.Sprintf("%s-S%d", cn, i+1)
|
||||
streams = append(streams, sn)
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: sn,
|
||||
Replicas: 3,
|
||||
Placement: &nats.Placement{Cluster: cn},
|
||||
MaxAge: 2 * time.Minute,
|
||||
MaxMsgs: 50_000,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
scm[cn] = streams
|
||||
}
|
||||
|
||||
sourceForCluster := func(cn string) []*nats.StreamSource {
|
||||
var sns []string
|
||||
switch cn {
|
||||
case "C1":
|
||||
sns = scm["C2"]
|
||||
case "C2":
|
||||
sns = scm["C3"]
|
||||
case "C3":
|
||||
sns = scm["C1"]
|
||||
default:
|
||||
t.Fatalf("Unknown cluster %q", cn)
|
||||
}
|
||||
var ss []*nats.StreamSource
|
||||
for _, sn := range sns {
|
||||
ss = append(ss, &nats.StreamSource{Name: sn})
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
// Mux all 50 streams from one cluster to a single stream across a GW connection to another cluster.
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "C1-S-MUX",
|
||||
Replicas: 2,
|
||||
Placement: &nats.Placement{Cluster: "C1"},
|
||||
Sources: sourceForCluster("C2"),
|
||||
MaxAge: time.Minute,
|
||||
MaxMsgs: 20_000,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "C2-S-MUX",
|
||||
Replicas: 2,
|
||||
Placement: &nats.Placement{Cluster: "C2"},
|
||||
Sources: sourceForCluster("C3"),
|
||||
MaxAge: time.Minute,
|
||||
MaxMsgs: 20_000,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "C3-S-MUX",
|
||||
Replicas: 2,
|
||||
Placement: &nats.Placement{Cluster: "C3"},
|
||||
Sources: sourceForCluster("C1"),
|
||||
MaxAge: time.Minute,
|
||||
MaxMsgs: 20_000,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Now create mirrors for our mux'd streams.
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "C1-MIRROR",
|
||||
Replicas: 3,
|
||||
Placement: &nats.Placement{Cluster: "C1"},
|
||||
Mirror: &nats.StreamSource{Name: "C3-S-MUX"},
|
||||
MaxMsgs: 10_000,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "C2-MIRROR",
|
||||
Replicas: 3,
|
||||
Placement: &nats.Placement{Cluster: "C2"},
|
||||
Mirror: &nats.StreamSource{Name: "C2-S-MUX"},
|
||||
MaxMsgs: 10_000,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "C3-MIRROR",
|
||||
Replicas: 3,
|
||||
Placement: &nats.Placement{Cluster: "C3"},
|
||||
Mirror: &nats.StreamSource{Name: "C1-S-MUX"},
|
||||
MaxMsgs: 10_000,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
var jsc []nats.JetStream
|
||||
|
||||
// Create 16 clients.
|
||||
for i := 0; i < 16; i++ {
|
||||
s := sc.randomCluster().randomServer()
|
||||
nc, _ := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
js, err := nc.JetStream(nats.PublishAsyncMaxPending(8 * 1024))
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
jsc = append(jsc, js)
|
||||
}
|
||||
|
||||
msg := make([]byte, 1024)
|
||||
rand.Read(msg)
|
||||
|
||||
expires := time.Now().Add(300 * time.Second)
|
||||
for time.Now().Before(expires) {
|
||||
for _, sns := range scm {
|
||||
rand.Shuffle(len(sns), func(i, j int) { sns[i], sns[j] = sns[j], sns[i] })
|
||||
for _, sn := range sns {
|
||||
js := jsc[rand.Intn(len(jsc))]
|
||||
if _, err = js.PublishAsync(sn, msg); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user