diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index da42b607..5d6ec571 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -305,23 +305,9 @@ func BenchmarkJetStreamConsume(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", bc.clusterSize) } - var ( - connectURL string - cl *cluster - ) - if bc.clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl = createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.leader().ClientURL() - } - nc, js := jsClientConnectURL(b, connectURL) + cl, _, shutdown, nc, js := startJSClusterAndConnect(b, bc.clusterSize) + defer shutdown() defer nc.Close() if verbose { @@ -336,9 +322,9 @@ func BenchmarkJetStreamConsume(b *testing.B) { b.Fatalf("Error creating stream: %v", err) } - // cluster_size > 1, connect to stream leader - if cl != nil { - connectURL = cl.streamLeader("$G", streamName).ClientURL() + // If replicated resource, connect to stream leader for lower variability + if bc.replicas > 1 { + connectURL := cl.streamLeader("$G", streamName).ClientURL() nc.Close() _, js = jsClientConnectURL(b, connectURL) } @@ -576,27 +562,9 @@ func BenchmarkJetStreamPublish(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", bc.clusterSize) } - var ( - connectURL string - cl *cluster - ) - if bc.clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl = createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.leader().ClientURL() - } - - nc, err := nats.Connect(connectURL) - if err != nil { - b.Fatalf("Failed to create client: %v", err) - } + cl, _, shutdown, nc, _ := startJSClusterAndConnect(b, bc.clusterSize) + defer shutdown() defer nc.Close() jsOpts := []nats.JSOpt{ @@ -624,9 +592,9 @@ func BenchmarkJetStreamPublish(b *testing.B) { b.Fatalf("Error creating stream: %v", err) } - // cluster_size > 1, connect to stream leader - if cl != nil { - connectURL = cl.streamLeader("$G", streamName).ClientURL() + // If replicated resource, connect to stream leader for lower variability + if bc.replicas > 1 { + connectURL := cl.streamLeader("$G", streamName).ClientURL() nc.Close() nc, err = nats.Connect(connectURL) if err != nil { @@ -637,7 +605,6 @@ func BenchmarkJetStreamPublish(b *testing.B) { if err != nil { b.Fatalf("Unexpected error getting JetStream context for stream leader: %v", err) } - } if verbose { @@ -738,47 +705,6 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { }, } - // Helper: Stand up in-process single node or cluster - setupCluster := func(b *testing.B, clusterSize int) (string, func()) { - var connectURL string - var shutdownFunc func() - - if clusterSize == 1 { - s := RunBasicJetStreamServer(b) - shutdownFunc = s.Shutdown - connectURL = s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize) - shutdownFunc = cl.shutdown - cl.waitOnClusterReadyWithNumPeers(clusterSize) - cl.waitOnLeader() - connectURL = cl.leader().ClientURL() - } - - return connectURL, shutdownFunc - } - - // Helper: Create the stream - setupStream := func(b *testing.B, connectURL string, streamConfig *nats.StreamConfig) { - // Connect - nc, err := nats.Connect(connectURL) - if err != nil { - b.Fatalf("Failed to create client: %v", err) - } - defer nc.Close() - - jsOpts := []nats.JSOpt{} - - js, err := nc.JetStream(jsOpts...) - if err != nil { - b.Fatalf("Unexpected error getting JetStream context: %v", err) - } - - if _, err := js.AddStream(streamConfig); err != nil { - b.Fatalf("Error creating stream: %v", err) - } - } - // Context shared by publishers routines type PublishersContext = struct { readyWg sync.WaitGroup @@ -877,8 +803,9 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { } // Setup server or cluster - connectURL, shutdownFunc := setupCluster(b, benchmarkCase.clusterSize) - defer shutdownFunc() + cl, ls, shutdown, nc, js := startJSClusterAndConnect(b, benchmarkCase.clusterSize) + defer shutdown() + defer nc.Close() // Common stream configuration streamConfig := &nats.StreamConfig{ @@ -891,8 +818,11 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { } // Configure stream limit limitConfigFunc(streamConfig) + // Create stream - setupStream(b, connectURL, streamConfig) + if _, err := js.AddStream(streamConfig); err != nil { + b.Fatalf("Error creating stream: %v", err) + } // Set up publishers shared context var pubCtx PublishersContext @@ -903,6 +833,12 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { pubCtx.lock.Lock() pubCtx.messagesLeft = b.N + connectURL := ls.ClientURL() + // If replicated resource, connect to stream leader for lower variability + if benchmarkCase.replicas > 1 { + connectURL = cl.streamLeader("$G", "S").ClientURL() + } + // Spawn publishers routines, each with its own connection and JS context for i := 0; i < numPublishers; i++ { nc, err := nats.Connect(connectURL) @@ -1093,24 +1029,6 @@ func BenchmarkJetStreamKV(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", bc.clusterSize) } - var ( - connectURL string - cl *cluster - ) - if bc.clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl = createJetStreamClusterExplicit(b, "BENCH_KV", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.leader().ClientURL() - } - - nc, js := jsClientConnectURL(b, connectURL) - defer nc.Close() // Pre-generate all keys keys := make([]string, 0, bc.numKeys) @@ -1119,6 +1037,11 @@ func BenchmarkJetStreamKV(b *testing.B) { keys = append(keys, key) } + // Setup server or cluster + cl, _, shutdown, nc, js := startJSClusterAndConnect(b, bc.clusterSize) + defer shutdown() + defer nc.Close() + // Create bucket if verbose { b.Logf("Creating KV %s with R=%d", kvName, bc.replicas) @@ -1143,13 +1066,14 @@ func BenchmarkJetStreamKV(b *testing.B) { } } - // if cluster_size > 1, connect to stream leader of bucket - if cl != nil { + // If replicated resource, connect to stream leader for lower variability + if bc.replicas > 1 { nc.Close() - - connectURL = cl.streamLeader("$G", fmt.Sprintf("KV_%s", kvName)).ClientURL() - _, js = jsClientConnectURL(b, connectURL) + connectURL := cl.streamLeader("$G", fmt.Sprintf("KV_%s", kvName)).ClientURL() + nc, js = jsClientConnectURL(b, connectURL) + defer nc.Close() } + kv, err = js.KeyValue(kv.Bucket()) if err != nil { b.Fatalf("Error binding to KV: %v", err) @@ -1264,7 +1188,6 @@ func BenchmarkJetStreamObjStore(b *testing.B) { minObjSz int maxObjSz int }{ - // TODO remove duplicates and fix comments {nats.MemoryStorage, 100, 1024, 102400}, // mem storage, 100 objects sized (1KB-100KB) {nats.MemoryStorage, 100, 102400, 1048576}, // mem storage, 100 objects sized (100KB-1MB) {nats.MemoryStorage, 1000, 10240, 102400}, // mem storage, 1k objects of various size (10KB - 100KB) @@ -1273,7 +1196,6 @@ func BenchmarkJetStreamObjStore(b *testing.B) { {nats.FileStorage, 100, 102400, 1048576}, // file storage, 100 objects sized (100KB-1MB) {nats.FileStorage, 100, 1048576, 10485760}, // file storage, 100 objects sized (1MB-10MB) {nats.FileStorage, 10, 10485760, 104857600}, // file storage, 10 objects sized (10MB-100MB) - } var ( @@ -1306,23 +1228,10 @@ func BenchmarkJetStreamObjStore(b *testing.B) { if verbose { b.Logf("Setting up %d nodes", replicas) } - var ( - connectURL string - cl *cluster - ) - if clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl = createJetStreamClusterExplicit(b, "BENCH_OBJ_STORE", clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(replicas) - cl.waitOnLeader() - // connect to leader and not replicas - connectURL = cl.leader().ClientURL() - } - nc, js := jsClientConnectURL(b, connectURL) + + // Setup server or cluster + cl, _, shutdown, nc, js := startJSClusterAndConnect(b, clusterSize) + defer shutdown() defer nc.Close() // Initialize object store @@ -1339,10 +1248,10 @@ func BenchmarkJetStreamObjStore(b *testing.B) { b.Fatalf("Error creating ObjectStore: %v", err) } - // if cluster_size > 1, connect to stream leader - if cl != nil { + // If replicated resource, connect to stream leader for lower variability + if clusterSize > 1 { nc.Close() - connectURL = cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL() + connectURL := cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL() nc, js := jsClientConnectURL(b, connectURL) defer nc.Close() objStore, err = js.ObjectStore(objStoreName) @@ -1380,9 +1289,41 @@ func BenchmarkJetStreamObjStore(b *testing.B) { } }, ) - } }, ) } } + +// Helper function to stand up a JS-enabled single server or cluster +func startJSClusterAndConnect(b *testing.B, clusterSize int) (c *cluster, s *Server, shutdown func(), nc *nats.Conn, js nats.JetStreamContext) { + b.Helper() + var err error + + if clusterSize == 1 { + s = RunBasicJetStreamServer(b) + shutdown = func() { + s.Shutdown() + } + } else { + c = createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize) + c.waitOnClusterReadyWithNumPeers(clusterSize) + c.waitOnLeader() + s = c.leader() + shutdown = func() { + c.shutdown() + } + } + + nc, err = nats.Connect(s.ClientURL()) + if err != nil { + b.Fatalf("failed to connect: %s", err) + } + + js, err = nc.JetStream() + if err != nil { + b.Fatalf("failed to init jetstream: %s", err) + } + + return c, s, shutdown, nc, js +}