diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index e1701ad9..94ded0ef 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -1174,3 +1174,203 @@ func BenchmarkJetStreamKV(b *testing.B) { ) } } + +func BenchmarkJetStreamObjStore(b *testing.B) { + const ( + verbose = false + objStoreName = "B" + keyPrefix = "K_" + seed = 12345 + initKeys = true + + // read/write ratios + ReadOnly = 1.0 + WriteOnly = 0.0 + ) + + // rwRatio to string + rwRatioToString := func(rwRatio float64) string { + switch rwRatio { + case ReadOnly: + return "readOnly" + case WriteOnly: + return "writeOnly" + default: + return fmt.Sprintf("%0.1f", rwRatio) + } + } + + // benchmark for object store by performing read/write operations with data of random size + RunObjStoreBenchmark := func(b *testing.B, objStore nats.ObjectStore, minObjSz int, maxObjSz int, numKeys int, rwRatio float64) (int, int, int) { + var ( + errors int + reads int + writes int + ) + + rng := rand.New(rand.NewSource(int64(seed))) + + // Each operation is processing a random amount of bytes within a size range which + // will be either read from or written to an object store bucket. However, here we are + // approximating the size of the processed data with a simple average of the range. + b.SetBytes(int64((minObjSz + maxObjSz) / 2)) + + for i := 1; i <= b.N; i++ { + key := fmt.Sprintf("%s_%d", keyPrefix, rng.Intn(numKeys)) + var err error + + rwOp := rng.Float64() + switch { + case rwOp <= rwRatio: + // Read Op + _, err = objStore.GetBytes(key) + reads++ + case rwOp > rwRatio: + // Write Op + // dataSz is a random value between min-max object size and cannot be less than 1 byte + dataSz := rng.Intn(maxObjSz-minObjSz+1) + minObjSz + data := make([]byte, dataSz) + rng.Read(data) + _, err = objStore.PutBytes(key, data) + writes++ + } + if err != nil { + errors++ + } + + if verbose && i%1000 == 0 { + b.Logf("Completed: %d reads, %d writes, %d errors. %d/%d total operations have been completed.", reads, writes, errors, i, b.N) + } + } + return errors, reads, writes + } + + // benchmark cases table + benchmarkCases := []struct { + storage nats.StorageType + numKeys int + minObjSz int + maxObjSz int + }{ + // TODO remove duplicates and fix comments + {nats.MemoryStorage, 100, 1024, 102400}, // mem storage, 100 objects sized (1KB-100KB) + {nats.MemoryStorage, 100, 102400, 1048576}, // mem storage, 100 objects sized (100KB-1MB) + {nats.MemoryStorage, 1000, 10240, 102400}, // mem storage, 1k objects of various size (10KB - 100KB) + {nats.FileStorage, 100, 1024, 102400}, // file storage, 100 objects sized (1KB-100KB) + {nats.FileStorage, 1000, 10240, 1048576}, // file storage, 1k objects of various size (10KB - 1MB) + {nats.FileStorage, 100, 102400, 1048576}, // file storage, 100 objects sized (100KB-1MB) + {nats.FileStorage, 100, 1048576, 10485760}, // file storage, 100 objects sized (1MB-10MB) + {nats.FileStorage, 10, 10485760, 104857600}, // file storage, 10 objects sized (10MB-100MB) + + } + + var ( + clusterSizeCases = []int{1, 3} + rwRatioCases = []float64{ReadOnly, WriteOnly, 0.8} + ) + + // Test with either single node or 3 node cluster + for _, clusterSize := range clusterSizeCases { + replicas := clusterSize + cName := fmt.Sprintf("N=%d,R=%d", clusterSize, replicas) + b.Run( + cName, + func(b *testing.B) { + for _, rwRatio := range rwRatioCases { + rName := fmt.Sprintf("workload=%s", rwRatioToString(rwRatio)) + b.Run( + rName, + func(b *testing.B) { + // Test all tabled benchmark cases + for _, bc := range benchmarkCases { + bName := fmt.Sprintf("K=%d,storage=%s,minObjSz=%db,maxObjSz=%db", bc.numKeys, bc.storage, bc.minObjSz, bc.maxObjSz) + b.Run( + bName, + func(b *testing.B) { + + // Test setup + rng := rand.New(rand.NewSource(int64(seed))) + + if verbose { + b.Logf("Setting up %d nodes", replicas) + } + var ( + connectURL string + cl *cluster + ) + if clusterSize == 1 { + s := RunBasicJetStreamServer(b) + defer s.Shutdown() + connectURL = s.ClientURL() + } else { + cl = createJetStreamClusterExplicit(b, "BENCH_OBJ_STORE", clusterSize) + defer cl.shutdown() + cl.waitOnClusterReadyWithNumPeers(replicas) + cl.waitOnLeader() + // connect to leader and not replicas + connectURL = cl.leader().ClientURL() + } + nc, js := jsClientConnectURL(b, connectURL) + defer nc.Close() + + // Initialize object store + if verbose { + b.Logf("Creating ObjectStore %s with R=%d", objStoreName, replicas) + } + objStoreConfig := &nats.ObjectStoreConfig{ + Bucket: objStoreName, + Replicas: replicas, + Storage: bc.storage, + } + objStore, err := js.CreateObjectStore(objStoreConfig) + if err != nil { + b.Fatalf("Error creating ObjectStore: %v", err) + } + + // if cluster_size > 1, connect to stream leader + if cl != nil { + nc.Close() + connectURL = cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL() + nc, js := jsClientConnectURL(b, connectURL) + defer nc.Close() + objStore, err = js.ObjectStore(objStoreName) + if err != nil { + b.Fatalf("Error binding to ObjectStore: %v", err) + } + } + + // Initialize keys + if initKeys { + for n := 0; n < bc.numKeys; n++ { + key := fmt.Sprintf("%s_%d", keyPrefix, n) + dataSz := rng.Intn(bc.maxObjSz-bc.minObjSz+1) + bc.minObjSz + value := make([]byte, dataSz) + rng.Read(value) + _, err := objStore.PutBytes(key, value) + if err != nil { + b.Fatalf("Failed to initialize %s/%s: %v", objStoreName, key, err) + } + } + } + + b.ResetTimer() + + // Run benchmark + errors, reads, writes := RunObjStoreBenchmark(b, objStore, bc.minObjSz, bc.maxObjSz, bc.numKeys, rwRatio) + + // Report metrics + b.ReportMetric(float64(errors)*100/float64(b.N), "%error") + b.ReportMetric(float64(reads), "reads") + b.ReportMetric(float64(writes), "writes") + + }, + ) + } + }, + ) + + } + }, + ) + } +}