From db96238ad9b18c9930e63c404d1ae288f8855374 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Fri, 1 Sep 2023 12:47:14 -0700 Subject: [PATCH 1/6] Enables 0s deduplication window duration when the stream has sources (#4476) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - [X] Link to issue, e.g. `Resolves #NNN` - [X] Branch rebased on top of current main (`git pull --rebase origin main`) - [X] Changes squashed to a single commit (described [here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html)) - [X] Build is green in Travis CI - [X] You have certified that the contribution is your original work and that you license the work to the project under the [Apache 2 license](https://github.com/nats-io/nats-server/blob/main/LICENSE) Resolves #4459 Allows the user to set the deduplication window duration to 0s when the stream has sources defined. Remember that if the stream in question is also listening on subjects as well as sourcing the deduplication window is the same for sourced and listened messages. --------- Signed-off-by: Jean-Noël Moyne --- server/stream.go | 62 ++++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/server/stream.go b/server/stream.go index fb792845..bfe06b23 100644 --- a/server/stream.go +++ b/server/stream.go @@ -805,29 +805,31 @@ func (mset *stream) rebuildDedupe() { mset.ddloaded = true - // We have some messages. Lookup starting sequence by duplicate time window. - sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates)) - if sseq == 0 { - return - } - - var smv StoreMsg - var state StreamState - mset.store.FastState(&state) - - for seq := sseq; seq <= state.LastSeq; seq++ { - sm, err := mset.store.LoadMsg(seq, &smv) - if err != nil { - continue + if mset.cfg.Duplicates > time.Duration(0) { + // We have some messages. Lookup starting sequence by duplicate time window. + sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates)) + if sseq == 0 { + return } - var msgId string - if len(sm.hdr) > 0 { - if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ { - mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts}) + + var smv StoreMsg + var state StreamState + mset.store.FastState(&state) + + for seq := sseq; seq <= state.LastSeq; seq++ { + sm, err := mset.store.LoadMsg(seq, &smv) + if err != nil { + continue + } + var msgId string + if len(sm.hdr) > 0 { + if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ { + mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts}) + } + } + if seq == state.LastSeq { + mset.lmsgId = msgId } - } - if seq == state.LastSeq { - mset.lmsgId = msgId } } } @@ -1023,7 +1025,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi if cfg.MaxConsumers == 0 { cfg.MaxConsumers = -1 } - if cfg.Duplicates == 0 && cfg.Mirror == nil { + if cfg.Duplicates == 0 && cfg.Mirror == nil && len(cfg.Sources) == 0 { maxWindow := StreamDefaultDuplicatesWindow if lim.Duplicates > 0 && maxWindow > lim.Duplicates { maxWindow = lim.Duplicates @@ -3486,13 +3488,15 @@ func (mset *stream) storeMsgId(dde *ddentry) { // storeMsgIdLocked will store the message id for duplicate detection. // Lock should he held. func (mset *stream) storeMsgIdLocked(dde *ddentry) { - if mset.ddmap == nil { - mset.ddmap = make(map[string]*ddentry) - } - mset.ddmap[dde.id] = dde - mset.ddarr = append(mset.ddarr, dde) - if mset.ddtmr == nil { - mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds) + if mset.cfg.Duplicates > time.Duration(0) { + if mset.ddmap == nil { + mset.ddmap = make(map[string]*ddentry) + } + mset.ddmap[dde.id] = dde + mset.ddarr = append(mset.ddarr, dde) + if mset.ddtmr == nil { + mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds) + } } } From bbf42c1f57617aec2e33fa271d4c603cd6faff8a Mon Sep 17 00:00:00 2001 From: Marco Primi Date: Fri, 1 Sep 2023 08:33:43 -0700 Subject: [PATCH 2/6] Use dynamic port number in benchmark --- server/configs/tls/tls-ed25519.conf | 2 +- server/configs/tls/tls-none.conf | 2 +- server/configs/tls/tls-rsa-1024.conf | 2 +- server/configs/tls/tls-rsa-2048.conf | 2 +- server/configs/tls/tls-rsa-4096.conf | 2 +- server/core_benchmarks_test.go | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/configs/tls/tls-ed25519.conf b/server/configs/tls/tls-ed25519.conf index 2f10f7dd..dcb8fd94 100644 --- a/server/configs/tls/tls-ed25519.conf +++ b/server/configs/tls/tls-ed25519.conf @@ -1,6 +1,6 @@ # Simple TLS (ed25519) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-ed25519.pem" diff --git a/server/configs/tls/tls-none.conf b/server/configs/tls/tls-none.conf index c6cae62d..042bf4e0 100644 --- a/server/configs/tls/tls-none.conf +++ b/server/configs/tls/tls-none.conf @@ -1,4 +1,4 @@ # Simple config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 diff --git a/server/configs/tls/tls-rsa-1024.conf b/server/configs/tls/tls-rsa-1024.conf index a5032fda..fb3aaa41 100644 --- a/server/configs/tls/tls-rsa-1024.conf +++ b/server/configs/tls/tls-rsa-1024.conf @@ -1,6 +1,6 @@ # Simple TLS (rsa-1024) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-1024.pem" diff --git a/server/configs/tls/tls-rsa-2048.conf b/server/configs/tls/tls-rsa-2048.conf index 9e33b7d8..08f54a25 100644 --- a/server/configs/tls/tls-rsa-2048.conf +++ b/server/configs/tls/tls-rsa-2048.conf @@ -1,6 +1,6 @@ # Simple TLS (rsa-2048) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-2048.pem" diff --git a/server/configs/tls/tls-rsa-4096.conf b/server/configs/tls/tls-rsa-4096.conf index a64ede8f..68ad841b 100644 --- a/server/configs/tls/tls-rsa-4096.conf +++ b/server/configs/tls/tls-rsa-4096.conf @@ -1,6 +1,6 @@ # Simple TLS (rsa-4096) config file -listen: 127.0.0.1:4443 +listen: 127.0.0.1:-1 tls { cert_file: "./configs/certs/tls/benchmark-server-cert-rsa-4096.pem" diff --git a/server/core_benchmarks_test.go b/server/core_benchmarks_test.go index 560cd149..707d267b 100644 --- a/server/core_benchmarks_test.go +++ b/server/core_benchmarks_test.go @@ -54,9 +54,9 @@ func BenchmarkRequestReplyOverEncryptedConnection(b *testing.B) { // run server with tls scheme server, _ := RunServerWithConfig(schemeConfig) - opts := defaultOpts defer server.Shutdown() + opts := defaultOpts if keyType != "none" { opts = append(opts, nats.Secure(&tls.Config{ InsecureSkipVerify: true, From 4eedfe8d0c6a462fb3b8e47a448e6e82994bc68f Mon Sep 17 00:00:00 2001 From: Marco Primi Date: Fri, 1 Sep 2023 08:33:52 -0700 Subject: [PATCH 3/6] Simplify core request/response benchmark - Remove TLS, impact is negligible for the amount of data pushed through - Rename benchmark --- server/core_benchmarks_test.go | 126 +++++++++++++-------------------- 1 file changed, 49 insertions(+), 77 deletions(-) diff --git a/server/core_benchmarks_test.go b/server/core_benchmarks_test.go index 707d267b..101d2038 100644 --- a/server/core_benchmarks_test.go +++ b/server/core_benchmarks_test.go @@ -15,7 +15,6 @@ package server import ( "crypto/rand" - "crypto/tls" "fmt" "testing" "time" @@ -23,95 +22,68 @@ import ( "github.com/nats-io/nats.go" ) -func BenchmarkRequestReplyOverEncryptedConnection(b *testing.B) { +func BenchmarkCoreRequestReply(b *testing.B) { const ( - subject = "test-subject" - configsBasePath = "./configs/tls" + subject = "test-subject" ) - // default TLS client connection options - defaultOpts := []nats.Option{} - - keyTypes := []string{ - "none", - "ed25519", - "rsa-1024", - "rsa-2048", - "rsa-4096", - } - payloadSzs := []int64{ + messageSizes := []int64{ 1024, // 1kb 4096, // 4kb 40960, // 40kb 409600, // 400kb } - for _, keyType := range keyTypes { - schemeConfig := fmt.Sprintf("%s/tls-%s.conf", configsBasePath, keyType) - b.Run(fmt.Sprintf("keyType=%s", keyType), func(b *testing.B) { - for _, payloadSz := range payloadSzs { - b.Run(fmt.Sprintf("payloadSz=%db", payloadSz), func(b *testing.B) { + for _, messageSize := range messageSizes { + b.Run(fmt.Sprintf("msgSz=%db", messageSize), func(b *testing.B) { - // run server with tls scheme - server, _ := RunServerWithConfig(schemeConfig) - defer server.Shutdown() + // Start server + serverOpts := DefaultOptions() + server := RunServer(serverOpts) + defer server.Shutdown() - opts := defaultOpts - if keyType != "none" { - opts = append(opts, nats.Secure(&tls.Config{ - InsecureSkipVerify: true, - })) - } + clientUrl := server.ClientURL() - // default client url - clientUrl := server.ClientURL() - - // subscriber - ncSub, err := nats.Connect(clientUrl, opts...) - if err != nil { - b.Fatal(err) - } - defer ncSub.Close() - sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) { - // Responder echoes the request payload as-is - msg.Respond(msg.Data) - }) - defer sub.Unsubscribe() - if err != nil { - b.Fatal(err) - } - - // publisher - ncPub, err := nats.Connect(clientUrl, opts...) - if err != nil { - b.Fatal(err) - } - defer ncPub.Close() - - var errors = 0 - - // random bytes as payload - b.SetBytes(payloadSz) - payload := make([]byte, payloadSz) - rand.Read(payload) - - // start benchmark - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _, err := ncPub.Request(subject, payload, time.Second) - if err != nil { - errors++ - } - } - - // stop benchmark - b.StopTimer() - - b.ReportMetric(float64(errors), "errors") - }) + // Create "echo" subscriber + ncSub, err := nats.Connect(clientUrl) + if err != nil { + b.Fatal(err) } + defer ncSub.Close() + sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) { + // Responder echoes the request payload as-is + msg.Respond(msg.Data) + }) + defer sub.Unsubscribe() + if err != nil { + b.Fatal(err) + } + + // Create publisher + ncPub, err := nats.Connect(clientUrl) + if err != nil { + b.Fatal(err) + } + defer ncPub.Close() + + var errors = 0 + + // Create message (reused for all requests) + messageData := make([]byte, messageSize) + b.SetBytes(messageSize) + rand.Read(messageData) + + // Benchmark + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := ncPub.Request(subject, messageData, time.Second) + if err != nil { + errors++ + } + } + b.StopTimer() + + b.ReportMetric(float64(errors), "errors") }) } - } From e61e40e3febf437c1fc102f0009700c8c6f78d7b Mon Sep 17 00:00:00 2001 From: Marco Primi Date: Fri, 1 Sep 2023 11:07:16 -0700 Subject: [PATCH 4/6] Add benchmark for TLS content encryption overhead --- server/core_benchmarks_test.go | 162 +++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) diff --git a/server/core_benchmarks_test.go b/server/core_benchmarks_test.go index 101d2038..1ecb594e 100644 --- a/server/core_benchmarks_test.go +++ b/server/core_benchmarks_test.go @@ -15,7 +15,11 @@ package server import ( "crypto/rand" + "crypto/tls" + "errors" "fmt" + "os" + "sync" "testing" "time" @@ -87,3 +91,161 @@ func BenchmarkCoreRequestReply(b *testing.B) { }) } } + +func BenchmarkCoreTLSFanOut(b *testing.B) { + const ( + subject = "test-subject" + configsBasePath = "./configs/tls" + maxPendingMessages = 25 + maxPendingBytes = 15 * 1024 * 1024 // 15MiB + ) + + keyTypeCases := []string{ + "none", + "ed25519", + "rsa-1024", + "rsa-2048", + "rsa-4096", + } + messageSizeCases := []int64{ + 512 * 1024, // 512Kib + } + numSubsCases := []int{ + 5, + } + + // Custom error handler that ignores ErrSlowConsumer. + // Lots of them are expected in this benchmark which indiscriminately publishes at a rate higher + // than what the server can fan-out to subscribers. + ignoreSlowConsumerErrorHandler := func(conn *nats.Conn, s *nats.Subscription, err error) { + if errors.Is(err, nats.ErrSlowConsumer) { + // Swallow this error + } else { + _, _ = fmt.Fprintf(os.Stderr, "Warning: %s\n", err) + } + } + + for _, keyType := range keyTypeCases { + + b.Run( + fmt.Sprintf("keyType=%s", keyType), + func(b *testing.B) { + + for _, messageSize := range messageSizeCases { + b.Run( + fmt.Sprintf("msgSz=%db", messageSize), + func(b *testing.B) { + + for _, numSubs := range numSubsCases { + b.Run( + fmt.Sprintf("subs=%d", numSubs), + func(b *testing.B) { + // Start server + configPath := fmt.Sprintf("%s/tls-%s.conf", configsBasePath, keyType) + server, _ := RunServerWithConfig(configPath) + defer server.Shutdown() + + opts := []nats.Option{ + nats.MaxReconnects(-1), + nats.ReconnectWait(0), + nats.ErrorHandler(ignoreSlowConsumerErrorHandler), + } + + if keyType != "none" { + opts = append(opts, nats.Secure(&tls.Config{ + InsecureSkipVerify: true, + })) + } + + clientUrl := server.ClientURL() + + // Count of messages received for by each subscriber + counters := make([]int, numSubs) + + // Wait group for subscribers to signal they received b.N messages + var wg sync.WaitGroup + wg.Add(numSubs) + + // Create subscribers + for i := 0; i < numSubs; i++ { + subIndex := i + ncSub, err := nats.Connect(clientUrl, opts...) + if err != nil { + b.Fatal(err) + } + defer ncSub.Close() + sub, err := ncSub.Subscribe(subject, func(msg *nats.Msg) { + counters[subIndex] += 1 + if counters[subIndex] == b.N { + wg.Done() + } + }) + if err != nil { + b.Fatalf("failed to subscribe: %s", err) + } + err = sub.SetPendingLimits(maxPendingMessages, maxPendingBytes) + if err != nil { + b.Fatalf("failed to set pending limits: %s", err) + } + defer sub.Unsubscribe() + if err != nil { + b.Fatal(err) + } + } + + // publisher + ncPub, err := nats.Connect(clientUrl, opts...) + if err != nil { + b.Fatal(err) + } + defer ncPub.Close() + + var errorCount = 0 + + // random bytes as payload + messageData := make([]byte, messageSize) + rand.Read(messageData) + + quitCh := make(chan bool, 1) + + publish := func() { + for { + select { + case <-quitCh: + return + default: + // continue publishing + } + + err := ncPub.Publish(subject, messageData) + if err != nil { + errorCount += 1 + } + } + } + + // Set bytes per operation + b.SetBytes(messageSize) + // Start the clock + b.ResetTimer() + // Start publishing as fast as the server allows + go publish() + // Wait for all subscribers to have delivered b.N messages + wg.Wait() + // Stop the clock + b.StopTimer() + + // Stop publisher + quitCh <- true + + b.ReportMetric(float64(errorCount), "errors") + }, + ) + } + }, + ) + } + }, + ) + } +} From 2c81224262744738ac0829c96e012bd8270233c9 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 2 Sep 2023 11:43:08 -0700 Subject: [PATCH 5/6] Fixed interface conversion for ipQueue in monitor which caused panics. Signed-off-by: Derek Collison --- server/monitor.go | 13 ++++++++----- server/monitor_test.go | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index 407a028a..d279abb7 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1120,14 +1120,17 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { queues := map[string]monitorIPQueue{} - s.ipQueues.Range(func(k, v interface{}) bool { + s.ipQueues.Range(func(k, v any) bool { + var pending, inProgress int name := k.(string) - queue := v.(interface { + queue, ok := v.(interface { len() int - inProgress() uint64 + inProgress() int64 }) - pending := queue.len() - inProgress := int(queue.inProgress()) + if ok { + pending = queue.len() + inProgress = int(queue.inProgress()) + } if !all && (pending == 0 && inProgress == 0) { return true } else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) { diff --git a/server/monitor_test.go b/server/monitor_test.go index d3bd215e..d5cd9ff6 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4981,3 +4981,22 @@ func TestHealthzStatusUnavailable(t *testing.T) { checkHealthzEndpoint(t, s.MonitorAddr().String(), http.StatusServiceUnavailable, "unavailable") } + +// When we converted ipq to use generics we still were using sync.Map. Currently you can not convert +// interface{} or any to a generic parameterized type. So this stopped working and panics. +func TestIpqzWithGenerics(t *testing.T) { + opts := DefaultMonitorOptions() + opts.JetStream = true + + s := RunServer(opts) + defer s.Shutdown() + + url := fmt.Sprintf("http://%s/ipqueuesz?all=1", s.MonitorAddr().String()) + body := readBody(t, url) + require_True(t, len(body) > 0) + + queues := map[string]*monitorIPQueue{} + require_NoError(t, json.Unmarshal(body, &queues)) + require_True(t, len(queues) >= 4) + require_True(t, queues["SendQ"] != nil) +} From b63318c0c96501b71adf0c01520f55b1b515fe89 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 2 Sep 2023 12:00:39 -0700 Subject: [PATCH 6/6] Bump to 2.9.22-RC.5 Signed-off-by: Derek Collison --- server/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/const.go b/server/const.go index ccf42691..a46b5251 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.9.22-RC.4" + VERSION = "2.9.22-RC.5" // PROTO is the currently supported protocol. // 0 was the original