diff --git a/server/configs/tls/tls-ed25519.conf b/server/configs/tls/tls-ed25519.conf index 2f10f7dd..dcb8fd94 100644 --- a/server/configs/tls/tls-ed25519.conf +++ b/server/configs/tls/tls-ed25519.conf @@ -1,6 +1,6 @@ # Simple TLS (ed25519) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-ed25519.pem" diff --git a/server/configs/tls/tls-none.conf b/server/configs/tls/tls-none.conf index c6cae62d..042bf4e0 100644 --- a/server/configs/tls/tls-none.conf +++ b/server/configs/tls/tls-none.conf @@ -1,4 +1,4 @@ # Simple config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 diff --git a/server/configs/tls/tls-rsa-1024.conf b/server/configs/tls/tls-rsa-1024.conf index a5032fda..fb3aaa41 100644 --- a/server/configs/tls/tls-rsa-1024.conf +++ b/server/configs/tls/tls-rsa-1024.conf @@ -1,6 +1,6 @@ # Simple TLS (rsa-1024) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-1024.pem" diff --git a/server/configs/tls/tls-rsa-2048.conf b/server/configs/tls/tls-rsa-2048.conf index 9e33b7d8..08f54a25 100644 --- a/server/configs/tls/tls-rsa-2048.conf +++ b/server/configs/tls/tls-rsa-2048.conf @@ -1,6 +1,6 @@ # Simple TLS (rsa-2048) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-2048.pem" diff --git a/server/configs/tls/tls-rsa-4096.conf b/server/configs/tls/tls-rsa-4096.conf index a64ede8f..68ad841b 100644 --- a/server/configs/tls/tls-rsa-4096.conf +++ b/server/configs/tls/tls-rsa-4096.conf @@ -1,6 +1,6 @@ # Simple TLS (rsa-4096) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-4096.pem" diff --git a/server/core_benchmarks_test.go b/server/core_benchmarks_test.go index 560cd149..1ecb594e 100644 --- a/server/core_benchmarks_test.go +++ b/server/core_benchmarks_test.go @@ -16,102 +16,236 @@ package server import ( "crypto/rand" "crypto/tls" + "errors" "fmt" + "os" + "sync" "testing" "time" "github.com/nats-io/nats.go" ) -func BenchmarkRequestReplyOverEncryptedConnection(b *testing.B) { +func BenchmarkCoreRequestReply(b *testing.B) { const ( - subject = "test-subject" - configsBasePath = "./configs/tls" + subject = "test-subject" ) - // default TLS client connection options - defaultOpts := []nats.Option{} - - keyTypes := []string{ - "none", - "ed25519", - "rsa-1024", - "rsa-2048", - "rsa-4096", - } - payloadSzs := []int64{ + messageSizes := []int64{ 1024, // 1kb 4096, // 4kb 40960, // 40kb 409600, // 400kb } - for _, keyType := range keyTypes { - schemeConfig := fmt.Sprintf("%s/tls-%s.conf", configsBasePath, keyType) - b.Run(fmt.Sprintf("keyType=%s", keyType), func(b *testing.B) { - for _, payloadSz := range payloadSzs { - b.Run(fmt.Sprintf("payloadSz=%db", payloadSz), func(b *testing.B) { + for _, messageSize := range messageSizes { + b.Run(fmt.Sprintf("msgSz=%db", messageSize), func(b *testing.B) { - // run server with tls scheme - server, _ := RunServerWithConfig(schemeConfig) - opts := defaultOpts - defer server.Shutdown() + // Start server + serverOpts := DefaultOptions() + server := RunServer(serverOpts) + defer server.Shutdown() - if keyType != "none" { - opts = append(opts, nats.Secure(&tls.Config{ - InsecureSkipVerify: true, - })) - } + clientUrl := server.ClientURL() - // default client url - clientUrl := server.ClientURL() - - // subscriber - ncSub, err := nats.Connect(clientUrl, opts...) - if err != nil { - b.Fatal(err) - } - defer ncSub.Close() - sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) { - // Responder echoes the request payload as-is - msg.Respond(msg.Data) - }) - defer sub.Unsubscribe() - if err != nil { - b.Fatal(err) - } - - // publisher - ncPub, err := nats.Connect(clientUrl, opts...) - if err != nil { - b.Fatal(err) - } - defer ncPub.Close() - - var errors = 0 - - // random bytes as payload - b.SetBytes(payloadSz) - payload := make([]byte, payloadSz) - rand.Read(payload) - - // start benchmark - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _, err := ncPub.Request(subject, payload, time.Second) - if err != nil { - errors++ - } - } - - // stop benchmark - b.StopTimer() - - b.ReportMetric(float64(errors), "errors") - }) + // Create "echo" subscriber + ncSub, err := nats.Connect(clientUrl) + if err != nil { + b.Fatal(err) } + defer ncSub.Close() + sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) { + // Responder echoes the request payload as-is + msg.Respond(msg.Data) + }) + defer sub.Unsubscribe() + if err != nil { + b.Fatal(err) + } + + // Create publisher + ncPub, err := nats.Connect(clientUrl) + if err != nil { + b.Fatal(err) + } + defer ncPub.Close() + + var errors = 0 + + // Create message (reused for all requests) + messageData := make([]byte, messageSize) + b.SetBytes(messageSize) + rand.Read(messageData) + + // Benchmark + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := ncPub.Request(subject, messageData, time.Second) + if err != nil { + errors++ + } + } + b.StopTimer() + + b.ReportMetric(float64(errors), "errors") }) } - +} + +func BenchmarkCoreTLSFanOut(b *testing.B) { + const ( + subject = "test-subject" + configsBasePath = "./configs/tls" + maxPendingMessages = 25 + maxPendingBytes = 15 * 1024 * 1024 // 15MiB + ) + + keyTypeCases := []string{ + "none", + "ed25519", + "rsa-1024", + "rsa-2048", + "rsa-4096", + } + messageSizeCases := []int64{ + 512 * 1024, // 512Kib + } + numSubsCases := []int{ + 5, + } + + // Custom error handler that ignores ErrSlowConsumer. + // Lots of them are expected in this benchmark which indiscriminately publishes at a rate higher + // than what the server can fan-out to subscribers. + ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) { + if errors.Is(err, nats.ErrSlowConsumer) { + // Swallow this error + } else { + _, _ = fmt.Fprintf(os.Stderr, "Warning: %s\n", err) + } + } + + for _, keyType := range keyTypeCases { + + b.Run( + fmt.Sprintf("keyType=%s", keyType), + func(b *testing.B) { + + for _, messageSize := range messageSizeCases { + b.Run( + fmt.Sprintf("msgSz=%db", messageSize), + func(b *testing.B) { + + for _, numSubs := range numSubsCases { + b.Run( + fmt.Sprintf("subs=%d", numSubs), + func(b *testing.B) { + // Start server + configPath := fmt.Sprintf("%s/tls-%s.conf", configsBasePath, keyType) + server, _ := RunServerWithConfig(configPath) + defer server.Shutdown() + + opts := []nats.Option{ + nats.MaxReconnects(-1), + nats.ReconnectWait(0), + nats.ErrorHandler(ignoreSlowConsumerErrorHandler), + } + + if keyType != "none" { + opts = append(opts, nats.Secure(&tls.Config{ + InsecureSkipVerify: true, + })) + } + + clientUrl := server.ClientURL() + + // Count of messages received for by each subscriber + counters := make([]int, numSubs) + + // Wait group for subscribers to signal they received b.N messages + var wg sync.WaitGroup + wg.Add(numSubs) + + // Create subscribers + for i := 0; i < numSubs; i++ { + subIndex := i + ncSub, err := nats.Connect(clientUrl, opts...) + if err != nil { + b.Fatal(err) + } + defer ncSub.Close() + sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) { + counters[subIndex] += 1 + if counters[subIndex] == b.N { + wg.Done() + } + }) + if err != nil { + b.Fatalf("failed to subscribe: %s", err) + } + err = sub.SetPendingLimits(maxPendingMessages, maxPendingBytes) + if err != nil { + b.Fatalf("failed to set pending limits: %s", err) + } + defer sub.Unsubscribe() + if err != nil { + b.Fatal(err) + } + } + + // publisher + ncPub, err := nats.Connect(clientUrl, opts...) + if err != nil { + b.Fatal(err) + } + defer ncPub.Close() + + var errorCount = 0 + + // random bytes as payload + messageData := make([]byte, messageSize) + rand.Read(messageData) + + quitCh := make(chan bool, 1) + + publish := func() { + for { + select { + case <-quitCh: + return + default: + // continue publishing + } + + err := ncPub.Publish(subject, messageData) + if err != nil { + errorCount += 1 + } + } + } + + // Set bytes per operation + b.SetBytes(messageSize) + // Start the clock + b.ResetTimer() + // Start publishing as fast as the server allows + go publish() + // Wait for all subscribers to have delivered b.N messages + wg.Wait() + // Stop the clock + b.StopTimer() + + // Stop publisher + quitCh <- true + + b.ReportMetric(float64(errorCount), "errors") + }, + ) + } + }, + ) + } + }, + ) + } } diff --git a/server/monitor.go b/server/monitor.go index 3bbb30fd..977bb921 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1130,14 +1130,17 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { queues := map[string]monitorIPQueue{} - s.ipQueues.Range(func(k, v interface{}) bool { + s.ipQueues.Range(func(k, v any) bool { + var pending, inProgress int name := k.(string) - queue := v.(interface { + queue, ok := v.(interface { len() int - inProgress() uint64 + inProgress() int64 }) - pending := queue.len() - inProgress := int(queue.inProgress()) + if ok { + pending = queue.len() + inProgress = int(queue.inProgress()) + } if !all && (pending == 0 && inProgress == 0) { return true } else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) { diff --git a/server/monitor_test.go b/server/monitor_test.go index 793b39b2..cb9f4b1b 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -5225,3 +5225,22 @@ func TestHealthzStatusUnavailable(t *testing.T) { checkHealthzEndpoint(t, s.MonitorAddr().String(), http.StatusServiceUnavailable, "unavailable") } + +// When we converted ipq to use generics we still were using sync.Map. Currently you can not convert +// interface{} or any to a generic parameterized type. So this stopped working and panics. +func TestIpqzWithGenerics(t *testing.T) { + opts := DefaultMonitorOptions() + opts.JetStream = true + + s := RunServer(opts) + defer s.Shutdown() + + url := fmt.Sprintf("http://%s/ipqueuesz?all=1", s.MonitorAddr().String()) + body := readBody(t, url) + require_True(t, len(body) > 0) + + queues := map[string]*monitorIPQueue{} + require_NoError(t, json.Unmarshal(body, &queues)) + require_True(t, len(queues) >= 4) + require_True(t, queues["SendQ"] != nil) +} diff --git a/server/stream.go b/server/stream.go index 6287f56d..e229243f 100644 --- a/server/stream.go +++ b/server/stream.go @@ -947,29 +947,31 @@ func (mset *stream) rebuildDedupe() { mset.ddloaded = true - // We have some messages. Lookup starting sequence by duplicate time window. - sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates)) - if sseq == 0 { - return - } - - var smv StoreMsg - var state StreamState - mset.store.FastState(&state) - - for seq := sseq; seq <= state.LastSeq; seq++ { - sm, err := mset.store.LoadMsg(seq, &smv) - if err != nil { - continue + if mset.cfg.Duplicates > time.Duration(0) { + // We have some messages. Lookup starting sequence by duplicate time window. + sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates)) + if sseq == 0 { + return } - var msgId string - if len(sm.hdr) > 0 { - if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ { - mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts}) + + var smv StoreMsg + var state StreamState + mset.store.FastState(&state) + + for seq := sseq; seq <= state.LastSeq; seq++ { + sm, err := mset.store.LoadMsg(seq, &smv) + if err != nil { + continue + } + var msgId string + if len(sm.hdr) > 0 { + if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ { + mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts}) + } + } + if seq == state.LastSeq { + mset.lmsgId = msgId } - } - if seq == state.LastSeq { - mset.lmsgId = msgId } } } @@ -1173,7 +1175,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi if cfg.MaxConsumers == 0 { cfg.MaxConsumers = -1 } - if cfg.Duplicates == 0 && cfg.Mirror == nil { + if cfg.Duplicates == 0 && cfg.Mirror == nil && len(cfg.Sources) == 0 { maxWindow := StreamDefaultDuplicatesWindow if lim.Duplicates > 0 && maxWindow > lim.Duplicates { maxWindow = lim.Duplicates @@ -3789,13 +3791,15 @@ func (mset *stream) storeMsgId(dde *ddentry) { // storeMsgIdLocked will store the message id for duplicate detection. // Lock should he held. func (mset *stream) storeMsgIdLocked(dde *ddentry) { - if mset.ddmap == nil { - mset.ddmap = make(map[string]*ddentry) - } - mset.ddmap[dde.id] = dde - mset.ddarr = append(mset.ddarr, dde) - if mset.ddtmr == nil { - mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds) + if mset.cfg.Duplicates > time.Duration(0) { + if mset.ddmap == nil { + mset.ddmap = make(map[string]*ddentry) + } + mset.ddmap[dde.id] = dde + mset.ddarr = append(mset.ddarr, dde) + if mset.ddtmr == nil { + mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds) + } } }