mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Set S2 writer concurrency to 1 (#4570)
By default the S2 library defaults to a concurrency level of `GOMAXPROCS`, which forces the library to run extra goroutines to manage asynchronous flushes. As we only ever have one goroutine (the client writer) using a given S2 writer, reducing the concurrency down to 1 helps a bit with overheads, slightly reduces allocations and slightly improves throughput. Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
@@ -5471,6 +5471,99 @@ func TestLeafNodeCompression(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkLeafNodeCompression(b *testing.B) {
|
||||
conf1 := createConfFile(b, []byte(`
|
||||
port: -1
|
||||
server_name: "Hub"
|
||||
accounts {
|
||||
A { users: [{user: a, password: pwd}] }
|
||||
B { users: [{user: b, password: pwd}] }
|
||||
C { users: [{user: c, password: pwd}] }
|
||||
D { users: [{user: d, password: pwd}] }
|
||||
}
|
||||
leafnodes {
|
||||
port: -1
|
||||
}
|
||||
`))
|
||||
s1, o1 := RunServerWithConfig(conf1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
port := o1.LeafNode.Port
|
||||
conf2 := createConfFile(b, []byte(fmt.Sprintf(`
|
||||
port: -1
|
||||
server_name: "Spoke"
|
||||
accounts {
|
||||
A { users: [{user: a, password: pwd}] }
|
||||
B { users: [{user: b, password: pwd}] }
|
||||
C { users: [{user: c, password: pwd}] }
|
||||
D { users: [{user: d, password: pwd}] }
|
||||
}
|
||||
leafnodes {
|
||||
remotes [
|
||||
{ url: "nats://a:pwd@127.0.0.1:%d", account: "A", compression: s2_better }
|
||||
{ url: "nats://b:pwd@127.0.0.1:%d", account: "B", compression: s2_best }
|
||||
{ url: "nats://c:pwd@127.0.0.1:%d", account: "C", compression: s2_fast }
|
||||
{ url: "nats://d:pwd@127.0.0.1:%d", account: "D", compression: off }
|
||||
]
|
||||
}
|
||||
`, port, port, port, port)))
|
||||
s2, _ := RunServerWithConfig(conf2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
checkLeafNodeConnectedCount(b, s1, 4)
|
||||
checkLeafNodeConnectedCount(b, s2, 4)
|
||||
|
||||
l, err := s2.Leafz(nil)
|
||||
require_NoError(b, err)
|
||||
for _, r := range l.Leafs {
|
||||
switch {
|
||||
case r.Account == "A" && r.Compression == CompressionS2Better:
|
||||
case r.Account == "B" && r.Compression == CompressionS2Best:
|
||||
case r.Account == "C" && r.Compression == CompressionS2Fast:
|
||||
case r.Account == "D" && r.Compression == CompressionOff:
|
||||
default:
|
||||
b.Fatalf("Account %q had incorrect compression mode %q on leaf connection", r.Account, r.Compression)
|
||||
}
|
||||
}
|
||||
|
||||
msg := make([]byte, 1024)
|
||||
for _, p := range []struct {
|
||||
algo string
|
||||
user string
|
||||
}{
|
||||
{"Better", "a"},
|
||||
{"Best", "b"},
|
||||
{"Fast", "c"},
|
||||
{"Off", "d"},
|
||||
} {
|
||||
nc1 := natsConnect(b, s1.ClientURL(), nats.UserInfo(p.user, "pwd"))
|
||||
nc2 := natsConnect(b, s2.ClientURL(), nats.UserInfo(p.user, "pwd"))
|
||||
|
||||
sub, err := nc1.SubscribeSync("foo")
|
||||
require_NoError(b, err)
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
b.Run(p.algo, func(b *testing.B) {
|
||||
start := time.Now()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
err = nc2.Publish("foo", msg)
|
||||
require_NoError(b, err)
|
||||
|
||||
_, err = sub.NextMsg(time.Second)
|
||||
require_NoError(b, err)
|
||||
}
|
||||
|
||||
b.ReportMetric(float64(len(msg)*b.N)/1024/1024, "MB")
|
||||
b.ReportMetric(float64(len(msg)*b.N)/1024/1024/float64(time.Since(start).Seconds()), "MB/sec")
|
||||
})
|
||||
|
||||
nc1.Close()
|
||||
nc2.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeCompressionMatrixModes(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
|
||||
@@ -572,13 +572,18 @@ func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration
|
||||
// with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so
|
||||
// this is more versatile.
|
||||
func s2WriterOptions(cm string) []s2.WriterOption {
|
||||
_opts := [2]s2.WriterOption{}
|
||||
opts := append(
|
||||
_opts[:0],
|
||||
s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines
|
||||
)
|
||||
switch cm {
|
||||
case CompressionS2Uncompressed:
|
||||
return []s2.WriterOption{s2.WriterUncompressed()}
|
||||
return append(opts, s2.WriterUncompressed())
|
||||
case CompressionS2Best:
|
||||
return []s2.WriterOption{s2.WriterBestCompression()}
|
||||
return append(opts, s2.WriterBestCompression())
|
||||
case CompressionS2Better:
|
||||
return []s2.WriterOption{s2.WriterBetterCompression()}
|
||||
return append(opts, s2.WriterBetterCompression())
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user