Add benchmarks for object store

Signed-off-by: reubenninan <reuben@nats.io>
This commit is contained in:
reubenninan
2023-06-06 19:28:24 -04:00
parent 876cb6d837
commit 13e09f2db4

View File

@@ -1174,3 +1174,203 @@ func BenchmarkJetStreamKV(b *testing.B) {
)
}
}
func BenchmarkJetStreamObjStore(b *testing.B) {
const (
verbose = false
objStoreName = "B"
keyPrefix = "K_"
seed = 12345
initKeys = true
// read/write ratios
ReadOnly = 1.0
WriteOnly = 0.0
)
// rwRatio to string
rwRatioToString := func(rwRatio float64) string {
switch rwRatio {
case ReadOnly:
return "readOnly"
case WriteOnly:
return "writeOnly"
default:
return fmt.Sprintf("%0.1f", rwRatio)
}
}
// benchmark for object store by performing read/write operations with data of random size
RunObjStoreBenchmark := func(b *testing.B, objStore nats.ObjectStore, minObjSz int, maxObjSz int, numKeys int, rwRatio float64) (int, int, int) {
var (
errors int
reads int
writes int
)
rng := rand.New(rand.NewSource(int64(seed)))
// Each operation is processing a random amount of bytes within a size range which
// will be either read from or written to an object store bucket. However, here we are
// approximating the size of the processed data with a simple average of the range.
b.SetBytes(int64((minObjSz + maxObjSz) / 2))
for i := 1; i <= b.N; i++ {
key := fmt.Sprintf("%s_%d", keyPrefix, rng.Intn(numKeys))
var err error
rwOp := rng.Float64()
switch {
case rwOp <= rwRatio:
// Read Op
_, err = objStore.GetBytes(key)
reads++
case rwOp > rwRatio:
// Write Op
// dataSz is a random value between min-max object size and cannot be less than 1 byte
dataSz := rng.Intn(maxObjSz-minObjSz+1) + minObjSz
data := make([]byte, dataSz)
rng.Read(data)
_, err = objStore.PutBytes(key, data)
writes++
}
if err != nil {
errors++
}
if verbose && i%1000 == 0 {
b.Logf("Completed: %d reads, %d writes, %d errors. %d/%d total operations have been completed.", reads, writes, errors, i, b.N)
}
}
return errors, reads, writes
}
// benchmark cases table
benchmarkCases := []struct {
storage nats.StorageType
numKeys int
minObjSz int
maxObjSz int
}{
// TODO remove duplicates and fix comments
{nats.MemoryStorage, 100, 1024, 102400}, // mem storage, 100 objects sized (1KB-100KB)
{nats.MemoryStorage, 100, 102400, 1048576}, // mem storage, 100 objects sized (100KB-1MB)
{nats.MemoryStorage, 1000, 10240, 102400}, // mem storage, 1k objects of various size (10KB - 100KB)
{nats.FileStorage, 100, 1024, 102400}, // file storage, 100 objects sized (1KB-100KB)
{nats.FileStorage, 1000, 10240, 1048576}, // file storage, 1k objects of various size (10KB - 1MB)
{nats.FileStorage, 100, 102400, 1048576}, // file storage, 100 objects sized (100KB-1MB)
{nats.FileStorage, 100, 1048576, 10485760}, // file storage, 100 objects sized (1MB-10MB)
{nats.FileStorage, 10, 10485760, 104857600}, // file storage, 10 objects sized (10MB-100MB)
}
var (
clusterSizeCases = []int{1, 3}
rwRatioCases = []float64{ReadOnly, WriteOnly, 0.8}
)
// Test with either single node or 3 node cluster
for _, clusterSize := range clusterSizeCases {
replicas := clusterSize
cName := fmt.Sprintf("N=%d,R=%d", clusterSize, replicas)
b.Run(
cName,
func(b *testing.B) {
for _, rwRatio := range rwRatioCases {
rName := fmt.Sprintf("workload=%s", rwRatioToString(rwRatio))
b.Run(
rName,
func(b *testing.B) {
// Test all tabled benchmark cases
for _, bc := range benchmarkCases {
bName := fmt.Sprintf("K=%d,storage=%s,minObjSz=%db,maxObjSz=%db", bc.numKeys, bc.storage, bc.minObjSz, bc.maxObjSz)
b.Run(
bName,
func(b *testing.B) {
// Test setup
rng := rand.New(rand.NewSource(int64(seed)))
if verbose {
b.Logf("Setting up %d nodes", replicas)
}
var (
connectURL string
cl *cluster
)
if clusterSize == 1 {
s := RunBasicJetStreamServer(b)
defer s.Shutdown()
connectURL = s.ClientURL()
} else {
cl = createJetStreamClusterExplicit(b, "BENCH_OBJ_STORE", clusterSize)
defer cl.shutdown()
cl.waitOnClusterReadyWithNumPeers(replicas)
cl.waitOnLeader()
// connect to leader and not replicas
connectURL = cl.leader().ClientURL()
}
nc, js := jsClientConnectURL(b, connectURL)
defer nc.Close()
// Initialize object store
if verbose {
b.Logf("Creating ObjectStore %s with R=%d", objStoreName, replicas)
}
objStoreConfig := &nats.ObjectStoreConfig{
Bucket: objStoreName,
Replicas: replicas,
Storage: bc.storage,
}
objStore, err := js.CreateObjectStore(objStoreConfig)
if err != nil {
b.Fatalf("Error creating ObjectStore: %v", err)
}
// if cluster_size > 1, connect to stream leader
if cl != nil {
nc.Close()
connectURL = cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL()
nc, js := jsClientConnectURL(b, connectURL)
defer nc.Close()
objStore, err = js.ObjectStore(objStoreName)
if err != nil {
b.Fatalf("Error binding to ObjectStore: %v", err)
}
}
// Initialize keys
if initKeys {
for n := 0; n < bc.numKeys; n++ {
key := fmt.Sprintf("%s_%d", keyPrefix, n)
dataSz := rng.Intn(bc.maxObjSz-bc.minObjSz+1) + bc.minObjSz
value := make([]byte, dataSz)
rng.Read(value)
_, err := objStore.PutBytes(key, value)
if err != nil {
b.Fatalf("Failed to initialize %s/%s: %v", objStoreName, key, err)
}
}
}
b.ResetTimer()
// Run benchmark
errors, reads, writes := RunObjStoreBenchmark(b, objStore, bc.minObjSz, bc.maxObjSz, bc.numKeys, rwRatio)
// Report metrics
b.ReportMetric(float64(errors)*100/float64(b.N), "%error")
b.ReportMetric(float64(reads), "reads")
b.ReportMetric(float64(writes), "writes")
},
)
}
},
)
}
},
)
}
}