mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge branch 'main' into dev
This commit is contained in:
@@ -1207,3 +1207,203 @@ func BenchmarkJetStreamKV(b *testing.B) {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkJetStreamObjStore(b *testing.B) {
|
||||
const (
|
||||
verbose = false
|
||||
objStoreName = "B"
|
||||
keyPrefix = "K_"
|
||||
seed = 12345
|
||||
initKeys = true
|
||||
|
||||
// read/write ratios
|
||||
ReadOnly = 1.0
|
||||
WriteOnly = 0.0
|
||||
)
|
||||
|
||||
// rwRatio to string
|
||||
rwRatioToString := func(rwRatio float64) string {
|
||||
switch rwRatio {
|
||||
case ReadOnly:
|
||||
return "readOnly"
|
||||
case WriteOnly:
|
||||
return "writeOnly"
|
||||
default:
|
||||
return fmt.Sprintf("%0.1f", rwRatio)
|
||||
}
|
||||
}
|
||||
|
||||
// benchmark for object store by performing read/write operations with data of random size
|
||||
RunObjStoreBenchmark := func(b *testing.B, objStore nats.ObjectStore, minObjSz int, maxObjSz int, numKeys int, rwRatio float64) (int, int, int) {
|
||||
var (
|
||||
errors int
|
||||
reads int
|
||||
writes int
|
||||
)
|
||||
|
||||
rng := rand.New(rand.NewSource(int64(seed)))
|
||||
|
||||
// Each operation is processing a random amount of bytes within a size range which
|
||||
// will be either read from or written to an object store bucket. However, here we are
|
||||
// approximating the size of the processed data with a simple average of the range.
|
||||
b.SetBytes(int64((minObjSz + maxObjSz) / 2))
|
||||
|
||||
for i := 1; i <= b.N; i++ {
|
||||
key := fmt.Sprintf("%s_%d", keyPrefix, rng.Intn(numKeys))
|
||||
var err error
|
||||
|
||||
rwOp := rng.Float64()
|
||||
switch {
|
||||
case rwOp <= rwRatio:
|
||||
// Read Op
|
||||
_, err = objStore.GetBytes(key)
|
||||
reads++
|
||||
case rwOp > rwRatio:
|
||||
// Write Op
|
||||
// dataSz is a random value between min-max object size and cannot be less than 1 byte
|
||||
dataSz := rng.Intn(maxObjSz-minObjSz+1) + minObjSz
|
||||
data := make([]byte, dataSz)
|
||||
rng.Read(data)
|
||||
_, err = objStore.PutBytes(key, data)
|
||||
writes++
|
||||
}
|
||||
if err != nil {
|
||||
errors++
|
||||
}
|
||||
|
||||
if verbose && i%1000 == 0 {
|
||||
b.Logf("Completed: %d reads, %d writes, %d errors. %d/%d total operations have been completed.", reads, writes, errors, i, b.N)
|
||||
}
|
||||
}
|
||||
return errors, reads, writes
|
||||
}
|
||||
|
||||
// benchmark cases table
|
||||
benchmarkCases := []struct {
|
||||
storage nats.StorageType
|
||||
numKeys int
|
||||
minObjSz int
|
||||
maxObjSz int
|
||||
}{
|
||||
// TODO remove duplicates and fix comments
|
||||
{nats.MemoryStorage, 100, 1024, 102400}, // mem storage, 100 objects sized (1KB-100KB)
|
||||
{nats.MemoryStorage, 100, 102400, 1048576}, // mem storage, 100 objects sized (100KB-1MB)
|
||||
{nats.MemoryStorage, 1000, 10240, 102400}, // mem storage, 1k objects of various size (10KB - 100KB)
|
||||
{nats.FileStorage, 100, 1024, 102400}, // file storage, 100 objects sized (1KB-100KB)
|
||||
{nats.FileStorage, 1000, 10240, 1048576}, // file storage, 1k objects of various size (10KB - 1MB)
|
||||
{nats.FileStorage, 100, 102400, 1048576}, // file storage, 100 objects sized (100KB-1MB)
|
||||
{nats.FileStorage, 100, 1048576, 10485760}, // file storage, 100 objects sized (1MB-10MB)
|
||||
{nats.FileStorage, 10, 10485760, 104857600}, // file storage, 10 objects sized (10MB-100MB)
|
||||
|
||||
}
|
||||
|
||||
var (
|
||||
clusterSizeCases = []int{1, 3}
|
||||
rwRatioCases = []float64{ReadOnly, WriteOnly, 0.8}
|
||||
)
|
||||
|
||||
// Test with either single node or 3 node cluster
|
||||
for _, clusterSize := range clusterSizeCases {
|
||||
replicas := clusterSize
|
||||
cName := fmt.Sprintf("N=%d,R=%d", clusterSize, replicas)
|
||||
b.Run(
|
||||
cName,
|
||||
func(b *testing.B) {
|
||||
for _, rwRatio := range rwRatioCases {
|
||||
rName := fmt.Sprintf("workload=%s", rwRatioToString(rwRatio))
|
||||
b.Run(
|
||||
rName,
|
||||
func(b *testing.B) {
|
||||
// Test all tabled benchmark cases
|
||||
for _, bc := range benchmarkCases {
|
||||
bName := fmt.Sprintf("K=%d,storage=%s,minObjSz=%db,maxObjSz=%db", bc.numKeys, bc.storage, bc.minObjSz, bc.maxObjSz)
|
||||
b.Run(
|
||||
bName,
|
||||
func(b *testing.B) {
|
||||
|
||||
// Test setup
|
||||
rng := rand.New(rand.NewSource(int64(seed)))
|
||||
|
||||
if verbose {
|
||||
b.Logf("Setting up %d nodes", replicas)
|
||||
}
|
||||
var (
|
||||
connectURL string
|
||||
cl *cluster
|
||||
)
|
||||
if clusterSize == 1 {
|
||||
s := RunBasicJetStreamServer(b)
|
||||
defer s.Shutdown()
|
||||
connectURL = s.ClientURL()
|
||||
} else {
|
||||
cl = createJetStreamClusterExplicit(b, "BENCH_OBJ_STORE", clusterSize)
|
||||
defer cl.shutdown()
|
||||
cl.waitOnClusterReadyWithNumPeers(replicas)
|
||||
cl.waitOnLeader()
|
||||
// connect to leader and not replicas
|
||||
connectURL = cl.leader().ClientURL()
|
||||
}
|
||||
nc, js := jsClientConnectURL(b, connectURL)
|
||||
defer nc.Close()
|
||||
|
||||
// Initialize object store
|
||||
if verbose {
|
||||
b.Logf("Creating ObjectStore %s with R=%d", objStoreName, replicas)
|
||||
}
|
||||
objStoreConfig := &nats.ObjectStoreConfig{
|
||||
Bucket: objStoreName,
|
||||
Replicas: replicas,
|
||||
Storage: bc.storage,
|
||||
}
|
||||
objStore, err := js.CreateObjectStore(objStoreConfig)
|
||||
if err != nil {
|
||||
b.Fatalf("Error creating ObjectStore: %v", err)
|
||||
}
|
||||
|
||||
// if cluster_size > 1, connect to stream leader
|
||||
if cl != nil {
|
||||
nc.Close()
|
||||
connectURL = cl.streamLeader("$G", fmt.Sprintf("OBJ_%s", objStoreName)).ClientURL()
|
||||
nc, js := jsClientConnectURL(b, connectURL)
|
||||
defer nc.Close()
|
||||
objStore, err = js.ObjectStore(objStoreName)
|
||||
if err != nil {
|
||||
b.Fatalf("Error binding to ObjectStore: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize keys
|
||||
if initKeys {
|
||||
for n := 0; n < bc.numKeys; n++ {
|
||||
key := fmt.Sprintf("%s_%d", keyPrefix, n)
|
||||
dataSz := rng.Intn(bc.maxObjSz-bc.minObjSz+1) + bc.minObjSz
|
||||
value := make([]byte, dataSz)
|
||||
rng.Read(value)
|
||||
_, err := objStore.PutBytes(key, value)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to initialize %s/%s: %v", objStoreName, key, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
// Run benchmark
|
||||
errors, reads, writes := RunObjStoreBenchmark(b, objStore, bc.minObjSz, bc.maxObjSz, bc.numKeys, rwRatio)
|
||||
|
||||
// Report metrics
|
||||
b.ReportMetric(float64(errors)*100/float64(b.N), "%error")
|
||||
b.ReportMetric(float64(reads), "reads")
|
||||
b.ReportMetric(float64(writes), "writes")
|
||||
|
||||
},
|
||||
)
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user