Refactor cluster creation for JS benchmarks

This commit is contained in:
Marco Primi
2023-09-26 14:54:06 -07:00
parent be106d1ee5
commit d31236cea2

View File

@@ -305,23 +305,9 @@ func BenchmarkJetStreamConsume(b *testing.B) {
if verbose {
b.Logf("Setting up %d nodes", bc.clusterSize)
}
var (
connectURL string
cl *cluster
)
if bc.clusterSize == 1 {
s := RunBasicJetStreamServer(b)
defer s.Shutdown()
connectURL = s.ClientURL()
} else {
cl = createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize)
defer cl.shutdown()
cl.waitOnClusterReadyWithNumPeers(bc.clusterSize)
cl.waitOnLeader()
connectURL = cl.leader().ClientURL()
}
nc, js := jsClientConnectURL(b, connectURL)
cl, _, shutdown, nc, js := startJSClusterAndConnect(b, bc.clusterSize)
defer shutdown()
defer nc.Close()
if verbose {
@@ -336,9 +322,9 @@ func BenchmarkJetStreamConsume(b *testing.B) {
b.Fatalf("Error creating stream: %v", err)
}
// cluster_size > 1, connect to stream leader
if cl != nil {
connectURL = cl.streamLeader("$G", streamName).ClientURL()
// If replicated resource, connect to stream leader for lower variability
if bc.replicas > 1 {
connectURL := cl.streamLeader("$G", streamName).ClientURL()
nc.Close()
_, js = jsClientConnectURL(b, connectURL)
}
@@ -576,27 +562,9 @@ func BenchmarkJetStreamPublish(b *testing.B) {
if verbose {
b.Logf("Setting up %d nodes", bc.clusterSize)
}
var (
connectURL string
cl *cluster
)
if bc.clusterSize == 1 {
s := RunBasicJetStreamServer(b)
defer s.Shutdown()
connectURL = s.ClientURL()
} else {
cl = createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize)
defer cl.shutdown()
cl.waitOnClusterReadyWithNumPeers(bc.clusterSize)
cl.waitOnLeader()
connectURL = cl.leader().ClientURL()
}
nc, err := nats.Connect(connectURL)
if err != nil {
b.Fatalf("Failed to create client: %v", err)
}
cl, _, shutdown, nc, _ := startJSClusterAndConnect(b, bc.clusterSize)
defer shutdown()
defer nc.Close()
jsOpts := []nats.JSOpt{
@@ -624,9 +592,9 @@ func BenchmarkJetStreamPublish(b *testing.B) {
b.Fatalf("Error creating stream: %v", err)
}
// cluster_size > 1, connect to stream leader
if cl != nil {
connectURL = cl.streamLeader("$G", streamName).ClientURL()
// If replicated resource, connect to stream leader for lower variability
if bc.replicas > 1 {
connectURL := cl.streamLeader("$G", streamName).ClientURL()
nc.Close()
nc, err = nats.Connect(connectURL)
if err != nil {
@@ -637,7 +605,6 @@ func BenchmarkJetStreamPublish(b *testing.B) {
if err != nil {
b.Fatalf("Unexpected error getting JetStream context for stream leader: %v", err)
}
}
if verbose {
@@ -738,47 +705,6 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) {
},
}
// Helper: Stand up in-process single node or cluster
setupCluster := func(b *testing.B, clusterSize int) (string, func()) {
var connectURL string
var shutdownFunc func()
if clusterSize == 1 {
s := RunBasicJetStreamServer(b)
shutdownFunc = s.Shutdown
connectURL = s.ClientURL()
} else {
cl := createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize)
shutdownFunc = cl.shutdown
cl.waitOnClusterReadyWithNumPeers(clusterSize)
cl.waitOnLeader()
connectURL = cl.leader().ClientURL()
}
return connectURL, shutdownFunc
}
// Helper: Create the stream
setupStream := func(b *testing.B, connectURL string, streamConfig *nats.StreamConfig) {
// Connect
nc, err := nats.Connect(connectURL)
if err != nil {
b.Fatalf("Failed to create client: %v", err)
}
defer nc.Close()
jsOpts := []nats.JSOpt{}
js, err := nc.JetStream(jsOpts...)
if err != nil {
b.Fatalf("Unexpected error getting JetStream context: %v", err)
}
if _, err := js.AddStream(streamConfig); err != nil {
b.Fatalf("Error creating stream: %v", err)
}
}
// Context shared by publishers routines
type PublishersContext = struct {
readyWg sync.WaitGroup
@@ -877,8 +803,9 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) {
}
// Setup server or cluster
connectURL, shutdownFunc := setupCluster(b, benchmarkCase.clusterSize)
defer shutdownFunc()
cl, ls, shutdown, nc, js := startJSClusterAndConnect(b, benchmarkCase.clusterSize)
defer shutdown()
defer nc.Close()
// Common stream configuration
streamConfig := &nats.StreamConfig{
@@ -891,8 +818,11 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) {
}
// Configure stream limit
limitConfigFunc(streamConfig)
// Create stream
setupStream(b, connectURL, streamConfig)
if _, err := js.AddStream(streamConfig); err != nil {
b.Fatalf("Error creating stream: %v", err)
}
// Set up publishers shared context
var pubCtx PublishersContext
@@ -903,6 +833,12 @@ func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) {
pubCtx.lock.Lock()
pubCtx.messagesLeft = b.N
connectURL := ls.ClientURL()
// If replicated resource, connect to stream leader for lower variability
if benchmarkCase.replicas > 1 {
connectURL = cl.streamLeader("$G", "S").ClientURL()
}
// Spawn publishers routines, each with its own connection and JS context
for i := 0; i < numPublishers; i++ {
nc, err := nats.Connect(connectURL)
@@ -1093,24 +1029,6 @@ func BenchmarkJetStreamKV(b *testing.B) {
if verbose {
b.Logf("Setting up %d nodes", bc.clusterSize)
}
var (
connectURL string
cl *cluster
)
if bc.clusterSize == 1 {
s := RunBasicJetStreamServer(b)
defer s.Shutdown()
connectURL = s.ClientURL()
} else {
cl = createJetStreamClusterExplicit(b, "BENCH_KV", bc.clusterSize)
defer cl.shutdown()
cl.waitOnClusterReadyWithNumPeers(bc.clusterSize)
cl.waitOnLeader()
connectURL = cl.leader().ClientURL()
}
nc, js := jsClientConnectURL(b, connectURL)
defer nc.Close()
// Pre-generate all keys
keys := make([]string, 0, bc.numKeys)
@@ -1119,6 +1037,11 @@ func BenchmarkJetStreamKV(b *testing.B) {
keys = append(keys, key)
}
// Setup server or cluster
cl, _, shutdown, nc, js := startJSClusterAndConnect(b, bc.clusterSize)
defer shutdown()
defer nc.Close()
// Create bucket
if verbose {
b.Logf("Creating KV %s with R=%d", kvName, bc.replicas)
@@ -1143,13 +1066,14 @@ func BenchmarkJetStreamKV(b *testing.B) {
}
}
// if cluster_size > 1, connect to stream leader of bucket
if cl != nil {
// If replicated resource, connect to stream leader for lower variability
if bc.replicas > 1 {
nc.Close()
connectURL = cl.streamLeader("$G", fmt.Sprintf("KV_%s", kvName)).ClientURL()
_, js = jsClientConnectURL(b, connectURL)
connectURL := cl.streamLeader("$G", fmt.Sprintf("KV_%s", kvName)).ClientURL()
nc, js = jsClientConnectURL(b, connectURL)
defer nc.Close()
}
kv, err = js.KeyValue(kv.Bucket())
if err != nil {
b.Fatalf("Error binding to KV: %v", err)
@@ -1264,7 +1188,6 @@ func BenchmarkJetStreamObjStore(b *testing.B) {
minObjSz int
maxObjSz int
}{
// TODO remove duplicates and fix comments
{nats.MemoryStorage, 100, 1024, 102400}, // mem storage, 100 objects sized (1KB-100KB)
{nats.MemoryStorage, 100, 102400, 1048576}, // mem storage, 100 objects sized (100KB-1MB)
{nats.MemoryStorage, 1000, 10240, 102400}, // mem storage, 1k objects of various size (10KB - 100KB)
@@ -1273,7 +1196,6 @@ func BenchmarkJetStreamObjStore(b *testing.B) {
{nats.FileStorage, 100, 102400, 1048576}, // file storage, 100 objects sized (100KB-1MB)
{nats.FileStorage, 100, 1048576, 10485760}, // file storage, 100 objects sized (1MB-10MB)
{nats.FileStorage, 10, 10485760, 104857600}, // file storage, 10 objects sized (10MB-100MB)
}
var (
@@ -1306,23 +1228,10 @@ func BenchmarkJetStreamObjStore(b *testing.B) {
if verbose {
b.Logf("Setting up %d nodes", replicas)
}
var (
connectURL string
cl *cluster
)
if clusterSize == 1 {
s := RunBasicJetStreamServer(b)
defer s.Shutdown()
connectURL = s.ClientURL()
} else {
cl = createJetStreamClusterExplicit(b, "BENCH_OBJ_STORE", clusterSize)
defer cl.shutdown()
cl.waitOnClusterReadyWithNumPeers(replicas)
cl.waitOnLeader()
// connect to leader and not replicas
connectURL = cl.leader().ClientURL()
}
nc, js := jsClientConnectURL(b, connectURL)
// Setup server or cluster
cl, _, shutdown, nc, js := startJSClusterAndConnect(b, clusterSize)
defer shutdown()
defer nc.Close()
// Initialize object store
@@ -1339,10 +1248,10 @@ func BenchmarkJetStreamObjStore(b *testing.B) {
b.Fatalf("Error creating ObjectStore: %v", err)
}
// if cluster_size > 1, connect to stream leader
if cl != nil {
// If replicated resource, connect to stream leader for lower variability
if clusterSize > 1 {
nc.Close()
connectURL = cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL()
connectURL := cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL()
nc, js := jsClientConnectURL(b, connectURL)
defer nc.Close()
objStore, err = js.ObjectStore(objStoreName)
@@ -1380,9 +1289,41 @@ func BenchmarkJetStreamObjStore(b *testing.B) {
}
},
)
}
},
)
}
}
// Helper function to stand up a JS-enabled single server or cluster
func startJSClusterAndConnect(b *testing.B, clusterSize int) (c *cluster, s *Server, shutdown func(), nc *nats.Conn, js nats.JetStreamContext) {
b.Helper()
var err error
if clusterSize == 1 {
s = RunBasicJetStreamServer(b)
shutdown = func() {
s.Shutdown()
}
} else {
c = createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize)
c.waitOnClusterReadyWithNumPeers(clusterSize)
c.waitOnLeader()
s = c.leader()
shutdown = func() {
c.shutdown()
}
}
nc, err = nats.Connect(s.ClientURL())
if err != nil {
b.Fatalf("failed to connect: %s", err)
}
js, err = nc.JetStream()
if err != nil {
b.Fatalf("failed to init jetstream: %s", err)
}
return c, s, shutdown, nc, js
}