diff --git a/server/accounts_test.go b/server/accounts_test.go index 4d3f7836..c6b5d7cf 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -213,7 +213,7 @@ func TestAccountIsolationExportImport(t *testing.T) { // Setup NATS server. s := opTrustBasicSetup() defer s.Shutdown() - go s.Start() + s.Start() if err := s.readyForConnections(5 * time.Second); err != nil { t.Fatal(err) } @@ -1677,7 +1677,7 @@ func TestAccountRequestReplyTrackLatency(t *testing.T) { defer s.Shutdown() // Run server in Go routine. We need this one running for internal sending of msgs. - go s.Start() + s.Start() // Wait for accept loop(s) to be started if err := s.readyForConnections(10 * time.Second); err != nil { t.Fatal(err) diff --git a/server/const.go b/server/const.go index 4ea01acc..e26e6971 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.10.0-beta.33" + VERSION = "2.10.0-beta.34" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/consumer.go b/server/consumer.go index 4bf3a922..65e00511 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -677,7 +677,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri mset.mu.Lock() if mset.client == nil || mset.store == nil || mset.consumers == nil { mset.mu.Unlock() - return nil, errors.New("invalid stream") + return nil, NewJSStreamInvalidError() } // If this one is durable and already exists, we let that be ok as long as only updating what should be allowed. @@ -4674,6 +4674,13 @@ func (o *consumer) delete() error { return o.stopWithFlags(true, false, true, true) } +// To test for closed state. +func (o *consumer) isClosed() bool { + o.mu.RLock() + defer o.mu.RUnlock() + return o.closed +} + func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { o.mu.Lock() js := o.js diff --git a/server/jetstream.go b/server/jetstream.go index 647e2e38..1a7b88ad 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2022 The NATS Authors +// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -110,11 +110,14 @@ type jetStream struct { started time.Time // System level request to purge a stream move - accountPurge *subscription + accountPurge *subscription + + // Some bools regarding general state. metaRecovering bool standAlone bool disabled bool oos bool + shuttingDown bool } type remoteUsage struct { @@ -859,6 +862,13 @@ func (s *Server) signalPullConsumers() { } } +// Helper for determining if we are shutting down. +func (js *jetStream) isShuttingDown() bool { + js.mu.RLock() + defer js.mu.RUnlock() + return js.shuttingDown +} + // Shutdown jetstream for this server. func (s *Server) shutdownJetStream() { s.mu.RLock() @@ -891,6 +901,8 @@ func (s *Server) shutdownJetStream() { } accPurgeSub := js.accountPurge js.accountPurge = nil + // Signal we are shutting down. 
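// Illustrative sketch, not part of this patch: the flag set on the next line
// (while holding js.mu) pairs with the isShuttingDown() helper added above, so
// long-running JetStream goroutines can bail out instead of racing the shutdown
// path. A hypothetical caller:
//
//	func (js *jetStream) runPeriodicWork() {
//		if js.isShuttingDown() {
//			return // server is going away; skip the work
//		}
//		// ... normal processing ...
//	}
//
// isShuttingDown() only takes the read lock, so it is cheap to call on hot paths.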
+ js.shuttingDown = true js.mu.Unlock() if accPurgeSub != nil { diff --git a/server/jetstream_benchmark_consume_test.go b/server/jetstream_benchmark_consume_test.go deleted file mode 100644 index f2af0ef2..00000000 --- a/server/jetstream_benchmark_consume_test.go +++ /dev/null @@ -1,404 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !skip_js_tests && !skip_js_cluster_tests && !skip_js_cluster_tests_2 -// +build !skip_js_tests,!skip_js_cluster_tests,!skip_js_cluster_tests_2 - -package server - -import ( - "fmt" - "math/rand" - "testing" - "time" - - "github.com/nats-io/nats.go" -) - -func BenchmarkJetStreamConsume(b *testing.B) { - - const ( - verbose = false - streamName = "S" - subject = "s" - seed = 12345 - publishTimeout = 30 * time.Second - ) - - runSyncPushConsumer := func(b *testing.B, js nats.JetStreamContext, streamName, subject string) (int, int, int) { - const nextMsgTimeout = 3 * time.Second - - subOpts := []nats.SubOpt{ - nats.BindStream(streamName), - } - sub, err := js.SubscribeSync("", subOpts...) - if err != nil { - b.Fatalf("Failed to subscribe: %v", err) - } - defer sub.Unsubscribe() - - bitset := NewBitset(uint64(b.N)) - uniqueConsumed, duplicates, errors := 0, 0, 0 - - b.ResetTimer() - - for uniqueConsumed < b.N { - msg, err := sub.NextMsg(nextMsgTimeout) - if err != nil { - b.Fatalf("No more messages (received: %d/%d)", uniqueConsumed, b.N) - } - - metadata, mdErr := msg.Metadata() - if mdErr != nil { - errors++ - continue - } - - ackErr := msg.Ack() - if ackErr != nil { - errors++ - continue - } - - seq := metadata.Sequence.Stream - - index := seq - 1 - if bitset.get(index) { - duplicates++ - continue - } - - uniqueConsumed++ - bitset.set(index, true) - b.SetBytes(int64(len(msg.Data))) - - if verbose && uniqueConsumed%1000 == 0 { - b.Logf("Consumed: %d/%d", bitset.count(), b.N) - } - } - - b.StopTimer() - - return uniqueConsumed, duplicates, errors - } - - runAsyncPushConsumer := func(b *testing.B, js nats.JetStreamContext, streamName, subject string, ordered, durable bool) (int, int, int) { - const timeout = 3 * time.Minute - bitset := NewBitset(uint64(b.N)) - doneCh := make(chan bool, 1) - uniqueConsumed, duplicates, errors := 0, 0, 0 - - handleMsg := func(msg *nats.Msg) { - metadata, mdErr := msg.Metadata() - if mdErr != nil { - // fmt.Printf("Metadata error: %v\n", mdErr) - errors++ - return - } - - // Ordered defaults to AckNone policy, don't try to ACK - if !ordered { - ackErr := msg.Ack() - if ackErr != nil { - // fmt.Printf("Ack error: %v\n", ackErr) - errors++ - return - } - } - - seq := metadata.Sequence.Stream - - index := seq - 1 - if bitset.get(index) { - duplicates++ - return - } - - uniqueConsumed++ - bitset.set(index, true) - b.SetBytes(int64(len(msg.Data))) - - if uniqueConsumed == b.N { - msg.Sub.Unsubscribe() - doneCh <- true - } - if verbose && uniqueConsumed%1000 == 0 { - b.Logf("Consumed %d/%d", uniqueConsumed, b.N) - } - } - - subOpts := []nats.SubOpt{ - nats.BindStream(streamName), 
- } - - if ordered { - subOpts = append(subOpts, nats.OrderedConsumer()) - } - - if durable { - subOpts = append(subOpts, nats.Durable("c")) - } - - sub, err := js.Subscribe("", handleMsg, subOpts...) - if err != nil { - b.Fatalf("Failed to subscribe: %v", err) - } - defer sub.Unsubscribe() - - b.ResetTimer() - - select { - case <-doneCh: - b.StopTimer() - case <-time.After(timeout): - b.Fatalf("Timeout, %d/%d received, %d errors", uniqueConsumed, b.N, errors) - } - - return uniqueConsumed, duplicates, errors - } - - runPullConsumer := func(b *testing.B, js nats.JetStreamContext, streamName, subject string, durable bool) (int, int, int) { - const fetchMaxWait = nats.MaxWait(3 * time.Second) - const fetchMaxMessages = 1000 - - bitset := NewBitset(uint64(b.N)) - uniqueConsumed, duplicates, errors := 0, 0, 0 - - subOpts := []nats.SubOpt{ - nats.BindStream(streamName), - } - - consumerName := "" // Default ephemeral - if durable { - consumerName = "c" // Durable - } - - sub, err := js.PullSubscribe("", consumerName, subOpts...) - if err != nil { - b.Fatalf("Failed to subscribe: %v", err) - } - defer sub.Unsubscribe() - - b.ResetTimer() - - fetchLoop: - for { - msgs, err := sub.Fetch(fetchMaxMessages, fetchMaxWait) - if err != nil { - b.Fatalf("Failed to fetch: %v", err) - } - - processMsgsLoop: - for _, msg := range msgs { - metadata, mdErr := msg.Metadata() - if mdErr != nil { - errors++ - continue processMsgsLoop - } - - ackErr := msg.Ack() - if ackErr != nil { - errors++ - continue processMsgsLoop - } - - seq := metadata.Sequence.Stream - - index := seq - 1 - if bitset.get(index) { - duplicates++ - continue processMsgsLoop - } - - uniqueConsumed++ - bitset.set(index, true) - b.SetBytes(int64(len(msg.Data))) - - if uniqueConsumed == b.N { - msg.Sub.Unsubscribe() - break fetchLoop - } - - if verbose && uniqueConsumed%1000 == 0 { - b.Logf("Consumed %d/%d", uniqueConsumed, b.N) - } - } - } - - b.StopTimer() - - return uniqueConsumed, duplicates, errors - } - - type ConsumerType string - const ( - PushSync ConsumerType = "PUSH[Sync,Ephemeral]" - PushAsync ConsumerType = "PUSH[Async,Ephemeral]" - PushAsyncOrdered ConsumerType = "PUSH[Async,Ordered]" - PushAsyncDurable ConsumerType = "PUSH[Async,Durable]" - PullDurable ConsumerType = "PULL[Durable]" - PullEphemeral ConsumerType = "PULL[Ephemeral]" - ) - - benchmarksCases := []struct { - clusterSize int - replicas int - messageSize int - minMessages int - }{ - {1, 1, 10, 100_000}, // Single node, 10B messages, ~1MiB minimum - {1, 1, 1024, 1_000}, // Single node, 1KB messages, ~1MiB minimum - {3, 3, 10, 100_000}, // Cluster, R3, 10B messages, ~1MiB minimum - {3, 3, 1024, 1_000}, // Cluster, R3, 1KB messages, ~1MiB minimum - } - - //Each of the cases above is run with each of the consumer types - consumerTypes := []ConsumerType{ - PushSync, - PushAsync, - PushAsyncOrdered, - PushAsyncDurable, - PullDurable, - PullEphemeral, - } - - for _, bc := range benchmarksCases { - - name := fmt.Sprintf( - "N=%d,R=%d,MsgSz=%db", - bc.clusterSize, - bc.replicas, - bc.messageSize, - ) - - b.Run( - name, - func(b *testing.B) { - - for _, ct := range consumerTypes { - name := fmt.Sprintf( - "%v", - ct, - ) - b.Run( - name, - func(b *testing.B) { - // Skip short runs, benchmark gets re-executed with a larger N - if b.N < bc.minMessages { - b.ResetTimer() - return - } - - if verbose { - b.Logf("Running %s with %d messages", name, b.N) - } - - if verbose { - b.Logf("Setting up %d nodes", bc.clusterSize) - } - var connectURL string - if bc.clusterSize == 1 { - s := 
RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.randomServer().ClientURL() - } - - nc, js := jsClientConnectURL(b, connectURL) - defer nc.Close() - - if verbose { - b.Logf("Creating stream with R=%d", bc.replicas) - } - streamConfig := &nats.StreamConfig{ - Name: streamName, - Subjects: []string{subject}, - Replicas: bc.replicas, - } - if _, err := js.AddStream(streamConfig); err != nil { - b.Fatalf("Error creating stream: %v", err) - } - - rng := rand.New(rand.NewSource(int64(seed))) - message := make([]byte, bc.messageSize) - publishedCount := 0 - for publishedCount < b.N { - rng.Read(message) - _, err := js.PublishAsync(subject, message) - if err != nil { - continue - } else { - publishedCount++ - } - } - - select { - case <-js.PublishAsyncComplete(): - if verbose { - b.Logf("Published %d messages", b.N) - } - case <-time.After(publishTimeout): - b.Fatalf("Publish timed out") - } - - // Discard time spent during setup - // Consumer may reset again further in - b.ResetTimer() - - var consumed, duplicates, errors int - - const ( - ordered = true - unordered = false - durable = true - ephemeral = false - ) - - switch ct { - case PushSync: - consumed, duplicates, errors = runSyncPushConsumer(b, js, streamName, subject) - case PushAsync: - consumed, duplicates, errors = runAsyncPushConsumer(b, js, streamName, subject, unordered, ephemeral) - case PushAsyncOrdered: - consumed, duplicates, errors = runAsyncPushConsumer(b, js, streamName, subject, ordered, ephemeral) - case PushAsyncDurable: - consumed, duplicates, errors = runAsyncPushConsumer(b, js, streamName, subject, unordered, durable) - case PullDurable: - consumed, duplicates, errors = runPullConsumer(b, js, streamName, subject, durable) - case PullEphemeral: - consumed, duplicates, errors = runPullConsumer(b, js, streamName, subject, ephemeral) - default: - b.Fatalf("Unknown consumer type: %v", ct) - } - - // Benchmark ends here, (consumer may have stopped earlier) - b.StopTimer() - - if consumed != b.N { - b.Fatalf("Something doesn't add up: %d != %d", consumed, b.N) - } - - b.ReportMetric(float64(duplicates)*100/float64(b.N), "%dupe") - b.ReportMetric(float64(errors)*100/float64(b.N), "%error") - }, - ) - } - }, - ) - } -} diff --git a/server/jetstream_benchmark_kv_test.go b/server/jetstream_benchmark_kv_test.go deleted file mode 100644 index f89f686c..00000000 --- a/server/jetstream_benchmark_kv_test.go +++ /dev/null @@ -1,265 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -//go:build !skip_js_tests && !skip_js_cluster_tests && !skip_js_cluster_tests_2 -// +build !skip_js_tests,!skip_js_cluster_tests,!skip_js_cluster_tests_2 - -package server - -import ( - "fmt" - "math/rand" - "testing" - - "github.com/nats-io/nats.go" -) - -func BenchmarkJetStreamKV(b *testing.B) { - - const ( - verbose = false - kvNamePrefix = "B_" - keyPrefix = "K_" - seed = 12345 - minOps = 1_000 - ) - - runKVGet := func(b *testing.B, kvs []nats.KeyValue, keys []string) int { - rng := rand.New(rand.NewSource(int64(seed))) - errors := 0 - - b.ResetTimer() - - for i := 1; i <= b.N; i++ { - kv := kvs[rng.Intn(len(kvs))] - key := keys[rng.Intn(len(keys))] - kve, err := kv.Get(key) - if err != nil { - errors++ - continue - } - - b.SetBytes(int64(len(kve.Value()))) - - if verbose && i%1000 == 0 { - b.Logf("Completed %d/%d Get ops", i, b.N) - } - } - - b.StopTimer() - return errors - } - - runKVPut := func(b *testing.B, kvs []nats.KeyValue, keys []string, valueSize int) int { - rng := rand.New(rand.NewSource(int64(seed))) - value := make([]byte, valueSize) - errors := 0 - - b.ResetTimer() - - for i := 1; i <= b.N; i++ { - kv := kvs[rng.Intn(len(kvs))] - key := keys[rng.Intn(len(keys))] - rng.Read(value) - _, err := kv.Put(key, value) - if err != nil { - errors++ - continue - } - - b.SetBytes(int64(valueSize)) - - if verbose && i%1000 == 0 { - b.Logf("Completed %d/%d Put ops", i, b.N) - } - } - - b.StopTimer() - return errors - } - - runKVUpdate := func(b *testing.B, kvs []nats.KeyValue, keys []string, valueSize int) int { - rng := rand.New(rand.NewSource(int64(seed))) - value := make([]byte, valueSize) - errors := 0 - - b.ResetTimer() - - for i := 1; i <= b.N; i++ { - kv := kvs[rng.Intn(len(kvs))] - key := keys[rng.Intn(len(keys))] - - kve, getErr := kv.Get(key) - if getErr != nil { - errors++ - continue - } - - rng.Read(value) - _, updateErr := kv.Update(key, value, kve.Revision()) - if updateErr != nil { - errors++ - continue - } - - b.SetBytes(int64(valueSize)) - - if verbose && i%1000 == 0 { - b.Logf("Completed %d/%d Update ops", i, b.N) - } - } - - b.StopTimer() - return errors - } - - type WorkloadType string - const ( - Get WorkloadType = "GET" - Put WorkloadType = "PUT" - Update WorkloadType = "CAS" - ) - - benchmarksCases := []struct { - clusterSize int - replicas int - numBuckets int - numKeys int - valueSize int - }{ - {1, 1, 1, 100, 100}, // 1 node, 1 bucket with 100 keys, 100B values - {1, 1, 10, 1000, 100}, // 1 node, 10 buckets with 1000 keys, 100B values - {3, 3, 1, 100, 100}, // 3 nodes, 1 bucket with 100 keys, 100B values - {3, 3, 10, 1000, 100}, // 3 nodes, 10 buckets with 1000 keys, 100B values - {3, 3, 10, 1000, 1024}, // 3 nodes, 10 buckets with 1000 keys, 1KB values - } - - workloadCases := []WorkloadType{ - Get, - Put, - Update, - } - - for _, bc := range benchmarksCases { - - bName := fmt.Sprintf( - "N=%d,R=%d,B=%d,K=%d,ValSz=%db", - bc.clusterSize, - bc.replicas, - bc.numBuckets, - bc.numKeys, - bc.valueSize, - ) - - b.Run( - bName, - func(b *testing.B) { - for _, wc := range workloadCases { - wName := fmt.Sprintf("%v", wc) - b.Run( - wName, - func(b *testing.B) { - // Skip short runs, benchmark gets re-executed with a larger N - if b.N < minOps { - b.ResetTimer() - return - } - - if verbose { - b.Logf("Running %s workload %s with %d messages", wName, bName, b.N) - } - - if verbose { - b.Logf("Setting up %d nodes", bc.clusterSize) - } - var connectURL string - if bc.clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = 
s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_KV", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.randomServer().ClientURL() - } - - nc, js := jsClientConnectURL(b, connectURL) - defer nc.Close() - - // Pre-generate all keys - keys := make([]string, 0, bc.numKeys) - for i := 1; i <= bc.numKeys; i++ { - key := fmt.Sprintf("%s%d", keyPrefix, i) - keys = append(keys, key) - } - - // Initialize all KVs - kvs := make([]nats.KeyValue, 0, bc.numBuckets) - for i := 1; i <= bc.numBuckets; i++ { - // Create bucket - kvName := fmt.Sprintf("%s%d", kvNamePrefix, i) - if verbose { - b.Logf("Creating KV %s with R=%d", kvName, bc.replicas) - } - kvConfig := &nats.KeyValueConfig{ - Bucket: kvName, - Replicas: bc.replicas, - } - kv, err := js.CreateKeyValue(kvConfig) - if err != nil { - b.Fatalf("Error creating KV: %v", err) - } - kvs = append(kvs, kv) - - // Initialize all keys - rng := rand.New(rand.NewSource(int64(seed * i))) - value := make([]byte, bc.valueSize) - for _, key := range keys { - rng.Read(value) - _, err := kv.Create(key, value) - if err != nil { - b.Fatalf("Failed to initialize %s/%s: %v", kvName, key, err) - } - } - } - - // Discard time spent during setup - // May reset again further in - b.ResetTimer() - - var errors int - - switch wc { - case Get: - errors = runKVGet(b, kvs, keys) - case Put: - errors = runKVPut(b, kvs, keys, bc.valueSize) - case Update: - errors = runKVUpdate(b, kvs, keys, bc.valueSize) - default: - b.Fatalf("Unknown workload type: %v", wc) - } - - // Benchmark ends here, (may have stopped earlier) - b.StopTimer() - - b.ReportMetric(float64(errors)*100/float64(b.N), "%error") - }, - ) - } - }, - ) - } -} diff --git a/server/jetstream_benchmark_publish_test.go b/server/jetstream_benchmark_publish_test.go deleted file mode 100644 index c1f3427a..00000000 --- a/server/jetstream_benchmark_publish_test.go +++ /dev/null @@ -1,274 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !skip_js_tests && !skip_js_cluster_tests && !skip_js_cluster_tests_2 -// +build !skip_js_tests,!skip_js_cluster_tests,!skip_js_cluster_tests_2 - -package server - -import ( - "fmt" - "math/rand" - "testing" - "time" - - "github.com/nats-io/nats.go" -) - -func BenchmarkJetStreamPublish(b *testing.B) { - - const ( - verbose = false - seed = 12345 - ) - - runSyncPublisher := func(b *testing.B, js nats.JetStreamContext, messageSize int, subjects []string) (int, int) { - published, errors := 0, 0 - rng := rand.New(rand.NewSource(int64(seed))) - message := make([]byte, messageSize) - - b.ResetTimer() - - for i := 1; i <= b.N; i++ { - rng.Read(message) // TODO may skip this? 
- subject := subjects[rng.Intn(len(subjects))] - _, pubErr := js.Publish(subject, message) - if pubErr != nil { - errors++ - } else { - published++ - b.SetBytes(int64(messageSize)) - } - - if verbose && i%1000 == 0 { - b.Logf("Published %d/%d, %d errors", i, b.N, errors) - } - } - - b.StopTimer() - - return published, errors - } - - runAsyncPublisher := func(b *testing.B, js nats.JetStreamContext, messageSize int, subjects []string, asyncWindow int) (int, int) { - const publishCompleteMaxWait = 30 * time.Second - rng := rand.New(rand.NewSource(int64(seed))) - message := make([]byte, messageSize) - pending := make([]nats.PubAckFuture, 0, asyncWindow) - published, errors := 0, 0 - - b.ResetTimer() - - for i := 1; i <= b.N; i++ { - rng.Read(message) // TODO may skip this? - subject := subjects[rng.Intn(len(subjects))] - pubAckFuture, err := js.PublishAsync(subject, message) - if err != nil { - errors++ - continue - } - pending = append(pending, pubAckFuture) - - // Regularly trim the list of pending - if i%asyncWindow == 0 { - newPending := make([]nats.PubAckFuture, 0, asyncWindow) - for _, pubAckFuture := range pending { - select { - case <-pubAckFuture.Ok(): - published++ - b.SetBytes(int64(messageSize)) - case <-pubAckFuture.Err(): - errors++ - default: - // This pubAck is still pending, keep it - newPending = append(newPending, pubAckFuture) - } - } - pending = newPending - } - - if verbose && i%1000 == 0 { - b.Logf("Published %d/%d, %d errors", i, b.N, errors) - } - } - - // All published, wait for completed - select { - case <-js.PublishAsyncComplete(): - case <-time.After(publishCompleteMaxWait): - b.Fatalf("Publish timed out") - } - - // Clear whatever is left pending - for _, pubAckFuture := range pending { - select { - case <-pubAckFuture.Ok(): - published++ - b.SetBytes(int64(messageSize)) - case <-pubAckFuture.Err(): - errors++ - default: - b.Fatalf("PubAck is still pending after publish completed") - } - } - - b.StopTimer() - - return published, errors - } - - type PublishType string - const ( - Sync PublishType = "Sync" - Async PublishType = "Async" - ) - - benchmarksCases := []struct { - clusterSize int - replicas int - messageSize int - numSubjects int - minMessages int - }{ - {1, 1, 10, 1, 100_000}, // Single node, 10B messages, ~1MB minimum - {1, 1, 1024, 1, 1_000}, // Single node, 1KB messages, ~1MB minimum - {3, 3, 10, 1, 100_000}, // 3-nodes cluster, R=3, 10B messages, ~1MB minimum - {3, 3, 1024, 1, 1_000}, // 3-nodes cluster, R=3, 10B messages, ~1MB minimum - } - - // All the cases above are run with each of the publisher cases below - publisherCases := []struct { - pType PublishType - asyncWindow int - }{ - {Sync, -1}, - {Async, 1000}, - {Async, 4000}, - {Async, 8000}, - } - - for _, bc := range benchmarksCases { - name := fmt.Sprintf( - "N=%d,R=%d,MsgSz=%db,Subjs=%d", - bc.clusterSize, - bc.replicas, - bc.messageSize, - bc.numSubjects, - ) - - b.Run( - name, - func(b *testing.B) { - - for _, pc := range publisherCases { - name := fmt.Sprintf("%v", pc.pType) - if pc.pType == Async && pc.asyncWindow > 0 { - name = fmt.Sprintf("%s[W:%d]", name, pc.asyncWindow) - } - - b.Run( - name, - func(b *testing.B) { - // Skip short runs, benchmark gets re-executed with a larger N - if b.N < bc.minMessages { - b.ResetTimer() - return - } - - subjects := make([]string, bc.numSubjects) - for i := 0; i < bc.numSubjects; i++ { - subjects[i] = fmt.Sprintf("s-%d", i+1) - } - - if verbose { - b.Logf("Running %s with %d ops", name, b.N) - } - - if verbose { - b.Logf("Setting up %d nodes", 
bc.clusterSize) - } - var connectURL string - - if bc.clusterSize == 1 { - s := RunBasicJetStreamServer(b) - defer s.Shutdown() - connectURL = s.ClientURL() - } else { - cl := createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize) - defer cl.shutdown() - cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) - cl.waitOnLeader() - connectURL = cl.randomServer().ClientURL() - } - - nc, err := nats.Connect(connectURL) - if err != nil { - b.Fatalf("Failed to create client: %v", err) - } - defer nc.Close() - - jsOpts := []nats.JSOpt{ - nats.MaxWait(10 * time.Second), - } - - if pc.asyncWindow > 0 && pc.pType == Async { - jsOpts = append(jsOpts, nats.PublishAsyncMaxPending(pc.asyncWindow)) - } - - js, err := nc.JetStream(jsOpts...) - if err != nil { - b.Fatalf("Unexpected error getting JetStream context: %v", err) - } - - if verbose { - b.Logf("Creating stream with R=%d and %d input subjects", bc.replicas, bc.numSubjects) - } - streamConfig := &nats.StreamConfig{ - Name: "S", - Subjects: subjects, - Replicas: bc.replicas, - } - if _, err := js.AddStream(streamConfig); err != nil { - b.Fatalf("Error creating stream: %v", err) - } - - if verbose { - b.Logf("Running %v publisher with message size: %dB", pc.pType, bc.messageSize) - } - - // Benchmark starts here - b.ResetTimer() - - var published, errors int - switch pc.pType { - case Sync: - published, errors = runSyncPublisher(b, js, bc.messageSize, subjects) - case Async: - published, errors = runAsyncPublisher(b, js, bc.messageSize, subjects, pc.asyncWindow) - } - - // Benchmark ends here - b.StopTimer() - - if published+errors != b.N { - b.Fatalf("Something doesn't add up: %d + %d != %d", published, errors, b.N) - } - - b.ReportMetric(float64(errors)*100/float64(b.N), "%error") - }, - ) - } - }, - ) - } -} diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go new file mode 100644 index 00000000..e1701ad9 --- /dev/null +++ b/server/jetstream_benchmark_test.go @@ -0,0 +1,1176 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !skip_js_tests && !skip_js_cluster_tests && !skip_js_cluster_tests_2 +// +build !skip_js_tests,!skip_js_cluster_tests,!skip_js_cluster_tests_2 + +package server + +import ( + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/nats-io/nats.go" +) + +func BenchmarkJetStreamConsume(b *testing.B) { + + const ( + verbose = false + streamName = "S" + subject = "s" + seed = 12345 + publishTimeout = 30 * time.Second + ) + + runSyncPushConsumer := func(b *testing.B, js nats.JetStreamContext, streamName, subject string) (int, int, int) { + const nextMsgTimeout = 3 * time.Second + + subOpts := []nats.SubOpt{ + nats.BindStream(streamName), + } + sub, err := js.SubscribeSync("", subOpts...) 
+ if err != nil { + b.Fatalf("Failed to subscribe: %v", err) + } + defer sub.Unsubscribe() + + bitset := NewBitset(uint64(b.N)) + uniqueConsumed, duplicates, errors := 0, 0, 0 + + b.ResetTimer() + + for uniqueConsumed < b.N { + msg, err := sub.NextMsg(nextMsgTimeout) + if err != nil { + b.Fatalf("No more messages (received: %d/%d)", uniqueConsumed, b.N) + } + + metadata, mdErr := msg.Metadata() + if mdErr != nil { + errors++ + continue + } + + ackErr := msg.Ack() + if ackErr != nil { + errors++ + continue + } + + seq := metadata.Sequence.Stream + + index := seq - 1 + if bitset.get(index) { + duplicates++ + continue + } + + uniqueConsumed++ + bitset.set(index, true) + b.SetBytes(int64(len(msg.Data))) + + if verbose && uniqueConsumed%1000 == 0 { + b.Logf("Consumed: %d/%d", bitset.count(), b.N) + } + } + + b.StopTimer() + + return uniqueConsumed, duplicates, errors + } + + runAsyncPushConsumer := func(b *testing.B, js nats.JetStreamContext, streamName, subject string, ordered, durable bool) (int, int, int) { + const timeout = 3 * time.Minute + bitset := NewBitset(uint64(b.N)) + doneCh := make(chan bool, 1) + uniqueConsumed, duplicates, errors := 0, 0, 0 + + handleMsg := func(msg *nats.Msg) { + metadata, mdErr := msg.Metadata() + if mdErr != nil { + // fmt.Printf("Metadata error: %v\n", mdErr) + errors++ + return + } + + // Ordered defaults to AckNone policy, don't try to ACK + if !ordered { + ackErr := msg.Ack() + if ackErr != nil { + // fmt.Printf("Ack error: %v\n", ackErr) + errors++ + return + } + } + + seq := metadata.Sequence.Stream + + index := seq - 1 + if bitset.get(index) { + duplicates++ + return + } + + uniqueConsumed++ + bitset.set(index, true) + b.SetBytes(int64(len(msg.Data))) + + if uniqueConsumed == b.N { + msg.Sub.Unsubscribe() + doneCh <- true + } + if verbose && uniqueConsumed%1000 == 0 { + b.Logf("Consumed %d/%d", uniqueConsumed, b.N) + } + } + + subOpts := []nats.SubOpt{ + nats.BindStream(streamName), + } + + if ordered { + subOpts = append(subOpts, nats.OrderedConsumer()) + } + + if durable { + subOpts = append(subOpts, nats.Durable("c")) + } + + sub, err := js.Subscribe("", handleMsg, subOpts...) + if err != nil { + b.Fatalf("Failed to subscribe: %v", err) + } + defer sub.Unsubscribe() + + b.ResetTimer() + + select { + case <-doneCh: + b.StopTimer() + case <-time.After(timeout): + b.Fatalf("Timeout, %d/%d received, %d errors", uniqueConsumed, b.N, errors) + } + + return uniqueConsumed, duplicates, errors + } + + runPullConsumer := func(b *testing.B, js nats.JetStreamContext, streamName, subject string, durable bool) (int, int, int) { + const fetchMaxWait = nats.MaxWait(3 * time.Second) + const fetchMaxMessages = 1000 + + bitset := NewBitset(uint64(b.N)) + uniqueConsumed, duplicates, errors := 0, 0, 0 + + subOpts := []nats.SubOpt{ + nats.BindStream(streamName), + } + + consumerName := "" // Default ephemeral + if durable { + consumerName = "c" // Durable + } + + sub, err := js.PullSubscribe("", consumerName, subOpts...) 
+ if err != nil { + b.Fatalf("Failed to subscribe: %v", err) + } + defer sub.Unsubscribe() + + b.ResetTimer() + + fetchLoop: + for { + msgs, err := sub.Fetch(fetchMaxMessages, fetchMaxWait) + if err != nil { + b.Fatalf("Failed to fetch: %v", err) + } + + processMsgsLoop: + for _, msg := range msgs { + metadata, mdErr := msg.Metadata() + if mdErr != nil { + errors++ + continue processMsgsLoop + } + + ackErr := msg.Ack() + if ackErr != nil { + errors++ + continue processMsgsLoop + } + + seq := metadata.Sequence.Stream + + index := seq - 1 + if bitset.get(index) { + duplicates++ + continue processMsgsLoop + } + + uniqueConsumed++ + bitset.set(index, true) + b.SetBytes(int64(len(msg.Data))) + + if uniqueConsumed == b.N { + msg.Sub.Unsubscribe() + break fetchLoop + } + + if verbose && uniqueConsumed%1000 == 0 { + b.Logf("Consumed %d/%d", uniqueConsumed, b.N) + } + } + } + + b.StopTimer() + + return uniqueConsumed, duplicates, errors + } + + type ConsumerType string + const ( + PushSync ConsumerType = "PUSH[Sync,Ephemeral]" + PushAsync ConsumerType = "PUSH[Async,Ephemeral]" + PushAsyncOrdered ConsumerType = "PUSH[Async,Ordered]" + PushAsyncDurable ConsumerType = "PUSH[Async,Durable]" + PullDurable ConsumerType = "PULL[Durable]" + PullEphemeral ConsumerType = "PULL[Ephemeral]" + ) + + benchmarksCases := []struct { + clusterSize int + replicas int + messageSize int + minMessages int + }{ + {1, 1, 10, 100_000}, // Single node, 10B messages, ~1MiB minimum + {1, 1, 1024, 1_000}, // Single node, 1KB messages, ~1MiB minimum + {3, 3, 10, 100_000}, // Cluster, R3, 10B messages, ~1MiB minimum + {3, 3, 1024, 1_000}, // Cluster, R3, 1KB messages, ~1MiB minimum + } + + //Each of the cases above is run with each of the consumer types + consumerTypes := []ConsumerType{ + PushSync, + PushAsync, + PushAsyncOrdered, + PushAsyncDurable, + PullDurable, + PullEphemeral, + } + + for _, bc := range benchmarksCases { + + name := fmt.Sprintf( + "N=%d,R=%d,MsgSz=%db", + bc.clusterSize, + bc.replicas, + bc.messageSize, + ) + + b.Run( + name, + func(b *testing.B) { + + for _, ct := range consumerTypes { + name := fmt.Sprintf( + "%v", + ct, + ) + b.Run( + name, + func(b *testing.B) { + // Skip short runs, benchmark gets re-executed with a larger N + if b.N < bc.minMessages { + b.ResetTimer() + return + } + + if verbose { + b.Logf("Running %s with %d messages", name, b.N) + } + + if verbose { + b.Logf("Setting up %d nodes", bc.clusterSize) + } + var connectURL string + if bc.clusterSize == 1 { + s := RunBasicJetStreamServer(b) + defer s.Shutdown() + connectURL = s.ClientURL() + } else { + cl := createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize) + defer cl.shutdown() + cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) + cl.waitOnLeader() + connectURL = cl.randomServer().ClientURL() + } + + nc, js := jsClientConnectURL(b, connectURL) + defer nc.Close() + + if verbose { + b.Logf("Creating stream with R=%d", bc.replicas) + } + streamConfig := &nats.StreamConfig{ + Name: streamName, + Subjects: []string{subject}, + Replicas: bc.replicas, + } + if _, err := js.AddStream(streamConfig); err != nil { + b.Fatalf("Error creating stream: %v", err) + } + + rng := rand.New(rand.NewSource(int64(seed))) + message := make([]byte, bc.messageSize) + publishedCount := 0 + for publishedCount < b.N { + rng.Read(message) + _, err := js.PublishAsync(subject, message) + if err != nil { + continue + } else { + publishedCount++ + } + } + + select { + case <-js.PublishAsyncComplete(): + if verbose { + b.Logf("Published %d messages", b.N) + 
} + case <-time.After(publishTimeout): + b.Fatalf("Publish timed out") + } + + // Discard time spent during setup + // Consumer may reset again further in + b.ResetTimer() + + var consumed, duplicates, errors int + + const ( + ordered = true + unordered = false + durable = true + ephemeral = false + ) + + switch ct { + case PushSync: + consumed, duplicates, errors = runSyncPushConsumer(b, js, streamName, subject) + case PushAsync: + consumed, duplicates, errors = runAsyncPushConsumer(b, js, streamName, subject, unordered, ephemeral) + case PushAsyncOrdered: + consumed, duplicates, errors = runAsyncPushConsumer(b, js, streamName, subject, ordered, ephemeral) + case PushAsyncDurable: + consumed, duplicates, errors = runAsyncPushConsumer(b, js, streamName, subject, unordered, durable) + case PullDurable: + consumed, duplicates, errors = runPullConsumer(b, js, streamName, subject, durable) + case PullEphemeral: + consumed, duplicates, errors = runPullConsumer(b, js, streamName, subject, ephemeral) + default: + b.Fatalf("Unknown consumer type: %v", ct) + } + + // Benchmark ends here, (consumer may have stopped earlier) + b.StopTimer() + + if consumed != b.N { + b.Fatalf("Something doesn't add up: %d != %d", consumed, b.N) + } + + b.ReportMetric(float64(duplicates)*100/float64(b.N), "%dupe") + b.ReportMetric(float64(errors)*100/float64(b.N), "%error") + }, + ) + } + }, + ) + } +} + +func BenchmarkJetStreamPublish(b *testing.B) { + + const ( + verbose = false + seed = 12345 + ) + + runSyncPublisher := func(b *testing.B, js nats.JetStreamContext, messageSize int, subjects []string) (int, int) { + published, errors := 0, 0 + rng := rand.New(rand.NewSource(int64(seed))) + message := make([]byte, messageSize) + + b.ResetTimer() + + for i := 1; i <= b.N; i++ { + rng.Read(message) // TODO may skip this? + subject := subjects[rng.Intn(len(subjects))] + _, pubErr := js.Publish(subject, message) + if pubErr != nil { + errors++ + } else { + published++ + b.SetBytes(int64(messageSize)) + } + + if verbose && i%1000 == 0 { + b.Logf("Published %d/%d, %d errors", i, b.N, errors) + } + } + + b.StopTimer() + + return published, errors + } + + runAsyncPublisher := func(b *testing.B, js nats.JetStreamContext, messageSize int, subjects []string, asyncWindow int) (int, int) { + const publishCompleteMaxWait = 30 * time.Second + rng := rand.New(rand.NewSource(int64(seed))) + message := make([]byte, messageSize) + pending := make([]nats.PubAckFuture, 0, asyncWindow) + published, errors := 0, 0 + + b.ResetTimer() + + for i := 1; i <= b.N; i++ { + rng.Read(message) // TODO may skip this? 
+ subject := subjects[rng.Intn(len(subjects))] + pubAckFuture, err := js.PublishAsync(subject, message) + if err != nil { + errors++ + continue + } + pending = append(pending, pubAckFuture) + + // Regularly trim the list of pending + if i%asyncWindow == 0 { + newPending := make([]nats.PubAckFuture, 0, asyncWindow) + for _, pubAckFuture := range pending { + select { + case <-pubAckFuture.Ok(): + published++ + b.SetBytes(int64(messageSize)) + case <-pubAckFuture.Err(): + errors++ + default: + // This pubAck is still pending, keep it + newPending = append(newPending, pubAckFuture) + } + } + pending = newPending + } + + if verbose && i%1000 == 0 { + b.Logf("Published %d/%d, %d errors", i, b.N, errors) + } + } + + // All published, wait for completed + select { + case <-js.PublishAsyncComplete(): + case <-time.After(publishCompleteMaxWait): + b.Fatalf("Publish timed out") + } + + // Clear whatever is left pending + for _, pubAckFuture := range pending { + select { + case <-pubAckFuture.Ok(): + published++ + b.SetBytes(int64(messageSize)) + case <-pubAckFuture.Err(): + errors++ + default: + b.Fatalf("PubAck is still pending after publish completed") + } + } + + b.StopTimer() + + return published, errors + } + + type PublishType string + const ( + Sync PublishType = "Sync" + Async PublishType = "Async" + ) + + benchmarksCases := []struct { + clusterSize int + replicas int + messageSize int + numSubjects int + minMessages int + }{ + {1, 1, 10, 1, 100_000}, // Single node, 10B messages, ~1MB minimum + {1, 1, 1024, 1, 1_000}, // Single node, 1KB messages, ~1MB minimum + {3, 3, 10, 1, 100_000}, // 3-nodes cluster, R=3, 10B messages, ~1MB minimum + {3, 3, 1024, 1, 1_000}, // 3-nodes cluster, R=3, 10B messages, ~1MB minimum + } + + // All the cases above are run with each of the publisher cases below + publisherCases := []struct { + pType PublishType + asyncWindow int + }{ + {Sync, -1}, + {Async, 1000}, + {Async, 4000}, + {Async, 8000}, + } + + for _, bc := range benchmarksCases { + name := fmt.Sprintf( + "N=%d,R=%d,MsgSz=%db,Subjs=%d", + bc.clusterSize, + bc.replicas, + bc.messageSize, + bc.numSubjects, + ) + + b.Run( + name, + func(b *testing.B) { + + for _, pc := range publisherCases { + name := fmt.Sprintf("%v", pc.pType) + if pc.pType == Async && pc.asyncWindow > 0 { + name = fmt.Sprintf("%s[W:%d]", name, pc.asyncWindow) + } + + b.Run( + name, + func(b *testing.B) { + // Skip short runs, benchmark gets re-executed with a larger N + if b.N < bc.minMessages { + b.ResetTimer() + return + } + + subjects := make([]string, bc.numSubjects) + for i := 0; i < bc.numSubjects; i++ { + subjects[i] = fmt.Sprintf("s-%d", i+1) + } + + if verbose { + b.Logf("Running %s with %d ops", name, b.N) + } + + if verbose { + b.Logf("Setting up %d nodes", bc.clusterSize) + } + var connectURL string + + if bc.clusterSize == 1 { + s := RunBasicJetStreamServer(b) + defer s.Shutdown() + connectURL = s.ClientURL() + } else { + cl := createJetStreamClusterExplicit(b, "BENCH_PUB", bc.clusterSize) + defer cl.shutdown() + cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) + cl.waitOnLeader() + connectURL = cl.randomServer().ClientURL() + } + + nc, err := nats.Connect(connectURL) + if err != nil { + b.Fatalf("Failed to create client: %v", err) + } + defer nc.Close() + + jsOpts := []nats.JSOpt{ + nats.MaxWait(10 * time.Second), + } + + if pc.asyncWindow > 0 && pc.pType == Async { + jsOpts = append(jsOpts, nats.PublishAsyncMaxPending(pc.asyncWindow)) + } + + js, err := nc.JetStream(jsOpts...) 
+ if err != nil { + b.Fatalf("Unexpected error getting JetStream context: %v", err) + } + + if verbose { + b.Logf("Creating stream with R=%d and %d input subjects", bc.replicas, bc.numSubjects) + } + streamConfig := &nats.StreamConfig{ + Name: "S", + Subjects: subjects, + Replicas: bc.replicas, + } + if _, err := js.AddStream(streamConfig); err != nil { + b.Fatalf("Error creating stream: %v", err) + } + + if verbose { + b.Logf("Running %v publisher with message size: %dB", pc.pType, bc.messageSize) + } + + // Benchmark starts here + b.ResetTimer() + + var published, errors int + switch pc.pType { + case Sync: + published, errors = runSyncPublisher(b, js, bc.messageSize, subjects) + case Async: + published, errors = runAsyncPublisher(b, js, bc.messageSize, subjects, pc.asyncWindow) + } + + // Benchmark ends here + b.StopTimer() + + if published+errors != b.N { + b.Fatalf("Something doesn't add up: %d + %d != %d", published, errors, b.N) + } + + b.ReportMetric(float64(errors)*100/float64(b.N), "%error") + }, + ) + } + }, + ) + } +} + +func BenchmarkJetStreamInterestStreamWithLimit(b *testing.B) { + + const ( + verbose = true + seed = 12345 + publishBatchSize = 100 + messageSize = 256 + numSubjects = 2500 + subjectPrefix = "S" + numPublishers = 4 + randomData = true + warmupMessages = 1 + ) + + if verbose { + b.Logf( + "BatchSize: %d, MsgSize: %d, Subjects: %d, Publishers: %d, Random Message: %v", + publishBatchSize, + messageSize, + numSubjects, + numPublishers, + randomData, + ) + } + + // Benchmark parameters: sub-benchmarks are executed for every combination of the following 3 groups + // Unless a more restrictive filter is specified, e.g.: + // BenchmarkJetStreamInterestStreamWithLimit/.*R=3.*/Storage=Memory/unlimited + + // Parameter: Number of nodes and number of stream replicas + clusterAndReplicasCases := []struct { + clusterSize int + replicas int + }{ + {1, 1}, // Single node, R=1 + {3, 3}, // 3-nodes cluster, R=3 + } + + // Parameter: Stream storage type + storageTypeCases := []nats.StorageType{ + nats.MemoryStorage, + nats.FileStorage, + } + + // Parameter: Stream limit configuration + limitConfigCases := map[string]func(*nats.StreamConfig){ + "unlimited": func(config *nats.StreamConfig) { + }, + "MaxMsg=1000": func(config *nats.StreamConfig) { + config.MaxMsgs = 100 + }, + "MaxMsg=10": func(config *nats.StreamConfig) { + config.MaxMsgs = 10 + }, + "MaxPerSubject=10": func(config *nats.StreamConfig) { + config.MaxMsgsPerSubject = 10 + }, + "MaxAge=1s": func(config *nats.StreamConfig) { + config.MaxAge = 1 * time.Second + }, + "MaxBytes=1MB": func(config *nats.StreamConfig) { + config.MaxBytes = 1024 * 1024 + }, + } + + // Helper: Stand up in-process single node or cluster + setupCluster := func(b *testing.B, clusterSize int) (string, func()) { + var connectURL string + var shutdownFunc func() + + if clusterSize == 1 { + s := RunBasicJetStreamServer(b) + shutdownFunc = s.Shutdown + connectURL = s.ClientURL() + } else { + cl := createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize) + shutdownFunc = cl.shutdown + cl.waitOnClusterReadyWithNumPeers(clusterSize) + cl.waitOnLeader() + connectURL = cl.randomServer().ClientURL() + //connectURL = cl.leader().ClientURL() + } + + return connectURL, shutdownFunc + } + + // Helper: Create the stream + setupStream := func(b *testing.B, connectURL string, streamConfig *nats.StreamConfig) { + // Connect + nc, err := nats.Connect(connectURL) + if err != nil { + b.Fatalf("Failed to create client: %v", err) + } + defer nc.Close() + + jsOpts := 
[]nats.JSOpt{} + + js, err := nc.JetStream(jsOpts...) + if err != nil { + b.Fatalf("Unexpected error getting JetStream context: %v", err) + } + + if _, err := js.AddStream(streamConfig); err != nil { + b.Fatalf("Error creating stream: %v", err) + } + } + + // Context shared by publishers routines + type PublishersContext = struct { + readyWg sync.WaitGroup + completedWg sync.WaitGroup + messagesLeft int + lock sync.Mutex + errors int + } + + // Helper: Publish synchronously as Goroutine + publish := func(publisherId int, ctx *PublishersContext, js nats.JetStreamContext) { + defer ctx.completedWg.Done() + errors := 0 + messageBuf := make([]byte, messageSize) + rng := rand.New(rand.NewSource(int64(seed + publisherId))) + + // Warm up: publish a few messages + for i := 0; i < warmupMessages; i++ { + subject := fmt.Sprintf("%s.%d", subjectPrefix, rng.Intn(numSubjects)) + if randomData { + rng.Read(messageBuf) + } + _, err := js.Publish(subject, messageBuf) + if err != nil { + b.Logf("Warning: failed to publish warmup message: %s", err) + } + } + + // Signal this publisher is ready + ctx.readyWg.Done() + + for { + // Obtain a batch of messages to publish + batchSize := 0 + { + ctx.lock.Lock() + if ctx.messagesLeft >= publishBatchSize { + batchSize = publishBatchSize + } else if ctx.messagesLeft < publishBatchSize { + batchSize = ctx.messagesLeft + } + ctx.messagesLeft -= batchSize + ctx.lock.Unlock() + } + + // Nothing left to publish, terminate + if batchSize == 0 { + ctx.lock.Lock() + ctx.errors += errors + ctx.lock.Unlock() + return + } + + // Publish a batch of messages + for i := 0; i < batchSize; i++ { + subject := fmt.Sprintf("%s.%d", subjectPrefix, rng.Intn(numSubjects)) + if randomData { + rng.Read(messageBuf) + } + _, err := js.Publish(subject, messageBuf) + if err != nil { + errors += 1 + } + } + } + } + + // Benchmark matrix: (cluster and replicas) * (storage type) * (stream limit) + for _, benchmarkCase := range clusterAndReplicasCases { + b.Run( + fmt.Sprintf( + "N=%d,R=%d", + benchmarkCase.clusterSize, + benchmarkCase.replicas, + ), + func(b *testing.B) { + for _, storageType := range storageTypeCases { + b.Run( + fmt.Sprintf("Storage=%v", storageType), + func(b *testing.B) { + + for limitDescription, limitConfigFunc := range limitConfigCases { + b.Run( + limitDescription, + func(b *testing.B) { + // Stop timer during setup + b.StopTimer() + b.ResetTimer() + + // Set per-iteration bytes to calculate throughput (a.k.a. 
speed) + b.SetBytes(messageSize) + + // Print benchmark parameters + if verbose { + b.Logf( + "Stream: %+v, Storage: [%v] Limit: [%s], Ops: %d", + benchmarkCase, + storageType, + limitDescription, + b.N, + ) + } + + // Setup server or cluster + connectURL, shutdownFunc := setupCluster(b, benchmarkCase.clusterSize) + defer shutdownFunc() + + // Common stream configuration + streamConfig := &nats.StreamConfig{ + Name: "S", + Subjects: []string{fmt.Sprintf("%s.>", subjectPrefix)}, + Replicas: benchmarkCase.replicas, + Storage: storageType, + Discard: DiscardOld, + Retention: DiscardOld, + } + // Configure stream limit + limitConfigFunc(streamConfig) + // Create stream + setupStream(b, connectURL, streamConfig) + + // Set up publishers shared context + var pubCtx PublishersContext + pubCtx.readyWg.Add(numPublishers) + pubCtx.completedWg.Add(numPublishers) + + // Hold this lock until all publishers are ready + pubCtx.lock.Lock() + pubCtx.messagesLeft = b.N + + // Spawn publishers routines, each with its own connection and JS context + for i := 0; i < numPublishers; i++ { + nc, err := nats.Connect(connectURL) + if err != nil { + b.Fatal(err) + } + defer nc.Close() + js, err := nc.JetStream() + if err != nil { + b.Fatal(err) + } + go publish(i, &pubCtx, js) + } + + // Wait for all publishers to be ready + pubCtx.readyWg.Wait() + + // Benchmark starts here + b.StartTimer() + + // Unblock the publishers + pubCtx.lock.Unlock() + + // Wait for all publishers to complete + pubCtx.completedWg.Wait() + + // Benchmark ends here + b.StopTimer() + + // Sanity check, publishers may have died before completing + if pubCtx.messagesLeft != 0 { + b.Fatalf("Some messages left: %d", pubCtx.messagesLeft) + } + + b.ReportMetric(float64(pubCtx.errors)*100/float64(b.N), "%error") + }, + ) + } + }, + ) + } + }, + ) + } +} + +func BenchmarkJetStreamKV(b *testing.B) { + + const ( + verbose = false + kvNamePrefix = "B_" + keyPrefix = "K_" + seed = 12345 + minOps = 1_000 + ) + + runKVGet := func(b *testing.B, kvs []nats.KeyValue, keys []string) int { + rng := rand.New(rand.NewSource(int64(seed))) + errors := 0 + + b.ResetTimer() + + for i := 1; i <= b.N; i++ { + kv := kvs[rng.Intn(len(kvs))] + key := keys[rng.Intn(len(keys))] + kve, err := kv.Get(key) + if err != nil { + errors++ + continue + } + + b.SetBytes(int64(len(kve.Value()))) + + if verbose && i%1000 == 0 { + b.Logf("Completed %d/%d Get ops", i, b.N) + } + } + + b.StopTimer() + return errors + } + + runKVPut := func(b *testing.B, kvs []nats.KeyValue, keys []string, valueSize int) int { + rng := rand.New(rand.NewSource(int64(seed))) + value := make([]byte, valueSize) + errors := 0 + + b.ResetTimer() + + for i := 1; i <= b.N; i++ { + kv := kvs[rng.Intn(len(kvs))] + key := keys[rng.Intn(len(keys))] + rng.Read(value) + _, err := kv.Put(key, value) + if err != nil { + errors++ + continue + } + + b.SetBytes(int64(valueSize)) + + if verbose && i%1000 == 0 { + b.Logf("Completed %d/%d Put ops", i, b.N) + } + } + + b.StopTimer() + return errors + } + + runKVUpdate := func(b *testing.B, kvs []nats.KeyValue, keys []string, valueSize int) int { + rng := rand.New(rand.NewSource(int64(seed))) + value := make([]byte, valueSize) + errors := 0 + + b.ResetTimer() + + for i := 1; i <= b.N; i++ { + kv := kvs[rng.Intn(len(kvs))] + key := keys[rng.Intn(len(keys))] + + kve, getErr := kv.Get(key) + if getErr != nil { + errors++ + continue + } + + rng.Read(value) + _, updateErr := kv.Update(key, value, kve.Revision()) + if updateErr != nil { + errors++ + continue + } + + 
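// Illustrative sketch, not part of this benchmark: kv.Update with kve.Revision()
// is a compare-and-swap, it only succeeds if the key is still at the revision read
// by the Get above, so a concurrent writer surfaces here as an error rather than a
// lost update. A hypothetical caller that retries on conflict instead of counting
// an error could look like:
//
//	func casPut(kv nats.KeyValue, key string, value []byte, maxAttempts int) error {
//		var lastErr error
//		for i := 0; i < maxAttempts; i++ {
//			kve, err := kv.Get(key)
//			if err != nil {
//				return err
//			}
//			if _, err = kv.Update(key, value, kve.Revision()); err == nil {
//				return nil
//			}
//			lastErr = err // typically a revision conflict; re-read and retry
//		}
//		return lastErr
//	}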
b.SetBytes(int64(valueSize)) + + if verbose && i%1000 == 0 { + b.Logf("Completed %d/%d Update ops", i, b.N) + } + } + + b.StopTimer() + return errors + } + + type WorkloadType string + const ( + Get WorkloadType = "GET" + Put WorkloadType = "PUT" + Update WorkloadType = "CAS" + ) + + benchmarksCases := []struct { + clusterSize int + replicas int + numBuckets int + numKeys int + valueSize int + }{ + {1, 1, 1, 100, 100}, // 1 node, 1 bucket with 100 keys, 100B values + {1, 1, 10, 1000, 100}, // 1 node, 10 buckets with 1000 keys, 100B values + {3, 3, 1, 100, 100}, // 3 nodes, 1 bucket with 100 keys, 100B values + {3, 3, 10, 1000, 100}, // 3 nodes, 10 buckets with 1000 keys, 100B values + {3, 3, 10, 1000, 1024}, // 3 nodes, 10 buckets with 1000 keys, 1KB values + } + + workloadCases := []WorkloadType{ + Get, + Put, + Update, + } + + for _, bc := range benchmarksCases { + + bName := fmt.Sprintf( + "N=%d,R=%d,B=%d,K=%d,ValSz=%db", + bc.clusterSize, + bc.replicas, + bc.numBuckets, + bc.numKeys, + bc.valueSize, + ) + + b.Run( + bName, + func(b *testing.B) { + for _, wc := range workloadCases { + wName := fmt.Sprintf("%v", wc) + b.Run( + wName, + func(b *testing.B) { + // Skip short runs, benchmark gets re-executed with a larger N + if b.N < minOps { + b.ResetTimer() + return + } + + if verbose { + b.Logf("Running %s workload %s with %d messages", wName, bName, b.N) + } + + if verbose { + b.Logf("Setting up %d nodes", bc.clusterSize) + } + var connectURL string + if bc.clusterSize == 1 { + s := RunBasicJetStreamServer(b) + defer s.Shutdown() + connectURL = s.ClientURL() + } else { + cl := createJetStreamClusterExplicit(b, "BENCH_KV", bc.clusterSize) + defer cl.shutdown() + cl.waitOnClusterReadyWithNumPeers(bc.clusterSize) + cl.waitOnLeader() + connectURL = cl.randomServer().ClientURL() + } + + nc, js := jsClientConnectURL(b, connectURL) + defer nc.Close() + + // Pre-generate all keys + keys := make([]string, 0, bc.numKeys) + for i := 1; i <= bc.numKeys; i++ { + key := fmt.Sprintf("%s%d", keyPrefix, i) + keys = append(keys, key) + } + + // Initialize all KVs + kvs := make([]nats.KeyValue, 0, bc.numBuckets) + for i := 1; i <= bc.numBuckets; i++ { + // Create bucket + kvName := fmt.Sprintf("%s%d", kvNamePrefix, i) + if verbose { + b.Logf("Creating KV %s with R=%d", kvName, bc.replicas) + } + kvConfig := &nats.KeyValueConfig{ + Bucket: kvName, + Replicas: bc.replicas, + } + kv, err := js.CreateKeyValue(kvConfig) + if err != nil { + b.Fatalf("Error creating KV: %v", err) + } + kvs = append(kvs, kv) + + // Initialize all keys + rng := rand.New(rand.NewSource(int64(seed * i))) + value := make([]byte, bc.valueSize) + for _, key := range keys { + rng.Read(value) + _, err := kv.Create(key, value) + if err != nil { + b.Fatalf("Failed to initialize %s/%s: %v", kvName, key, err) + } + } + } + + // Discard time spent during setup + // May reset again further in + b.ResetTimer() + + var errors int + + switch wc { + case Get: + errors = runKVGet(b, kvs, keys) + case Put: + errors = runKVPut(b, kvs, keys, bc.valueSize) + case Update: + errors = runKVUpdate(b, kvs, keys, bc.valueSize) + default: + b.Fatalf("Unknown workload type: %v", wc) + } + + // Benchmark ends here, (may have stopped earlier) + b.StopTimer() + + b.ReportMetric(float64(errors)*100/float64(b.N), "%error") + }, + ) + } + }, + ) + } +} diff --git a/server/jetstream_chaos_cluster_test.go b/server/jetstream_chaos_cluster_test.go deleted file mode 100644 index 6d409d29..00000000 --- a/server/jetstream_chaos_cluster_test.go +++ /dev/null @@ -1,76 +0,0 
@@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build js_chaos_tests -// +build js_chaos_tests - -package server - -import ( - "testing" - "time" -) - -// Bounces the entire set of nodes, then brings them back up. -// Fail if some nodes don't come back online. -func TestJetStreamChaosClusterBounce(t *testing.T) { - - const duration = 60 * time.Second - const clusterSize = 3 - - c := createJetStreamClusterExplicit(t, "R3", clusterSize) - defer c.shutdown() - - chaos := createClusterChaosMonkeyController( - t, - c, - &clusterBouncerChaosMonkey{ - minDowntime: 0 * time.Second, - maxDowntime: 2 * time.Second, - minDownServers: clusterSize, - maxDownServers: clusterSize, - pause: 3 * time.Second, - }, - ) - chaos.start() - defer chaos.stop() - - <-time.After(duration) -} - -// Bounces a subset of the nodes, then brings them back up. -// Fails if some nodes don't come back online. -func TestJetStreamChaosClusterBounceSubset(t *testing.T) { - - const duration = 60 * time.Second - const clusterSize = 3 - - c := createJetStreamClusterExplicit(t, "R3", clusterSize) - defer c.shutdown() - - chaos := createClusterChaosMonkeyController( - t, - c, - &clusterBouncerChaosMonkey{ - minDowntime: 0 * time.Second, - maxDowntime: 2 * time.Second, - minDownServers: 1, - maxDownServers: clusterSize, - pause: 3 * time.Second, - }, - ) - chaos.start() - defer chaos.stop() - - <-time.After(duration) -} diff --git a/server/jetstream_chaos_consumer_test.go b/server/jetstream_chaos_consumer_test.go deleted file mode 100644 index bd21104b..00000000 --- a/server/jetstream_chaos_consumer_test.go +++ /dev/null @@ -1,615 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build js_chaos_tests -// +build js_chaos_tests - -package server - -import ( - "bytes" - "fmt" - "strings" - "sync" - "testing" - "time" - - "github.com/nats-io/nats.go" -) - -const ( - chaosConsumerTestsClusterName = "CONSUMERS_CHAOS_TEST" - chaosConsumerTestsStreamName = "CONSUMER_CHAOS_TEST_STREAM" - chaosConsumerTestsSubject = "foo" - chaosConsumerTestsDebug = false -) - -// Creates stream and fills it with the given number of messages. -// Each message is the string representation of the stream sequence number, -// e.g. the first message (seqno: 1) contains data "1". 
-// This allows consumers to verify the content of each message without tracking additional state -func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMessages int) { - t.Helper() - - const publishBatchSize = 1_000 - - pubNc, pubJs := jsClientConnectCluster(t, c) - defer pubNc.Close() - - _, err := pubJs.AddStream(&nats.StreamConfig{ - Name: chaosConsumerTestsStreamName, - Subjects: []string{chaosConsumerTestsSubject}, - Replicas: replicas, - }) - if err != nil { - t.Fatalf("Error creating stream: %v", err) - } - - ackFutures := make([]nats.PubAckFuture, 0, publishBatchSize) - - for i := 1; i <= numMessages; i++ { - message := []byte(fmt.Sprintf("%d", i)) - pubAckFuture, err := pubJs.PublishAsync(chaosConsumerTestsSubject, message, nats.ExpectLastSequence(uint64(i-1))) - if err != nil { - t.Fatalf("Publish error: %s", err) - } - ackFutures = append(ackFutures, pubAckFuture) - - if (i > 0 && i%publishBatchSize == 0) || i == numMessages { - select { - case <-pubJs.PublishAsyncComplete(): - for _, pubAckFuture := range ackFutures { - select { - case <-pubAckFuture.Ok(): - // Noop - case pubAckErr := <-pubAckFuture.Err(): - t.Fatalf("Error publishing: %s", pubAckErr) - case <-time.After(30 * time.Second): - t.Fatalf("Timeout verifying pubAck for message: %s", pubAckFuture.Msg().Data) - } - } - ackFutures = make([]nats.PubAckFuture, 0, publishBatchSize) - t.Logf("Published %d/%d messages", i, numMessages) - - case <-time.After(30 * time.Second): - t.Fatalf("Publish timed out") - } - } - } -} - -// Verify ordered delivery despite cluster-wide outages -func TestJetStreamChaosConsumerOrdered(t *testing.T) { - - const numMessages = 30_000 - const maxRetries = 100 - const retryDelay = 500 * time.Millisecond - const fetchTimeout = 250 * time.Millisecond - const clusterSize = 3 - const replicas = 3 - - c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize) - defer c.shutdown() - - createStreamForConsumerChaosTest(t, c, replicas, numMessages) - - chaos := createClusterChaosMonkeyController( - t, - c, - &clusterBouncerChaosMonkey{ - minDowntime: 0 * time.Second, - maxDowntime: 2 * time.Second, - minDownServers: clusterSize, // Whole cluster outage - maxDownServers: clusterSize, - pause: 1 * time.Second, - }, - ) - - subNc, subJs := jsClientConnectCluster(t, c) - defer subNc.Close() - - sub, err := subJs.SubscribeSync( - chaosConsumerTestsSubject, - nats.OrderedConsumer(), - ) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer sub.Unsubscribe() - - if chaosConsumerTestsDebug { - t.Logf("Initial subscription: %s", toIndentedJsonString(sub)) - } - - chaos.start() - defer chaos.stop() - - for i := 1; i <= numMessages; i++ { - var msg *nats.Msg - var nextMsgErr error - var expectedMsgData = []byte(fmt.Sprintf("%d", i)) - - nextMsgRetryLoop: - for r := 0; r <= maxRetries; r++ { - msg, nextMsgErr = sub.NextMsg(fetchTimeout) - if nextMsgErr == nil { - break nextMsgRetryLoop - } else if r == maxRetries { - t.Fatalf("Exceeded max retries for NextMsg") - } else if nextMsgErr == nats.ErrBadSubscription { - t.Fatalf("Subscription is invalid: %s", toIndentedJsonString(sub)) - } else { - time.Sleep(retryDelay) - } - } - - metadata, err := msg.Metadata() - if err != nil { - t.Fatalf("Failed to get message metadata: %v", err) - } - - if metadata.Sequence.Stream != uint64(i) { - t.Fatalf("Expecting stream sequence %d, got %d instead", i, metadata.Sequence.Stream) - } - - if !bytes.Equal(msg.Data, expectedMsgData) { - t.Fatalf("Expecting message %s, 
got %s instead", expectedMsgData, msg.Data) - } - - // Simulate application processing (and gives the monkey some time to brew chaos) - time.Sleep(10 * time.Millisecond) - - if i%1000 == 0 { - t.Logf("Consumed %d/%d", i, numMessages) - } - } -} - -// Verify ordered delivery despite cluster-wide outages -func TestJetStreamChaosConsumerAsync(t *testing.T) { - - const numMessages = 30_000 - const timeout = 30 * time.Second // No (new) messages for 30s => terminate - const maxRetries = 25 - const retryDelay = 500 * time.Millisecond - const clusterSize = 3 - const replicas = 3 - - c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize) - defer c.shutdown() - - createStreamForConsumerChaosTest(t, c, replicas, numMessages) - - chaos := createClusterChaosMonkeyController( - t, - c, - &clusterBouncerChaosMonkey{ - minDowntime: 0 * time.Second, - maxDowntime: 2 * time.Second, - minDownServers: clusterSize, - maxDownServers: clusterSize, - pause: 2 * time.Second, - }, - ) - - subNc, subJs := jsClientConnectCluster(t, c) - defer subNc.Close() - - timeoutTimer := time.NewTimer(timeout) - deliveryCount := uint64(0) - received := NewBitset(numMessages) - - handleMsg := func(msg *nats.Msg) { - deliveryCount += 1 - - metadata, err := msg.Metadata() - if err != nil { - t.Fatalf("Failed to get message metadata: %v", err) - } - seq := metadata.Sequence.Stream - - var expectedMsgData = []byte(fmt.Sprintf("%d", seq)) - if !bytes.Equal(msg.Data, expectedMsgData) { - t.Fatalf("Expecting message content '%s', got '%s' instead", expectedMsgData, msg.Data) - } - - isDupe := received.get(seq - 1) - - if isDupe { - if chaosConsumerTestsDebug { - t.Logf("Duplicate message delivery, seq: %d", seq) - } - return - } - - // Mark this sequence as received - received.set(seq-1, true) - if received.count() < numMessages { - // Reset timeout - timeoutTimer.Reset(timeout) - } else { - // All received, speed up the shutdown - timeoutTimer.Reset(1 * time.Second) - } - - if received.count()%1000 == 0 { - t.Logf("Consumed %d/%d", received.count(), numMessages) - } - - // Simulate application processing (and gives the monkey some time to brew chaos) - time.Sleep(10 * time.Millisecond) - - ackRetryLoop: - for i := 0; i <= maxRetries; i++ { - ackErr := msg.Ack() - if ackErr == nil { - break ackRetryLoop - } else if i == maxRetries { - t.Fatalf("Failed to ACK message %d (retried %d times)", seq, maxRetries) - } else { - time.Sleep(retryDelay) - } - } - } - - subOpts := []nats.SubOpt{} - sub, err := subJs.Subscribe(chaosConsumerTestsSubject, handleMsg, subOpts...) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer sub.Unsubscribe() - - chaos.start() - defer chaos.stop() - - // Wait for long enough silence. 
- // Either a stall, or all messages received - <-timeoutTimer.C - - // Shut down consumer - sub.Unsubscribe() - - uniqueDeliveredCount := received.count() - - t.Logf( - "Delivered %d/%d messages %d duplicate deliveries", - uniqueDeliveredCount, - numMessages, - deliveryCount-uniqueDeliveredCount, - ) - - if uniqueDeliveredCount != numMessages { - t.Fatalf("No new message delivered in the last %s, %d/%d messages never delivered", timeout, numMessages-uniqueDeliveredCount, numMessages) - } -} - -// Verify durable consumer retains state despite cluster-wide outages -// The consumer connection is also periodically closed, and the consumer 'resumes' on a different one -func TestJetStreamChaosConsumerDurable(t *testing.T) { - - const numMessages = 30_000 - const timeout = 30 * time.Second // No (new) messages for 60s => terminate - const clusterSize = 3 - const replicas = 3 - const maxRetries = 25 - const retryDelay = 500 * time.Millisecond - const durableConsumerName = "durable" - - c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize) - defer c.shutdown() - - createStreamForConsumerChaosTest(t, c, replicas, numMessages) - - chaos := createClusterChaosMonkeyController( - t, - c, - &clusterBouncerChaosMonkey{ - minDowntime: 0 * time.Second, - maxDowntime: 2 * time.Second, - minDownServers: 1, - maxDownServers: clusterSize, - pause: 3 * time.Second, - }, - ) - - var nc *nats.Conn - var sub *nats.Subscription - var subLock sync.Mutex - - var handleMsgFun func(msg *nats.Msg) - var natsURL string - - { - var sb strings.Builder - for _, s := range c.servers { - sb.WriteString(s.ClientURL()) - sb.WriteString(",") - } - natsURL = sb.String() - } - - resetDurableConsumer := func() { - subLock.Lock() - defer subLock.Unlock() - - if nc != nil { - nc.Close() - } - - var newNc *nats.Conn - connectRetryLoop: - for r := 0; r <= maxRetries; r++ { - var connErr error - newNc, connErr = nats.Connect(natsURL) - if connErr == nil { - break connectRetryLoop - } else if r == maxRetries { - t.Fatalf("Failed to connect, exceeded max retries, last error: %s", connErr) - } else { - time.Sleep(retryDelay) - } - } - - var newJs nats.JetStreamContext - jsRetryLoop: - for r := 0; r <= maxRetries; r++ { - var jsErr error - newJs, jsErr = newNc.JetStream(nats.MaxWait(10 * time.Second)) - if jsErr == nil { - break jsRetryLoop - } else if r == maxRetries { - t.Fatalf("Failed to get JS, exceeded max retries, last error: %s", jsErr) - } else { - time.Sleep(retryDelay) - } - } - - subOpts := []nats.SubOpt{ - nats.Durable(durableConsumerName), - } - - var newSub *nats.Subscription - subscribeRetryLoop: - for i := 0; i <= maxRetries; i++ { - var subErr error - newSub, subErr = newJs.Subscribe(chaosConsumerTestsSubject, handleMsgFun, subOpts...) 
- if subErr == nil { - ci, err := newJs.ConsumerInfo(chaosConsumerTestsStreamName, durableConsumerName) - if err == nil { - if chaosConsumerTestsDebug { - t.Logf("Consumer info:\n %s", toIndentedJsonString(ci)) - } - } else { - t.Logf("Failed to retrieve consumer info: %s", err) - } - - break subscribeRetryLoop - } else if i == maxRetries { - t.Fatalf("Exceeded max retries creating subscription: %v", subErr) - } else { - time.Sleep(retryDelay) - } - } - - nc, sub = newNc, newSub - } - - timeoutTimer := time.NewTimer(timeout) - deliveryCount := uint64(0) - received := NewBitset(numMessages) - - handleMsgFun = func(msg *nats.Msg) { - - subLock.Lock() - if msg.Sub != sub { - // Message from a previous instance of durable consumer, drop - defer subLock.Unlock() - return - } - subLock.Unlock() - - deliveryCount += 1 - - metadata, err := msg.Metadata() - if err != nil { - t.Fatalf("Failed to get message metadata: %v", err) - } - seq := metadata.Sequence.Stream - - var expectedMsgData = []byte(fmt.Sprintf("%d", seq)) - if !bytes.Equal(msg.Data, expectedMsgData) { - t.Fatalf("Expecting message content '%s', got '%s' instead", expectedMsgData, msg.Data) - } - - isDupe := received.get(seq - 1) - - if isDupe { - if chaosConsumerTestsDebug { - t.Logf("Duplicate message delivery, seq: %d", seq) - } - return - } - - // Mark this sequence as received - received.set(seq-1, true) - if received.count() < numMessages { - // Reset timeout - timeoutTimer.Reset(timeout) - } else { - // All received, speed up the shutdown - timeoutTimer.Reset(1 * time.Second) - } - - // Simulate application processing (and gives the monkey some time to brew chaos) - time.Sleep(10 * time.Millisecond) - - ackRetryLoop: - for i := 0; i <= maxRetries; i++ { - ackErr := msg.Ack() - if ackErr == nil { - break ackRetryLoop - } else if i == maxRetries { - t.Fatalf("Failed to ACK message %d (retried %d times)", seq, maxRetries) - } else { - time.Sleep(retryDelay) - } - } - - if received.count()%1000 == 0 { - t.Logf("Consumed %d/%d, duplicate deliveries: %d", received.count(), numMessages, deliveryCount-received.count()) - // Close connection and resume consuming on a different one - resetDurableConsumer() - } - } - - resetDurableConsumer() - - chaos.start() - defer chaos.stop() - - // Wait for long enough silence. 
- // Either a stall, or all messages received - <-timeoutTimer.C - - // Shut down consumer - if sub != nil { - sub.Unsubscribe() - } - - uniqueDeliveredCount := received.count() - - t.Logf( - "Delivered %d/%d messages %d duplicate deliveries", - uniqueDeliveredCount, - numMessages, - deliveryCount-uniqueDeliveredCount, - ) - - if uniqueDeliveredCount != numMessages { - t.Fatalf("No new message delivered in the last %s, %d/%d messages never delivered", timeout, numMessages-uniqueDeliveredCount, numMessages) - } -} - -func TestJetStreamChaosConsumerPull(t *testing.T) { - - const numMessages = 10_000 - const maxRetries = 100 - const retryDelay = 500 * time.Millisecond - const fetchTimeout = 250 * time.Millisecond - const fetchBatchSize = 100 - const clusterSize = 3 - const replicas = 3 - const durableConsumerName = "durable" - - c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize) - defer c.shutdown() - - createStreamForConsumerChaosTest(t, c, replicas, numMessages) - - chaos := createClusterChaosMonkeyController( - t, - c, - &clusterBouncerChaosMonkey{ - minDowntime: 0 * time.Second, - maxDowntime: 2 * time.Second, - minDownServers: clusterSize, // Whole cluster outage - maxDownServers: clusterSize, - pause: 1 * time.Second, - }, - ) - - subNc, subJs := jsClientConnectCluster(t, c) - defer subNc.Close() - - sub, err := subJs.PullSubscribe( - chaosConsumerTestsSubject, - durableConsumerName, - ) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer sub.Unsubscribe() - - if chaosConsumerTestsDebug { - t.Logf("Initial subscription: %s", toIndentedJsonString(sub)) - } - - chaos.start() - defer chaos.stop() - - fetchMaxWait := nats.MaxWait(fetchTimeout) - received := NewBitset(numMessages) - deliveredCount := uint64(0) - - for received.count() < numMessages { - - var msgs []*nats.Msg - var fetchErr error - - fetchRetryLoop: - for r := 0; r <= maxRetries; r++ { - msgs, fetchErr = sub.Fetch(fetchBatchSize, fetchMaxWait) - if fetchErr == nil { - break fetchRetryLoop - } else if r == maxRetries { - t.Fatalf("Exceeded max retries for Fetch, last error: %s", fetchErr) - } else if fetchErr == nats.ErrBadSubscription { - t.Fatalf("Subscription is invalid: %s", toIndentedJsonString(sub)) - } else { - // t.Logf("Fetch error: %v", fetchErr) - time.Sleep(retryDelay) - } - } - - for _, msg := range msgs { - - deliveredCount += 1 - - metadata, err := msg.Metadata() - if err != nil { - t.Fatalf("Failed to get message metadata: %v", err) - } - - streamSeq := metadata.Sequence.Stream - - expectedMsgData := []byte(fmt.Sprintf("%d", streamSeq)) - - if !bytes.Equal(msg.Data, expectedMsgData) { - t.Fatalf("Expecting message %s, got %s instead", expectedMsgData, msg.Data) - } - - isDupe := received.get(streamSeq - 1) - - received.set(streamSeq-1, true) - - // Simulate application processing (and gives the monkey some time to brew chaos) - time.Sleep(10 * time.Millisecond) - - ackRetryLoop: - for r := 0; r <= maxRetries; r++ { - ackErr := msg.Ack() - if ackErr == nil { - break ackRetryLoop - } else if r == maxRetries { - t.Fatalf("Failed to ACK message %d, last error: %s", streamSeq, ackErr) - } else { - time.Sleep(retryDelay) - } - } - - if !isDupe && received.count()%1000 == 0 { - t.Logf("Consumed %d/%d (duplicates: %d)", received.count(), numMessages, deliveredCount-received.count()) - } - } - } -} diff --git a/server/jetstream_chaos_helpers_test.go b/server/jetstream_chaos_helpers_test.go deleted file mode 100644 index 5b3f9001..00000000 --- 
a/server/jetstream_chaos_helpers_test.go +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build js_chaos_tests -// +build js_chaos_tests - -package server - -import ( - "fmt" - "math/rand" - "sync" - "testing" - "time" -) - -// Additional cluster helpers - -func (c *cluster) waitOnClusterHealthz() { - c.t.Helper() - for _, cs := range c.servers { - c.waitOnServerHealthz(cs) - } -} - -func (c *cluster) stopSubset(toStop []*Server) { - c.t.Helper() - for _, s := range toStop { - s.Shutdown() - } -} - -func (c *cluster) selectRandomServers(numServers int) []*Server { - c.t.Helper() - if numServers > len(c.servers) { - panic(fmt.Sprintf("Can't select %d servers in a cluster of %d", numServers, len(c.servers))) - } - var selectedServers []*Server - selectedServers = append(selectedServers, c.servers...) - rand.Shuffle(len(selectedServers), func(x, y int) { - selectedServers[x], selectedServers[y] = selectedServers[y], selectedServers[x] - }) - return selectedServers[0:numServers] -} - -// Support functions for "chaos" testing (random injected failures) - -type ChaosMonkeyController interface { - // Launch the monkey as background routine and return - start() - // Stop a monkey that was previously started - stop() - // Run the monkey synchronously, until it is manually stopped via stopCh - run() -} - -type ClusterChaosMonkey interface { - // Set defaults and validates the monkey parameters - validate(t *testing.T, c *cluster) - // Run the monkey synchronously, until it is manually stopped via stopCh - run(t *testing.T, c *cluster, stopCh <-chan bool) -} - -// Chaos Monkey Controller that acts on a cluster -type clusterChaosMonkeyController struct { - t *testing.T - cluster *cluster - wg sync.WaitGroup - stopCh chan bool - ccm ClusterChaosMonkey -} - -func createClusterChaosMonkeyController(t *testing.T, c *cluster, ccm ClusterChaosMonkey) ChaosMonkeyController { - ccm.validate(t, c) - return &clusterChaosMonkeyController{ - t: t, - cluster: c, - stopCh: make(chan bool, 3), - ccm: ccm, - } -} - -func (m *clusterChaosMonkeyController) start() { - m.t.Logf("🐵 Starting monkey") - m.wg.Add(1) - go func() { - defer m.wg.Done() - m.run() - }() -} - -func (m *clusterChaosMonkeyController) stop() { - m.t.Logf("🐵 Stopping monkey") - m.stopCh <- true - m.wg.Wait() - m.t.Logf("🐵 Monkey stopped") -} - -func (m *clusterChaosMonkeyController) run() { - m.ccm.run(m.t, m.cluster, m.stopCh) -} - -// Cluster Chaos Monkey that selects a random subset of the nodes in a cluster (according to min/max provided), -// shuts them down for a given duration (according to min/max provided), then brings them back up. -// Then sleeps for a given time, and does it again until stopped. 
-type clusterBouncerChaosMonkey struct { - minDowntime time.Duration - maxDowntime time.Duration - minDownServers int - maxDownServers int - pause time.Duration -} - -func (m *clusterBouncerChaosMonkey) validate(t *testing.T, c *cluster) { - if m.minDowntime > m.maxDowntime { - t.Fatalf("Min downtime %v cannot be larger than max downtime %v", m.minDowntime, m.maxDowntime) - } - - if m.minDownServers > m.maxDownServers { - t.Fatalf("Min down servers %v cannot be larger than max down servers %v", m.minDownServers, m.maxDownServers) - } -} - -func (m *clusterBouncerChaosMonkey) run(t *testing.T, c *cluster, stopCh <-chan bool) { - for { - // Pause between actions - select { - case <-stopCh: - return - case <-time.After(m.pause): - } - - // Pick a random subset of servers - numServersDown := rand.Intn(1+m.maxDownServers-m.minDownServers) + m.minDownServers - servers := c.selectRandomServers(numServersDown) - serverNames := []string{} - for _, s := range servers { - serverNames = append(serverNames, s.info.Name) - } - - // Pick a random outage interval - minOutageNanos := m.minDowntime.Nanoseconds() - maxOutageNanos := m.maxDowntime.Nanoseconds() - outageDurationNanos := rand.Int63n(1+maxOutageNanos-minOutageNanos) + minOutageNanos - outageDuration := time.Duration(outageDurationNanos) - - // Take down selected servers - t.Logf("🐵 Taking down %d/%d servers for %v (%v)", numServersDown, len(c.servers), outageDuration, serverNames) - c.stopSubset(servers) - - // Wait for the "outage" duration - select { - case <-stopCh: - return - case <-time.After(outageDuration): - } - - // Restart servers and wait for cluster to be healthy - t.Logf("🐵 Restoring cluster") - c.restartAllSamePorts() - c.waitOnClusterHealthz() - - c.waitOnClusterReady() - c.waitOnAllCurrent() - c.waitOnLeader() - } -} diff --git a/server/jetstream_chaos_kv_test.go b/server/jetstream_chaos_kv_test.go deleted file mode 100644 index 6c177a9a..00000000 --- a/server/jetstream_chaos_kv_test.go +++ /dev/null @@ -1,456 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build js_chaos_tests -// +build js_chaos_tests - -package server - -import ( - "fmt" - "math/rand" - "strings" - "sync" - "testing" - "time" - - "github.com/nats-io/nats.go" -) - -const ( - chaosKvTestsClusterName = "KV_CHAOS_TEST" - chaosKvTestsBucketName = "KV_CHAOS_TEST_BUCKET" - chaosKvTestsSubject = "foo" - chaosKvTestsDebug = false -) - -// Creates KV store (a.k.a. bucket). 
-func createBucketForKvChaosTest(t *testing.T, c *cluster, replicas int) { - t.Helper() - - pubNc, pubJs := jsClientConnectCluster(t, c) - defer pubNc.Close() - - config := nats.KeyValueConfig{ - Bucket: chaosKvTestsBucketName, - Replicas: replicas, - Description: "Test bucket", - } - - kvs, err := pubJs.CreateKeyValue(&config) - if err != nil { - t.Fatalf("Error creating bucket: %v", err) - } - - status, err := kvs.Status() - if err != nil { - t.Fatalf("Error retrieving bucket status: %v", err) - } - t.Logf("Bucket created: %s", status.Bucket()) -} - -// Single client performs a set of PUT on a single key. -// If PUT is successful, perform a GET on the same key. -// If GET is successful, ensure key revision and value match the most recent successful write. -func TestJetStreamChaosKvPutGet(t *testing.T) { - - const numOps = 100_000 - const clusterSize = 3 - const replicas = 3 - const key = "key" - const staleReadsOk = true // Set to false to check for violations of 'read committed' consistency - - c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize) - defer c.shutdown() - - createBucketForKvChaosTest(t, c, replicas) - - chaos := createClusterChaosMonkeyController( - t, - c, - &clusterBouncerChaosMonkey{ - minDowntime: 0 * time.Second, - maxDowntime: 2 * time.Second, - minDownServers: clusterSize, // Whole cluster outage - maxDownServers: clusterSize, - pause: 1 * time.Second, - }, - ) - - nc, js := jsClientConnectCluster(t, c) - defer nc.Close() - - // Create KV bucket - kv, err := js.KeyValue(chaosKvTestsBucketName) - if err != nil { - t.Fatalf("Failed to get KV store: %v", err) - } - - // Initialize the only key - firstRevision, err := kv.Create(key, []byte("INITIAL VALUE")) - if err != nil { - t.Fatalf("Failed to create key: %v", err) - } else if firstRevision != 1 { - t.Fatalf("Unexpected revision: %d", firstRevision) - } - - // Start chaos - chaos.start() - defer chaos.stop() - - staleReadsCount := uint64(0) - successCount := uint64(0) - - previousRevision := firstRevision - -putGetLoop: - for i := 1; i <= numOps; i++ { - - if i%1000 == 0 { - t.Logf("Completed %d/%d PUT+GET operations", i, numOps) - } - - // PUT a value - putValue := fmt.Sprintf("value-%d", i) - putRevision, err := kv.Put(key, []byte(putValue)) - if err != nil { - t.Logf("PUT error: %v", err) - continue putGetLoop - } - - // Check revision is monotonically increasing - if putRevision <= previousRevision { - t.Fatalf("PUT produced revision %d which is not greater than the previous successful PUT revision: %d", putRevision, previousRevision) - } - - previousRevision = putRevision - - // If PUT was successful, GET the same - kve, err := kv.Get(key) - if err == nats.ErrKeyNotFound { - t.Fatalf("GET key not found, but key does exists (last PUT revision: %d)", putRevision) - } else if err != nil { - t.Logf("GET error: %v", err) - continue putGetLoop - } - - getValue := string(kve.Value()) - getRevision := kve.Revision() - - if putRevision > getRevision { - // Stale read, violates 'read committed' consistency criteria - if !staleReadsOk { - t.Fatalf("PUT value %s (rev: %d) then read value %s (rev: %d)", putValue, putRevision, getValue, getRevision) - } else { - staleReadsCount += 1 - } - } else if putRevision < getRevision { - // Returned revision is higher than any ever written, this should never happen - t.Fatalf("GET returned revision %d, but most recent expected revision is %d", getRevision, putRevision) - } else if putValue != getValue { - // Returned revision matches latest, but values do not, 
this should never happen - t.Fatalf("GET returned revision %d with value %s, but value %s was just committed for that same revision", getRevision, getValue, putValue) - } else { - // Get returned the latest revision/value - successCount += 1 - if chaosKvTestsDebug { - t.Logf("PUT+GET %s=%s (rev: %d)", key, putValue, putRevision) - } - } - } - - t.Logf("Completed %d PUT+GET cycles of which %d successful, %d GETs returned a stale value", numOps, successCount, staleReadsCount) -} - -// A variant TestJetStreamChaosKvPutGet where PUT is retried until successful, and GET is retried until it returns the latest known key revision. -// This validates than a confirmed PUT value is never lost, and becomes eventually visible. -func TestJetStreamChaosKvPutGetWithRetries(t *testing.T) { - - const numOps = 10_000 - const maxRetries = 20 - const retryDelay = 100 * time.Millisecond - const clusterSize = 3 - const replicas = 3 - const key = "key" - - c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize) - defer c.shutdown() - - createBucketForKvChaosTest(t, c, replicas) - - chaos := createClusterChaosMonkeyController( - t, - c, - &clusterBouncerChaosMonkey{ - minDowntime: 0 * time.Second, - maxDowntime: 2 * time.Second, - minDownServers: clusterSize, // Whole cluster outage - maxDownServers: clusterSize, - pause: 1 * time.Second, - }, - ) - - nc, js := jsClientConnectCluster(t, c) - defer nc.Close() - - kv, err := js.KeyValue(chaosKvTestsBucketName) - if err != nil { - t.Fatalf("Failed to get KV store: %v", err) - } - - // Initialize key value - firstRevision, err := kv.Create(key, []byte("INITIAL VALUE")) - if err != nil { - t.Fatalf("Failed to create key: %v", err) - } else if firstRevision != 1 { - t.Fatalf("Unexpected revision: %d", firstRevision) - } - - // Start chaos - chaos.start() - defer chaos.stop() - - staleReadCount := 0 - previousRevision := firstRevision - -putGetLoop: - for i := 1; i <= numOps; i++ { - - if i%1000 == 0 { - t.Logf("Completed %d/%d PUT+GET operations", i, numOps) - } - - putValue := fmt.Sprintf("value-%d", i) - putRevision := uint64(0) - - // Put new value for key, retry until successful or out of retries - putRetryLoop: - for r := 0; r <= maxRetries; r++ { - var putErr error - putRevision, putErr = kv.Put(key, []byte(putValue)) - if putErr == nil { - break putRetryLoop - } else if r == maxRetries { - t.Fatalf("Failed to PUT (retried %d times): %v", maxRetries, putErr) - } else { - if chaosKvTestsDebug { - t.Logf("PUT error: %v", putErr) - } - time.Sleep(retryDelay) - } - } - - // Ensure key version is monotonically increasing - if putRevision <= previousRevision { - t.Fatalf("Latest PUT created revision %d which is not greater than the previous revision: %d", putRevision, previousRevision) - } - previousRevision = putRevision - - // Read value for key, retry until successful, and validate corresponding version and value - getRetryLoop: - for r := 0; r <= maxRetries; r++ { - var getErr error - kve, getErr := kv.Get(key) - if getErr != nil && r == maxRetries { - t.Fatalf("Failed to GET (retried %d times): %v", maxRetries, getErr) - } else if getErr != nil { - if chaosKvTestsDebug { - t.Logf("GET error: %v", getErr) - } - time.Sleep(retryDelay) - continue getRetryLoop - } - - // GET successful, check revision and value - getValue := string(kve.Value()) - getRevision := kve.Revision() - - if putRevision == getRevision { - if putValue != getValue { - t.Fatalf("Unexpected value %s for revision %d, expected: %s", getValue, getRevision, putValue) - } - if 
chaosKvTestsDebug { - t.Logf("PUT+GET %s=%s (rev: %d) (retry: %d)", key, putValue, putRevision, r) - } - continue putGetLoop - } else if getRevision > putRevision { - t.Fatalf("GET returned version that should not exist yet: %d, last created: %d", getRevision, putRevision) - } else { // get revision < put revision - staleReadCount += 1 - if chaosKvTestsDebug { - t.Logf("GET got stale value: %v (rev: %d, latest: %d)", getValue, getRevision, putRevision) - } - time.Sleep(retryDelay) - continue getRetryLoop - } - } - } - - t.Logf("Client completed %d PUT+GET cycles, %d GET returned a stale value", numOps, staleReadCount) -} - -// Multiple clients updating a finite set of keys with CAS semantics. -// TODO check that revision is never lower than last one seen -// TODO check that KeyNotFound is never returned, as keys are initialized beforehand -func TestJetStreamChaosKvCAS(t *testing.T) { - const numOps = 10_000 - const maxRetries = 50 - const retryDelay = 300 * time.Millisecond - const clusterSize = 3 - const replicas = 3 - const numKeys = 15 - const numClients = 5 - - c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize) - defer c.shutdown() - - createBucketForKvChaosTest(t, c, replicas) - - chaos := createClusterChaosMonkeyController( - t, - c, - &clusterBouncerChaosMonkey{ - minDowntime: 0 * time.Second, - maxDowntime: 2 * time.Second, - minDownServers: clusterSize, // Whole cluster outage - maxDownServers: clusterSize, - pause: 1 * time.Second, - }, - ) - - nc, js := jsClientConnectCluster(t, c) - defer nc.Close() - - // Create bucket - kv, err := js.KeyValue(chaosKvTestsBucketName) - if err != nil { - t.Fatalf("Failed to get KV store: %v", err) - } - - // Create set of keys and initialize them with dummy value - keys := make([]string, numKeys) - for k := 0; k < numKeys; k++ { - key := fmt.Sprintf("key-%d", k) - keys[k] = key - - _, err := kv.Create(key, []byte("Initial value")) - if err != nil { - t.Fatalf("Failed to create key: %v", err) - } - } - - wgStart := sync.WaitGroup{} - wgComplete := sync.WaitGroup{} - - // Client routine - client := func(clientId int, kv nats.KeyValue) { - defer wgComplete.Done() - - rng := rand.New(rand.NewSource(int64(clientId))) - successfulUpdates := 0 - casRejectUpdates := 0 - otherUpdateErrors := 0 - - // Map to track last known revision for each of the keys - knownRevisions := map[string]uint64{} - for _, key := range keys { - knownRevisions[key] = 0 - } - - // Wait for all clients to reach this point before proceeding - wgStart.Done() - wgStart.Wait() - - for i := 1; i <= numOps; i++ { - - if i%1000 == 0 { - t.Logf("Client %d completed %d/%d updates", clientId, i, numOps) - } - - // Pick random key from the set - key := keys[rng.Intn(numKeys)] - - // Prepare unique value to be written - value := fmt.Sprintf("client: %d operation %d", clientId, i) - - // Try to update a key with CAS - newRevision, updateErr := kv.Update(key, []byte(value), knownRevisions[key]) - if updateErr == nil { - // Update successful - knownRevisions[key] = newRevision - successfulUpdates += 1 - if chaosKvTestsDebug { - t.Logf("Client %d updated key %s, new revision: %d", clientId, key, newRevision) - } - } else if updateErr != nil && strings.Contains(fmt.Sprint(updateErr), "wrong last sequence") { - // CAS rejected update, learn current revision for this key - casRejectUpdates += 1 - - for r := 0; r <= maxRetries; r++ { - kve, getErr := kv.Get(key) - if getErr == nil { - currentRevision := kve.Revision() - if currentRevision < knownRevisions[key] { - // 
Revision number moved backward, this should never happen - t.Fatalf("Current revision for key %s is %d, which is lower than the last known revision %d", key, currentRevision, knownRevisions[key]) - - } - - knownRevisions[key] = currentRevision - if chaosKvTestsDebug { - t.Logf("Client %d learn key %s revision: %d", clientId, key, currentRevision) - } - break - } else if r == maxRetries { - t.Fatalf("Failed to GET (retried %d times): %v", maxRetries, getErr) - } else { - time.Sleep(retryDelay) - } - } - } else { - // Other update error - otherUpdateErrors += 1 - if chaosKvTestsDebug { - t.Logf("Client %d update error for key %s: %v", clientId, key, updateErr) - } - time.Sleep(retryDelay) - } - } - t.Logf("Client %d done, %d kv updates, %d CAS rejected, %d other errors", clientId, successfulUpdates, casRejectUpdates, otherUpdateErrors) - } - - // Launch all clients - for i := 1; i <= numClients; i++ { - cNc, cJs := jsClientConnectCluster(t, c) - defer cNc.Close() - - cKv, err := cJs.KeyValue(chaosKvTestsBucketName) - if err != nil { - t.Fatalf("Failed to get KV store: %v", err) - } - - wgStart.Add(1) - wgComplete.Add(1) - go client(i, cKv) - } - - // Wait for clients to be connected and ready - wgStart.Wait() - - // Start failures - chaos.start() - defer chaos.stop() - - // Wait for all clients to be done - wgComplete.Wait() -} diff --git a/server/jetstream_chaos_test.go b/server/jetstream_chaos_test.go new file mode 100644 index 00000000..2ef91f6a --- /dev/null +++ b/server/jetstream_chaos_test.go @@ -0,0 +1,1281 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build js_chaos_tests +// +build js_chaos_tests + +package server + +import ( + "bytes" + "encoding/json" + "fmt" + "math/rand" + "strings" + "sync" + "testing" + "time" + + "github.com/nats-io/nats.go" +) + +// Support functions for "chaos" testing (random injected failures) + +type ChaosMonkeyController interface { + // Launch the monkey as background routine and return + start() + // Stop a monkey that was previously started + stop() + // Run the monkey synchronously, until it is manually stopped via stopCh + run() +} + +type ClusterChaosMonkey interface { + // Set defaults and validates the monkey parameters + validate(t *testing.T, c *cluster) + // Run the monkey synchronously, until it is manually stopped via stopCh + run(t *testing.T, c *cluster, stopCh <-chan bool) +} + +// Chaos Monkey Controller that acts on a cluster +type clusterChaosMonkeyController struct { + t *testing.T + cluster *cluster + wg sync.WaitGroup + stopCh chan bool + ccm ClusterChaosMonkey +} + +func createClusterChaosMonkeyController(t *testing.T, c *cluster, ccm ClusterChaosMonkey) ChaosMonkeyController { + ccm.validate(t, c) + return &clusterChaosMonkeyController{ + t: t, + cluster: c, + stopCh: make(chan bool, 3), + ccm: ccm, + } +} + +func (m *clusterChaosMonkeyController) start() { + m.t.Logf("🐵 Starting monkey") + m.wg.Add(1) + go func() { + defer m.wg.Done() + m.run() + }() +} + +func (m *clusterChaosMonkeyController) stop() { + m.t.Logf("🐵 Stopping monkey") + m.stopCh <- true + m.wg.Wait() + m.t.Logf("🐵 Monkey stopped") +} + +func (m *clusterChaosMonkeyController) run() { + m.ccm.run(m.t, m.cluster, m.stopCh) +} + +// Cluster Chaos Monkey that selects a random subset of the nodes in a cluster (according to min/max provided), +// shuts them down for a given duration (according to min/max provided), then brings them back up. +// Then sleeps for a given time, and does it again until stopped. 
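The clusterBouncerChaosMonkey described in the comment above is defined just below. As a minimal, hypothetical sketch of the ClusterChaosMonkey plug-in surface introduced here (not part of this change), a monkey that bounces one random server at a fixed interval could look roughly like this, assuming only the interfaces and the cluster helpers defined in this file:

// Hypothetical example: a ClusterChaosMonkey that bounces a single random server.
type singleServerRestartChaosMonkey struct {
	pause time.Duration // time to wait between restarts
}

func (m *singleServerRestartChaosMonkey) validate(t *testing.T, c *cluster) {
	if len(c.servers) < 2 {
		t.Fatalf("Need at least 2 servers in the cluster, got %d", len(c.servers))
	}
}

func (m *singleServerRestartChaosMonkey) run(t *testing.T, c *cluster, stopCh <-chan bool) {
	for {
		// Wait for the next restart, or exit when the controller signals stop.
		select {
		case <-stopCh:
			return
		case <-time.After(m.pause):
		}
		// Stop one randomly selected server, then restore the cluster and wait for health.
		target := c.selectRandomServers(1)
		t.Logf("🐵 Bouncing server %s", target[0].info.Name)
		c.stopSubset(target)
		c.restartAllSamePorts()
		c.waitOnClusterHealthz()
	}
}

Such a monkey would be wired in the same way as the bouncer used by the tests in this file: chaos := createClusterChaosMonkeyController(t, c, &singleServerRestartChaosMonkey{pause: 5 * time.Second}), followed by chaos.start() and a deferred chaos.stop().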
+type clusterBouncerChaosMonkey struct { + minDowntime time.Duration + maxDowntime time.Duration + minDownServers int + maxDownServers int + pause time.Duration +} + +func (m *clusterBouncerChaosMonkey) validate(t *testing.T, c *cluster) { + if m.minDowntime > m.maxDowntime { + t.Fatalf("Min downtime %v cannot be larger than max downtime %v", m.minDowntime, m.maxDowntime) + } + + if m.minDownServers > m.maxDownServers { + t.Fatalf("Min down servers %v cannot be larger than max down servers %v", m.minDownServers, m.maxDownServers) + } +} + +func (m *clusterBouncerChaosMonkey) run(t *testing.T, c *cluster, stopCh <-chan bool) { + for { + // Pause between actions + select { + case <-stopCh: + return + case <-time.After(m.pause): + } + + // Pick a random subset of servers + numServersDown := rand.Intn(1+m.maxDownServers-m.minDownServers) + m.minDownServers + servers := c.selectRandomServers(numServersDown) + serverNames := []string{} + for _, s := range servers { + serverNames = append(serverNames, s.info.Name) + } + + // Pick a random outage interval + minOutageNanos := m.minDowntime.Nanoseconds() + maxOutageNanos := m.maxDowntime.Nanoseconds() + outageDurationNanos := rand.Int63n(1+maxOutageNanos-minOutageNanos) + minOutageNanos + outageDuration := time.Duration(outageDurationNanos) + + // Take down selected servers + t.Logf("🐵 Taking down %d/%d servers for %v (%v)", numServersDown, len(c.servers), outageDuration, serverNames) + c.stopSubset(servers) + + // Wait for the "outage" duration + select { + case <-stopCh: + return + case <-time.After(outageDuration): + } + + // Restart servers and wait for cluster to be healthy + t.Logf("🐵 Restoring cluster") + c.restartAllSamePorts() + c.waitOnClusterHealthz() + + c.waitOnClusterReady() + c.waitOnAllCurrent() + c.waitOnLeader() + } +} + +// Additional cluster methods for chaos testing + +func (c *cluster) waitOnClusterHealthz() { + c.t.Helper() + for _, cs := range c.servers { + c.waitOnServerHealthz(cs) + } +} + +func (c *cluster) stopSubset(toStop []*Server) { + c.t.Helper() + for _, s := range toStop { + s.Shutdown() + } +} + +func (c *cluster) selectRandomServers(numServers int) []*Server { + c.t.Helper() + if numServers > len(c.servers) { + panic(fmt.Sprintf("Can't select %d servers in a cluster of %d", numServers, len(c.servers))) + } + var selectedServers []*Server + selectedServers = append(selectedServers, c.servers...) + rand.Shuffle(len(selectedServers), func(x, y int) { + selectedServers[x], selectedServers[y] = selectedServers[y], selectedServers[x] + }) + return selectedServers[0:numServers] +} + +// Other helpers + +func jsClientConnectCluster(t testing.TB, c *cluster) (*nats.Conn, nats.JetStreamContext) { + serverConnectURLs := make([]string, len(c.servers)) + for i, server := range c.servers { + serverConnectURLs[i] = server.ClientURL() + } + connectURL := strings.Join(serverConnectURLs, ",") + + nc, err := nats.Connect(connectURL) + if err != nil { + t.Fatalf("Failed to connect: %s", err) + } + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Failed to init JetStream context: %s", err) + } + + return nc, js +} + +func toIndentedJsonString(v any) string { + s, err := json.MarshalIndent(v, "", " ") + if err != nil { + panic(err) + } + return string(s) +} + +// Bounces the entire set of nodes, then brings them back up. +// Fail if some nodes don't come back online. 
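Before the tests themselves (the whole-cluster bounce test announced by the comment above comes first), a note on a pattern the consumer tests below share: each consumer tracks which stream sequences it has seen with the package's Bitset helper, using seq-1 as the bit index, so duplicate redeliveries can be counted separately from unique deliveries. A minimal, hypothetical illustration (not part of this change), assuming NewBitset, get, set and count behave as they are used in this file:

// Hypothetical illustration of the per-sequence dedup bookkeeping used by the chaos consumer tests.
func exampleDedupTracking(t *testing.T) {
	const numMessages = 3
	received := NewBitset(numMessages)
	for _, seq := range []uint64{1, 2, 2, 3} { // seq 2 is redelivered once
		if received.get(seq - 1) {
			t.Logf("Duplicate message delivery, seq: %d", seq)
			continue
		}
		received.set(seq-1, true)
	}
	if received.count() != numMessages {
		t.Fatalf("Expected %d unique sequences, got %d", numMessages, received.count())
	}
}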
+func TestJetStreamChaosClusterBounce(t *testing.T) { + + const duration = 60 * time.Second + const clusterSize = 3 + + c := createJetStreamClusterExplicit(t, "R3", clusterSize) + defer c.shutdown() + + chaos := createClusterChaosMonkeyController( + t, + c, + &clusterBouncerChaosMonkey{ + minDowntime: 0 * time.Second, + maxDowntime: 2 * time.Second, + minDownServers: clusterSize, + maxDownServers: clusterSize, + pause: 3 * time.Second, + }, + ) + chaos.start() + defer chaos.stop() + + <-time.After(duration) +} + +// Bounces a subset of the nodes, then brings them back up. +// Fails if some nodes don't come back online. +func TestJetStreamChaosClusterBounceSubset(t *testing.T) { + + const duration = 60 * time.Second + const clusterSize = 3 + + c := createJetStreamClusterExplicit(t, "R3", clusterSize) + defer c.shutdown() + + chaos := createClusterChaosMonkeyController( + t, + c, + &clusterBouncerChaosMonkey{ + minDowntime: 0 * time.Second, + maxDowntime: 2 * time.Second, + minDownServers: 1, + maxDownServers: clusterSize, + pause: 3 * time.Second, + }, + ) + chaos.start() + defer chaos.stop() + + <-time.After(duration) +} + +const ( + chaosConsumerTestsClusterName = "CONSUMERS_CHAOS_TEST" + chaosConsumerTestsStreamName = "CONSUMER_CHAOS_TEST_STREAM" + chaosConsumerTestsSubject = "foo" + chaosConsumerTestsDebug = false +) + +// Creates stream and fills it with the given number of messages. +// Each message is the string representation of the stream sequence number, +// e.g. the first message (seqno: 1) contains data "1". +// This allows consumers to verify the content of each message without tracking additional state +func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMessages int) { + t.Helper() + + const publishBatchSize = 1_000 + + pubNc, pubJs := jsClientConnectCluster(t, c) + defer pubNc.Close() + + _, err := pubJs.AddStream(&nats.StreamConfig{ + Name: chaosConsumerTestsStreamName, + Subjects: []string{chaosConsumerTestsSubject}, + Replicas: replicas, + }) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + + ackFutures := make([]nats.PubAckFuture, 0, publishBatchSize) + + for i := 1; i <= numMessages; i++ { + message := []byte(fmt.Sprintf("%d", i)) + pubAckFuture, err := pubJs.PublishAsync(chaosConsumerTestsSubject, message, nats.ExpectLastSequence(uint64(i-1))) + if err != nil { + t.Fatalf("Publish error: %s", err) + } + ackFutures = append(ackFutures, pubAckFuture) + + if (i > 0 && i%publishBatchSize == 0) || i == numMessages { + select { + case <-pubJs.PublishAsyncComplete(): + for _, pubAckFuture := range ackFutures { + select { + case <-pubAckFuture.Ok(): + // Noop + case pubAckErr := <-pubAckFuture.Err(): + t.Fatalf("Error publishing: %s", pubAckErr) + case <-time.After(30 * time.Second): + t.Fatalf("Timeout verifying pubAck for message: %s", pubAckFuture.Msg().Data) + } + } + ackFutures = make([]nats.PubAckFuture, 0, publishBatchSize) + t.Logf("Published %d/%d messages", i, numMessages) + + case <-time.After(30 * time.Second): + t.Fatalf("Publish timed out") + } + } + } +} + +// Verify ordered delivery despite cluster-wide outages +func TestJetStreamChaosConsumerOrdered(t *testing.T) { + + const numMessages = 30_000 + const maxRetries = 100 + const retryDelay = 500 * time.Millisecond + const fetchTimeout = 250 * time.Millisecond + const clusterSize = 3 + const replicas = 3 + + c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize) + defer c.shutdown() + + createStreamForConsumerChaosTest(t, c, replicas, 
numMessages) + + chaos := createClusterChaosMonkeyController( + t, + c, + &clusterBouncerChaosMonkey{ + minDowntime: 0 * time.Second, + maxDowntime: 2 * time.Second, + minDownServers: clusterSize, // Whole cluster outage + maxDownServers: clusterSize, + pause: 1 * time.Second, + }, + ) + + subNc, subJs := jsClientConnectCluster(t, c) + defer subNc.Close() + + sub, err := subJs.SubscribeSync( + chaosConsumerTestsSubject, + nats.OrderedConsumer(), + ) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + + if chaosConsumerTestsDebug { + t.Logf("Initial subscription: %s", toIndentedJsonString(sub)) + } + + chaos.start() + defer chaos.stop() + + for i := 1; i <= numMessages; i++ { + var msg *nats.Msg + var nextMsgErr error + var expectedMsgData = []byte(fmt.Sprintf("%d", i)) + + nextMsgRetryLoop: + for r := 0; r <= maxRetries; r++ { + msg, nextMsgErr = sub.NextMsg(fetchTimeout) + if nextMsgErr == nil { + break nextMsgRetryLoop + } else if r == maxRetries { + t.Fatalf("Exceeded max retries for NextMsg") + } else if nextMsgErr == nats.ErrBadSubscription { + t.Fatalf("Subscription is invalid: %s", toIndentedJsonString(sub)) + } else { + time.Sleep(retryDelay) + } + } + + metadata, err := msg.Metadata() + if err != nil { + t.Fatalf("Failed to get message metadata: %v", err) + } + + if metadata.Sequence.Stream != uint64(i) { + t.Fatalf("Expecting stream sequence %d, got %d instead", i, metadata.Sequence.Stream) + } + + if !bytes.Equal(msg.Data, expectedMsgData) { + t.Fatalf("Expecting message %s, got %s instead", expectedMsgData, msg.Data) + } + + // Simulate application processing (and gives the monkey some time to brew chaos) + time.Sleep(10 * time.Millisecond) + + if i%1000 == 0 { + t.Logf("Consumed %d/%d", i, numMessages) + } + } +} + +// Verify delivery of all messages despite cluster-wide outages +func TestJetStreamChaosConsumerAsync(t *testing.T) { + + const numMessages = 30_000 + const timeout = 30 * time.Second // No (new) messages for 30s => terminate + const maxRetries = 25 + const retryDelay = 500 * time.Millisecond + const clusterSize = 3 + const replicas = 3 + + c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize) + defer c.shutdown() + + createStreamForConsumerChaosTest(t, c, replicas, numMessages) + + chaos := createClusterChaosMonkeyController( + t, + c, + &clusterBouncerChaosMonkey{ + minDowntime: 0 * time.Second, + maxDowntime: 2 * time.Second, + minDownServers: clusterSize, + maxDownServers: clusterSize, + pause: 2 * time.Second, + }, + ) + + subNc, subJs := jsClientConnectCluster(t, c) + defer subNc.Close() + + timeoutTimer := time.NewTimer(timeout) + deliveryCount := uint64(0) + received := NewBitset(numMessages) + + handleMsg := func(msg *nats.Msg) { + deliveryCount += 1 + + metadata, err := msg.Metadata() + if err != nil { + t.Fatalf("Failed to get message metadata: %v", err) + } + seq := metadata.Sequence.Stream + + var expectedMsgData = []byte(fmt.Sprintf("%d", seq)) + if !bytes.Equal(msg.Data, expectedMsgData) { + t.Fatalf("Expecting message content '%s', got '%s' instead", expectedMsgData, msg.Data) + } + + isDupe := received.get(seq - 1) + + if isDupe { + if chaosConsumerTestsDebug { + t.Logf("Duplicate message delivery, seq: %d", seq) + } + return + } + + // Mark this sequence as received + received.set(seq-1, true) + if received.count() < numMessages { + // Reset timeout + timeoutTimer.Reset(timeout) + } else { + // All received, speed up the shutdown + timeoutTimer.Reset(1 * time.Second) + } + + if 
received.count()%1000 == 0 { + t.Logf("Consumed %d/%d", received.count(), numMessages) + } + + // Simulate application processing (and gives the monkey some time to brew chaos) + time.Sleep(10 * time.Millisecond) + + ackRetryLoop: + for i := 0; i <= maxRetries; i++ { + ackErr := msg.Ack() + if ackErr == nil { + break ackRetryLoop + } else if i == maxRetries { + t.Fatalf("Failed to ACK message %d (retried %d times)", seq, maxRetries) + } else { + time.Sleep(retryDelay) + } + } + } + + subOpts := []nats.SubOpt{} + sub, err := subJs.Subscribe(chaosConsumerTestsSubject, handleMsg, subOpts...) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + + chaos.start() + defer chaos.stop() + + // Wait for long enough silence. + // Either a stall, or all messages received + <-timeoutTimer.C + + // Shut down consumer + sub.Unsubscribe() + + uniqueDeliveredCount := received.count() + + t.Logf( + "Delivered %d/%d messages %d duplicate deliveries", + uniqueDeliveredCount, + numMessages, + deliveryCount-uniqueDeliveredCount, + ) + + if uniqueDeliveredCount != numMessages { + t.Fatalf("No new message delivered in the last %s, %d/%d messages never delivered", timeout, numMessages-uniqueDeliveredCount, numMessages) + } +} + +// Verify durable consumer retains state despite cluster-wide outages +// The consumer connection is also periodically closed, and the consumer 'resumes' on a different one +func TestJetStreamChaosConsumerDurable(t *testing.T) { + + const numMessages = 30_000 + const timeout = 30 * time.Second // No (new) messages for 30s => terminate + const clusterSize = 3 + const replicas = 3 + const maxRetries = 25 + const retryDelay = 500 * time.Millisecond + const durableConsumerName = "durable" + + c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize) + defer c.shutdown() + + createStreamForConsumerChaosTest(t, c, replicas, numMessages) + + chaos := createClusterChaosMonkeyController( + t, + c, + &clusterBouncerChaosMonkey{ + minDowntime: 0 * time.Second, + maxDowntime: 2 * time.Second, + minDownServers: 1, + maxDownServers: clusterSize, + pause: 3 * time.Second, + }, + ) + + var nc *nats.Conn + var sub *nats.Subscription + var subLock sync.Mutex + + var handleMsgFun func(msg *nats.Msg) + var natsURL string + + { + var sb strings.Builder + for _, s := range c.servers { + sb.WriteString(s.ClientURL()) + sb.WriteString(",") + } + natsURL = sb.String() + } + + resetDurableConsumer := func() { + subLock.Lock() + defer subLock.Unlock() + + if nc != nil { + nc.Close() + } + + var newNc *nats.Conn + connectRetryLoop: + for r := 0; r <= maxRetries; r++ { + var connErr error + newNc, connErr = nats.Connect(natsURL) + if connErr == nil { + break connectRetryLoop + } else if r == maxRetries { + t.Fatalf("Failed to connect, exceeded max retries, last error: %s", connErr) + } else { + time.Sleep(retryDelay) + } + } + + var newJs nats.JetStreamContext + jsRetryLoop: + for r := 0; r <= maxRetries; r++ { + var jsErr error + newJs, jsErr = newNc.JetStream(nats.MaxWait(10 * time.Second)) + if jsErr == nil { + break jsRetryLoop + } else if r == maxRetries { + t.Fatalf("Failed to get JS, exceeded max retries, last error: %s", jsErr) + } else { + time.Sleep(retryDelay) + } + } + + subOpts := []nats.SubOpt{ + nats.Durable(durableConsumerName), + } + + var newSub *nats.Subscription + subscribeRetryLoop: + for i := 0; i <= maxRetries; i++ { + var subErr error + newSub, subErr = newJs.Subscribe(chaosConsumerTestsSubject, handleMsgFun, subOpts...)
+ if subErr == nil { + ci, err := newJs.ConsumerInfo(chaosConsumerTestsStreamName, durableConsumerName) + if err == nil { + if chaosConsumerTestsDebug { + t.Logf("Consumer info:\n %s", toIndentedJsonString(ci)) + } + } else { + t.Logf("Failed to retrieve consumer info: %s", err) + } + + break subscribeRetryLoop + } else if i == maxRetries { + t.Fatalf("Exceeded max retries creating subscription: %v", subErr) + } else { + time.Sleep(retryDelay) + } + } + + nc, sub = newNc, newSub + } + + timeoutTimer := time.NewTimer(timeout) + deliveryCount := uint64(0) + received := NewBitset(numMessages) + + handleMsgFun = func(msg *nats.Msg) { + + subLock.Lock() + if msg.Sub != sub { + // Message from a previous instance of durable consumer, drop + defer subLock.Unlock() + return + } + subLock.Unlock() + + deliveryCount += 1 + + metadata, err := msg.Metadata() + if err != nil { + t.Fatalf("Failed to get message metadata: %v", err) + } + seq := metadata.Sequence.Stream + + var expectedMsgData = []byte(fmt.Sprintf("%d", seq)) + if !bytes.Equal(msg.Data, expectedMsgData) { + t.Fatalf("Expecting message content '%s', got '%s' instead", expectedMsgData, msg.Data) + } + + isDupe := received.get(seq - 1) + + if isDupe { + if chaosConsumerTestsDebug { + t.Logf("Duplicate message delivery, seq: %d", seq) + } + return + } + + // Mark this sequence as received + received.set(seq-1, true) + if received.count() < numMessages { + // Reset timeout + timeoutTimer.Reset(timeout) + } else { + // All received, speed up the shutdown + timeoutTimer.Reset(1 * time.Second) + } + + // Simulate application processing (and gives the monkey some time to brew chaos) + time.Sleep(10 * time.Millisecond) + + ackRetryLoop: + for i := 0; i <= maxRetries; i++ { + ackErr := msg.Ack() + if ackErr == nil { + break ackRetryLoop + } else if i == maxRetries { + t.Fatalf("Failed to ACK message %d (retried %d times)", seq, maxRetries) + } else { + time.Sleep(retryDelay) + } + } + + if received.count()%1000 == 0 { + t.Logf("Consumed %d/%d, duplicate deliveries: %d", received.count(), numMessages, deliveryCount-received.count()) + // Close connection and resume consuming on a different one + resetDurableConsumer() + } + } + + resetDurableConsumer() + + chaos.start() + defer chaos.stop() + + // Wait for long enough silence. 
+ // Either a stall, or all messages received + <-timeoutTimer.C + + // Shut down consumer + if sub != nil { + sub.Unsubscribe() + } + + uniqueDeliveredCount := received.count() + + t.Logf( + "Delivered %d/%d messages %d duplicate deliveries", + uniqueDeliveredCount, + numMessages, + deliveryCount-uniqueDeliveredCount, + ) + + if uniqueDeliveredCount != numMessages { + t.Fatalf("No new message delivered in the last %s, %d/%d messages never delivered", timeout, numMessages-uniqueDeliveredCount, numMessages) + } +} + +func TestJetStreamChaosConsumerPull(t *testing.T) { + + const numMessages = 10_000 + const maxRetries = 100 + const retryDelay = 500 * time.Millisecond + const fetchTimeout = 250 * time.Millisecond + const fetchBatchSize = 100 + const clusterSize = 3 + const replicas = 3 + const durableConsumerName = "durable" + + c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize) + defer c.shutdown() + + createStreamForConsumerChaosTest(t, c, replicas, numMessages) + + chaos := createClusterChaosMonkeyController( + t, + c, + &clusterBouncerChaosMonkey{ + minDowntime: 0 * time.Second, + maxDowntime: 2 * time.Second, + minDownServers: clusterSize, // Whole cluster outage + maxDownServers: clusterSize, + pause: 1 * time.Second, + }, + ) + + subNc, subJs := jsClientConnectCluster(t, c) + defer subNc.Close() + + sub, err := subJs.PullSubscribe( + chaosConsumerTestsSubject, + durableConsumerName, + ) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + + if chaosConsumerTestsDebug { + t.Logf("Initial subscription: %s", toIndentedJsonString(sub)) + } + + chaos.start() + defer chaos.stop() + + fetchMaxWait := nats.MaxWait(fetchTimeout) + received := NewBitset(numMessages) + deliveredCount := uint64(0) + + for received.count() < numMessages { + + var msgs []*nats.Msg + var fetchErr error + + fetchRetryLoop: + for r := 0; r <= maxRetries; r++ { + msgs, fetchErr = sub.Fetch(fetchBatchSize, fetchMaxWait) + if fetchErr == nil { + break fetchRetryLoop + } else if r == maxRetries { + t.Fatalf("Exceeded max retries for Fetch, last error: %s", fetchErr) + } else if fetchErr == nats.ErrBadSubscription { + t.Fatalf("Subscription is invalid: %s", toIndentedJsonString(sub)) + } else { + // t.Logf("Fetch error: %v", fetchErr) + time.Sleep(retryDelay) + } + } + + for _, msg := range msgs { + + deliveredCount += 1 + + metadata, err := msg.Metadata() + if err != nil { + t.Fatalf("Failed to get message metadata: %v", err) + } + + streamSeq := metadata.Sequence.Stream + + expectedMsgData := []byte(fmt.Sprintf("%d", streamSeq)) + + if !bytes.Equal(msg.Data, expectedMsgData) { + t.Fatalf("Expecting message %s, got %s instead", expectedMsgData, msg.Data) + } + + isDupe := received.get(streamSeq - 1) + + received.set(streamSeq-1, true) + + // Simulate application processing (and gives the monkey some time to brew chaos) + time.Sleep(10 * time.Millisecond) + + ackRetryLoop: + for r := 0; r <= maxRetries; r++ { + ackErr := msg.Ack() + if ackErr == nil { + break ackRetryLoop + } else if r == maxRetries { + t.Fatalf("Failed to ACK message %d, last error: %s", streamSeq, ackErr) + } else { + time.Sleep(retryDelay) + } + } + + if !isDupe && received.count()%1000 == 0 { + t.Logf("Consumed %d/%d (duplicates: %d)", received.count(), numMessages, deliveredCount-received.count()) + } + } + } +} + +const ( + chaosKvTestsClusterName = "KV_CHAOS_TEST" + chaosKvTestsBucketName = "KV_CHAOS_TEST_BUCKET" + chaosKvTestsSubject = "foo" + chaosKvTestsDebug = false +) + +// 
Creates KV store (a.k.a. bucket). +func createBucketForKvChaosTest(t *testing.T, c *cluster, replicas int) { + t.Helper() + + pubNc, pubJs := jsClientConnectCluster(t, c) + defer pubNc.Close() + + config := nats.KeyValueConfig{ + Bucket: chaosKvTestsBucketName, + Replicas: replicas, + Description: "Test bucket", + } + + kvs, err := pubJs.CreateKeyValue(&config) + if err != nil { + t.Fatalf("Error creating bucket: %v", err) + } + + status, err := kvs.Status() + if err != nil { + t.Fatalf("Error retrieving bucket status: %v", err) + } + t.Logf("Bucket created: %s", status.Bucket()) +} + +// Single client performs a set of PUT on a single key. +// If PUT is successful, perform a GET on the same key. +// If GET is successful, ensure key revision and value match the most recent successful write. +func TestJetStreamChaosKvPutGet(t *testing.T) { + + const numOps = 100_000 + const clusterSize = 3 + const replicas = 3 + const key = "key" + const staleReadsOk = true // Set to false to check for violations of 'read committed' consistency + + c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize) + defer c.shutdown() + + createBucketForKvChaosTest(t, c, replicas) + + chaos := createClusterChaosMonkeyController( + t, + c, + &clusterBouncerChaosMonkey{ + minDowntime: 0 * time.Second, + maxDowntime: 2 * time.Second, + minDownServers: clusterSize, // Whole cluster outage + maxDownServers: clusterSize, + pause: 1 * time.Second, + }, + ) + + nc, js := jsClientConnectCluster(t, c) + defer nc.Close() + + // Create KV bucket + kv, err := js.KeyValue(chaosKvTestsBucketName) + if err != nil { + t.Fatalf("Failed to get KV store: %v", err) + } + + // Initialize the only key + firstRevision, err := kv.Create(key, []byte("INITIAL VALUE")) + if err != nil { + t.Fatalf("Failed to create key: %v", err) + } else if firstRevision != 1 { + t.Fatalf("Unexpected revision: %d", firstRevision) + } + + // Start chaos + chaos.start() + defer chaos.stop() + + staleReadsCount := uint64(0) + successCount := uint64(0) + + previousRevision := firstRevision + +putGetLoop: + for i := 1; i <= numOps; i++ { + + if i%1000 == 0 { + t.Logf("Completed %d/%d PUT+GET operations", i, numOps) + } + + // PUT a value + putValue := fmt.Sprintf("value-%d", i) + putRevision, err := kv.Put(key, []byte(putValue)) + if err != nil { + t.Logf("PUT error: %v", err) + continue putGetLoop + } + + // Check revision is monotonically increasing + if putRevision <= previousRevision { + t.Fatalf("PUT produced revision %d which is not greater than the previous successful PUT revision: %d", putRevision, previousRevision) + } + + previousRevision = putRevision + + // If PUT was successful, GET the same + kve, err := kv.Get(key) + if err == nats.ErrKeyNotFound { + t.Fatalf("GET key not found, but key does exists (last PUT revision: %d)", putRevision) + } else if err != nil { + t.Logf("GET error: %v", err) + continue putGetLoop + } + + getValue := string(kve.Value()) + getRevision := kve.Revision() + + if putRevision > getRevision { + // Stale read, violates 'read committed' consistency criteria + if !staleReadsOk { + t.Fatalf("PUT value %s (rev: %d) then read value %s (rev: %d)", putValue, putRevision, getValue, getRevision) + } else { + staleReadsCount += 1 + } + } else if putRevision < getRevision { + // Returned revision is higher than any ever written, this should never happen + t.Fatalf("GET returned revision %d, but most recent expected revision is %d", getRevision, putRevision) + } else if putValue != getValue { + // Returned revision 
matches latest, but values do not, this should never happen + t.Fatalf("GET returned revision %d with value %s, but value %s was just committed for that same revision", getRevision, getValue, putValue) + } else { + // Get returned the latest revision/value + successCount += 1 + if chaosKvTestsDebug { + t.Logf("PUT+GET %s=%s (rev: %d)", key, putValue, putRevision) + } + } + } + + t.Logf("Completed %d PUT+GET cycles of which %d successful, %d GETs returned a stale value", numOps, successCount, staleReadsCount) +} + +// A variant of TestJetStreamChaosKvPutGet where PUT is retried until successful, and GET is retried until it returns the latest known key revision. +// This validates that a confirmed PUT value is never lost, and eventually becomes visible. +func TestJetStreamChaosKvPutGetWithRetries(t *testing.T) { + + const numOps = 10_000 + const maxRetries = 20 + const retryDelay = 100 * time.Millisecond + const clusterSize = 3 + const replicas = 3 + const key = "key" + + c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize) + defer c.shutdown() + + createBucketForKvChaosTest(t, c, replicas) + + chaos := createClusterChaosMonkeyController( + t, + c, + &clusterBouncerChaosMonkey{ + minDowntime: 0 * time.Second, + maxDowntime: 2 * time.Second, + minDownServers: clusterSize, // Whole cluster outage + maxDownServers: clusterSize, + pause: 1 * time.Second, + }, + ) + + nc, js := jsClientConnectCluster(t, c) + defer nc.Close() + + kv, err := js.KeyValue(chaosKvTestsBucketName) + if err != nil { + t.Fatalf("Failed to get KV store: %v", err) + } + + // Initialize key value + firstRevision, err := kv.Create(key, []byte("INITIAL VALUE")) + if err != nil { + t.Fatalf("Failed to create key: %v", err) + } else if firstRevision != 1 { + t.Fatalf("Unexpected revision: %d", firstRevision) + } + + // Start chaos + chaos.start() + defer chaos.stop() + + staleReadCount := 0 + previousRevision := firstRevision + +putGetLoop: + for i := 1; i <= numOps; i++ { + + if i%1000 == 0 { + t.Logf("Completed %d/%d PUT+GET operations", i, numOps) + } + + putValue := fmt.Sprintf("value-%d", i) + putRevision := uint64(0) + + // Put new value for key, retry until successful or out of retries + putRetryLoop: + for r := 0; r <= maxRetries; r++ { + var putErr error + putRevision, putErr = kv.Put(key, []byte(putValue)) + if putErr == nil { + break putRetryLoop + } else if r == maxRetries { + t.Fatalf("Failed to PUT (retried %d times): %v", maxRetries, putErr) + } else { + if chaosKvTestsDebug { + t.Logf("PUT error: %v", putErr) + } + time.Sleep(retryDelay) + } + } + + // Ensure key version is monotonically increasing + if putRevision <= previousRevision { + t.Fatalf("Latest PUT created revision %d which is not greater than the previous revision: %d", putRevision, previousRevision) + } + previousRevision = putRevision + + // Read value for key, retry until successful, and validate corresponding version and value + getRetryLoop: + for r := 0; r <= maxRetries; r++ { + var getErr error + kve, getErr := kv.Get(key) + if getErr != nil && r == maxRetries { + t.Fatalf("Failed to GET (retried %d times): %v", maxRetries, getErr) + } else if getErr != nil { + if chaosKvTestsDebug { + t.Logf("GET error: %v", getErr) + } + time.Sleep(retryDelay) + continue getRetryLoop + } + + // GET successful, check revision and value + getValue := string(kve.Value()) + getRevision := kve.Revision() + + if putRevision == getRevision { + if putValue != getValue { + t.Fatalf("Unexpected value %s for revision %d, expected: %s", getValue,
getRevision, putValue) + } + if chaosKvTestsDebug { + t.Logf("PUT+GET %s=%s (rev: %d) (retry: %d)", key, putValue, putRevision, r) + } + continue putGetLoop + } else if getRevision > putRevision { + t.Fatalf("GET returned version that should not exist yet: %d, last created: %d", getRevision, putRevision) + } else { // get revision < put revision + staleReadCount += 1 + if chaosKvTestsDebug { + t.Logf("GET got stale value: %v (rev: %d, latest: %d)", getValue, getRevision, putRevision) + } + time.Sleep(retryDelay) + continue getRetryLoop + } + } + } + + t.Logf("Client completed %d PUT+GET cycles, %d GET returned a stale value", numOps, staleReadCount) +} + +// Multiple clients updating a finite set of keys with CAS semantics. +// TODO check that revision is never lower than last one seen +// TODO check that KeyNotFound is never returned, as keys are initialized beforehand +func TestJetStreamChaosKvCAS(t *testing.T) { + const numOps = 10_000 + const maxRetries = 50 + const retryDelay = 300 * time.Millisecond + const clusterSize = 3 + const replicas = 3 + const numKeys = 15 + const numClients = 5 + + c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize) + defer c.shutdown() + + createBucketForKvChaosTest(t, c, replicas) + + chaos := createClusterChaosMonkeyController( + t, + c, + &clusterBouncerChaosMonkey{ + minDowntime: 0 * time.Second, + maxDowntime: 2 * time.Second, + minDownServers: clusterSize, // Whole cluster outage + maxDownServers: clusterSize, + pause: 1 * time.Second, + }, + ) + + nc, js := jsClientConnectCluster(t, c) + defer nc.Close() + + // Create bucket + kv, err := js.KeyValue(chaosKvTestsBucketName) + if err != nil { + t.Fatalf("Failed to get KV store: %v", err) + } + + // Create set of keys and initialize them with dummy value + keys := make([]string, numKeys) + for k := 0; k < numKeys; k++ { + key := fmt.Sprintf("key-%d", k) + keys[k] = key + + _, err := kv.Create(key, []byte("Initial value")) + if err != nil { + t.Fatalf("Failed to create key: %v", err) + } + } + + wgStart := sync.WaitGroup{} + wgComplete := sync.WaitGroup{} + + // Client routine + client := func(clientId int, kv nats.KeyValue) { + defer wgComplete.Done() + + rng := rand.New(rand.NewSource(int64(clientId))) + successfulUpdates := 0 + casRejectUpdates := 0 + otherUpdateErrors := 0 + + // Map to track last known revision for each of the keys + knownRevisions := map[string]uint64{} + for _, key := range keys { + knownRevisions[key] = 0 + } + + // Wait for all clients to reach this point before proceeding + wgStart.Done() + wgStart.Wait() + + for i := 1; i <= numOps; i++ { + + if i%1000 == 0 { + t.Logf("Client %d completed %d/%d updates", clientId, i, numOps) + } + + // Pick random key from the set + key := keys[rng.Intn(numKeys)] + + // Prepare unique value to be written + value := fmt.Sprintf("client: %d operation %d", clientId, i) + + // Try to update a key with CAS + newRevision, updateErr := kv.Update(key, []byte(value), knownRevisions[key]) + if updateErr == nil { + // Update successful + knownRevisions[key] = newRevision + successfulUpdates += 1 + if chaosKvTestsDebug { + t.Logf("Client %d updated key %s, new revision: %d", clientId, key, newRevision) + } + } else if updateErr != nil && strings.Contains(fmt.Sprint(updateErr), "wrong last sequence") { + // CAS rejected update, learn current revision for this key + casRejectUpdates += 1 + + for r := 0; r <= maxRetries; r++ { + kve, getErr := kv.Get(key) + if getErr == nil { + currentRevision := kve.Revision() + if currentRevision < 
knownRevisions[key] { + // Revision number moved backward, this should never happen + t.Fatalf("Current revision for key %s is %d, which is lower than the last known revision %d", key, currentRevision, knownRevisions[key]) + + } + + knownRevisions[key] = currentRevision + if chaosKvTestsDebug { + t.Logf("Client %d learned key %s revision: %d", clientId, key, currentRevision) + } + break + } else if r == maxRetries { + t.Fatalf("Failed to GET (retried %d times): %v", maxRetries, getErr) + } else { + time.Sleep(retryDelay) + } + } + } else { + // Other update error + otherUpdateErrors += 1 + if chaosKvTestsDebug { + t.Logf("Client %d update error for key %s: %v", clientId, key, updateErr) + } + time.Sleep(retryDelay) + } + } + t.Logf("Client %d done, %d kv updates, %d CAS rejected, %d other errors", clientId, successfulUpdates, casRejectUpdates, otherUpdateErrors) + } + + // Launch all clients + for i := 1; i <= numClients; i++ { + cNc, cJs := jsClientConnectCluster(t, c) + defer cNc.Close() + + cKv, err := cJs.KeyValue(chaosKvTestsBucketName) + if err != nil { + t.Fatalf("Failed to get KV store: %v", err) + } + + wgStart.Add(1) + wgComplete.Add(1) + go client(i, cKv) + } + + // Wait for clients to be connected and ready + wgStart.Wait() + + // Start failures + chaos.start() + defer chaos.stop() + + // Wait for all clients to be done + wgComplete.Wait() +} diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 7e1d97f7..683b3940 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -434,24 +434,89 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { return false } +// Restart the stream in question. +// Should only be called when the stream is known to be in a bad state. +func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) { + js.mu.Lock() + cc := js.cluster + if cc == nil { + js.mu.Unlock() + return + } + // Need to look up the one directly from the meta layer, since what we get handed is a copy if coming from isStreamHealthy. + asa := cc.streams[acc.Name] + if asa == nil { + js.mu.Unlock() + return + } + sa := asa[csa.Config.Name] + if sa == nil { + js.mu.Unlock() + return + } + // Make sure to clear out the raft node if still present in the meta layer. + if rg := sa.Group; rg != nil && rg.node != nil { + rg.node = nil + } + js.mu.Unlock() + + // Process stream assignment to recreate. + js.processStreamAssignment(sa) + + // If we had consumers assigned to this server they will be present in the copy, csa. + // They also need to be processed. The csa consumers are a copy of only our consumers, + // those assigned to us, but the consumer assignments there come directly from the meta + // layer to make this part much easier and avoid excessive lookups. + for _, cca := range csa.consumers { + if cca.deleted { + continue + } + // Need to look up original as well here to make sure node is nil. + js.mu.Lock() + ca := sa.consumers[cca.Name] + if ca != nil && ca.Group != nil { + // Make sure node is wiped. + ca.Group.node = nil + } + js.mu.Unlock() + if ca != nil { + js.processConsumerAssignment(ca) + } + } +} + +// isStreamHealthy will determine if the stream is up to date or very close. // For R1 it will make sure the stream is present on this server. func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { - js.mu.RLock() - defer js.mu.RUnlock() - + js.mu.Lock() cc := js.cluster if cc == nil { // Non-clustered mode + js.mu.Unlock() return true } + + // Pull the group out. 
rg := sa.Group if rg == nil { + js.mu.Unlock() return false } - if rg := sa.Group; rg != nil && (rg.node == nil || rg.node.Healthy()) { + + streamName := sa.Config.Name + node := rg.node + js.mu.Unlock() + + // First lookup stream and make sure its there. + mset, err := acc.lookupStream(streamName) + if err != nil { + js.restartStream(acc, sa) + return false + } + + if node == nil || node.Healthy() { // Check if we are processing a snapshot and are catching up. - if mset, err := acc.lookupStream(sa.Config.Name); err == nil && !mset.isCatchingUp() { + if !mset.isCatchingUp() { return true } } @@ -460,23 +525,46 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { // isConsumerCurrent will determine if the consumer is up to date. // For R1 it will make sure the consunmer is present on this server. -func (js *jetStream) isConsumerCurrent(mset *stream, consumer string, ca *consumerAssignment) bool { +func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consumerAssignment) bool { + if mset == nil { + return false + } js.mu.RLock() - defer js.mu.RUnlock() - cc := js.cluster + js.mu.RUnlock() + if cc == nil { // Non-clustered mode return true } + + // When we try to restart we nil out the node if applicable + // and reprocess the consumer assignment. + restartConsumer := func() { + js.mu.Lock() + if ca.Group != nil { + ca.Group.node = nil + } + deleted := ca.deleted + js.mu.Unlock() + if !deleted { + js.processConsumerAssignment(ca) + } + } + o := mset.lookupConsumer(consumer) if o == nil { + restartConsumer() return false } - if n := o.raftNode(); n != nil && !n.Current() { - return false + if node := o.raftNode(); node == nil || node.Healthy() { + return true + } else if node != nil && node.State() == Closed { + // We have a consumer, and it should have a running node but it is closed. + o.stop() + restartConsumer() } - return true + return false } // subjectsOverlap checks all existing stream assignments for the account cross-cluster for subject overlap @@ -1050,6 +1138,17 @@ func (js *jetStream) monitorCluster() { lt := time.NewTicker(leaderCheckInterval) defer lt.Stop() + const healthCheckInterval = 2 * time.Minute + ht := time.NewTicker(healthCheckInterval) + defer ht.Stop() + + // Utility to check health. + checkHealth := func() { + if hs := s.healthz(nil); hs.Error != _EMPTY_ { + s.Warnf("%v", hs.Error) + } + } + var ( isLeader bool lastSnapTime time.Time @@ -1124,6 +1223,8 @@ func (js *jetStream) monitorCluster() { ru = nil s.Debugf("Recovered JetStream cluster metadata") js.checkForOrphans() + // Do a health check here as well. + go checkHealth() continue } // FIXME(dlc) - Deal with errors. @@ -1140,6 +1241,7 @@ func (js *jetStream) monitorCluster() { } } aq.recycle(&ces) + case isLeader = <-lch: // For meta layer synchronize everyone to our state on becoming leader. if isLeader { @@ -1160,6 +1262,10 @@ func (js *jetStream) monitorCluster() { if n.Leader() { js.checkClusterSize() } + case <-ht.C: + // Do this in a separate go routine. 
+ go checkHealth() + + case <-lt.C: + s.Debugf("Checking JetStream cluster state") // If we have a current leader or had one in the past we can cancel this here since the metaleader @@ -1728,7 +1834,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo js.processUpdateStreamAssignment(sa) } default: - panic("JetStream Cluster Unknown meta entry op type") + panic(fmt.Sprintf("JetStream Cluster Unknown meta entry op type: %v", entryOp(buf[0]))) } } } @@ -2461,6 +2567,12 @@ func (mset *stream) resetClusteredState(err error) bool { node.StepDown() } + // If we detect we are shutting down just return. + if js != nil && js.isShuttingDown() { + s.Debugf("Will not reset stream, jetstream shutting down") + return false + } + // Server if js.limitsExceeded(stype) { s.Debugf("Will not reset stream, server resources exceeded") @@ -2687,7 +2799,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } } default: - panic("JetStream Cluster Unknown group entry op type!") + panic(fmt.Sprintf("JetStream Cluster Unknown group entry op type: %v", op)) } } else if e.Type == EntrySnapshot { if !isRecovering && mset != nil { @@ -3097,19 +3209,23 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) node.StepDown(nsa.Group.Preferred) } node.ProposeRemovePeer(ourID) - // shut down monitor by shutting down raft + // shutdown monitor by shutting down raft. node.Delete() } + var isShuttingDown bool // Make sure this node is no longer attached to our stream assignment. if js, _ := s.getJetStreamCluster(); js != nil { js.mu.Lock() nsa.Group.node = nil + isShuttingDown = js.shuttingDown js.mu.Unlock() } - // wait for monitor to be shut down - mset.monitorWg.Wait() + if !isShuttingDown { + // wait for monitor to be shutdown. + mset.monitorWg.Wait() + } mset.stop(true, false) } @@ -3913,6 +4029,13 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state if rg.node != nil { rg.node.Delete() + // Clear the node here. + rg.node = nil + } + + // If we did seem to create a consumer make sure to stop it. + if o != nil { + o.stop() } var result *consumerAssignmentResult @@ -4517,7 +4640,7 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea } o.mu.Unlock() default: - panic(fmt.Sprintf("JetStream Cluster Unknown group entry op type! %v", entryOp(buf[0]))) + panic(fmt.Sprintf("JetStream Cluster Unknown group entry op type: %v", entryOp(buf[0]))) } } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index ff8d68a9..a5dec068 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3763,3 +3763,89 @@ func TestJetStreamClusterConsumerInfoForJszForFollowers(t *testing.T) { } } } + +// Under certain scenarios we have seen consumers become stopped and cause healthz to fail. +// The specific scenario is heavy loads, and stream resets on upgrades that could orphan consumers. 
+func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) { + c := createJetStreamClusterExplicit(t, "NATS", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + }) + require_NoError(t, err) + + for i := 0; i < 1000; i++ { + sendStreamMsg(t, nc, "foo", "HELLO") + } + + sub, err := js.PullSubscribe("foo", "d") + require_NoError(t, err) + + fetch, ack := 122, 22 + msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second)) + require_NoError(t, err) + require_True(t, len(msgs) == fetch) + for _, m := range msgs[:ack] { + m.AckSync() + } + // Let acks propagate. + time.Sleep(100 * time.Millisecond) + + // We will now stop a stream on a given server. + s := c.randomServer() + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + // Stop the stream + mset.stop(false, false) + + // Wait for exit. + time.Sleep(100 * time.Millisecond) + + checkFor(t, 5*time.Second, 500*time.Millisecond, func() error { + hs := s.healthz(nil) + if hs.Error != _EMPTY_ { + return errors.New(hs.Error) + } + return nil + }) + + // Now take out the consumer. + mset, err = s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + o := mset.lookupConsumer("d") + require_NotNil(t, o) + + o.stop() + // Wait for exit. + time.Sleep(100 * time.Millisecond) + + checkFor(t, 5*time.Second, 500*time.Millisecond, func() error { + hs := s.healthz(nil) + if hs.Error != _EMPTY_ { + return errors.New(hs.Error) + } + return nil + }) + + // Now just stop the raft node from underneath the consumer. + o = mset.lookupConsumer("d") + require_NotNil(t, o) + node := o.raftNode() + require_NotNil(t, node) + node.Stop() + + checkFor(t, 5*time.Second, 500*time.Millisecond, func() error { + hs := s.healthz(nil) + if hs.Error != _EMPTY_ { + return errors.New(hs.Error) + } + return nil + }) +} diff --git a/server/monitor.go b/server/monitor.go index 8b051601..09b7d15d 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -3162,7 +3162,8 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { csa.consumers = make(map[string]*consumerAssignment) for consumer, ca := range sa.consumers { if ca.Group.isMember(ourID) { - csa.consumers[consumer] = ca.copyGroup() + // Use original here. Not a copy. + csa.consumers[consumer] = ca } } nasa[stream] = csa @@ -3191,7 +3192,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { mset, _ := acc.lookupStream(stream) // Now check consumers. for consumer, ca := range sa.consumers { - if !js.isConsumerCurrent(mset, consumer, ca) { + if !js.isConsumerHealthy(mset, consumer, ca) { health.Status = na health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer) return health diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 31bee2cf..9c9b0b8f 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -319,7 +319,7 @@ func testMQTTRunServer(t testing.TB, o *Options) *Server { } l := &DummyLogger{} s.SetLogger(l, true, true) - go s.Start() + s.Start() if err := s.readyForConnections(3 * time.Second); err != nil { testMQTTShutdownServer(s) t.Fatal(err) diff --git a/server/raft.go b/server/raft.go index 1e5a93fb..1b08abee 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1172,9 +1172,16 @@ func (n *raft) isCatchingUp() bool { return n.catchup != nil } -// Lock should be held. 
This function may block for up to ~5ms to check +// This function may block for up to ~10ms to check // forward progress in some cases. +// Lock should be held. func (n *raft) isCurrent(includeForwardProgress bool) bool { + // Check if we are closed. + if n.state == Closed { + n.debug("Not current, node is closed") + return false + } + // Check whether we've made progress on any state, 0 is invalid so not healthy. if n.commit == 0 { n.debug("Not current, no commits") @@ -1219,7 +1226,7 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { if startDelta := n.commit - n.applied; startDelta > 0 { for i := 0; i < 10; i++ { // 5ms, in 0.5ms increments n.Unlock() - time.Sleep(time.Millisecond / 2) + time.Sleep(time.Millisecond) n.Lock() if n.commit-n.applied < startDelta { // The gap is getting smaller, so we're making forward progress. diff --git a/server/routes_test.go b/server/routes_test.go index f3ad4508..9fa3018b 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -615,7 +615,7 @@ func TestBlockedShutdownOnRouteAcceptLoopFailure(t *testing.T) { opts.Cluster.Port = 7222 s := New(opts) - go s.Start() + s.Start() // Wait a second time.Sleep(time.Second) ch := make(chan bool) @@ -1399,9 +1399,7 @@ func TestRouteIPResolutionAndRouteToSelf(t *testing.T) { defer s.Shutdown() l := &routeHostLookupLogger{errCh: make(chan string, 1), ch: make(chan bool, 1)} s.SetLogger(l, true, true) - go func() { - s.Start() - }() + s.Start() if err := s.readyForConnections(time.Second); err != nil { t.Fatal(err) } diff --git a/server/server.go b/server/server.go index 7181e058..2b8a1dd8 100644 --- a/server/server.go +++ b/server/server.go @@ -1981,8 +1981,10 @@ func (s *Server) fetchAccount(name string) (*Account, error) { return acc, nil } -// Start up the server, this will block. -// Start via a Go routine if needed. +// Start up the server, this will not block. +// +// WaitForShutdown can be used to block and wait for the server to shutdown properly if needed +// after calling s.Shutdown() func (s *Server) Start() { s.Noticef("Starting nats-server") diff --git a/server/server_test.go b/server/server_test.go index 3f181a78..7bac1cc3 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -86,7 +86,7 @@ func RunServer(opts *Options) *Server { } // Run server in Go routine. - go s.Start() + s.Start() // Wait for accept loop(s) to be started if err := s.readyForConnections(10 * time.Second); err != nil { diff --git a/server/stream.go b/server/stream.go index e190508e..834e6682 100644 --- a/server/stream.go +++ b/server/stream.go @@ -658,6 +658,20 @@ func (mset *stream) streamAssignment() *streamAssignment { } func (mset *stream) setStreamAssignment(sa *streamAssignment) { + var node RaftNode + + mset.mu.RLock() + js := mset.js + mset.mu.RUnlock() + + if js != nil { + js.mu.RLock() + if sa.Group != nil { + node = sa.Group.node + } + js.mu.RUnlock() + } + mset.mu.Lock() defer mset.mu.Unlock() @@ -667,7 +681,7 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) { } // Set our node. - mset.node = sa.Group.node + mset.node = node if mset.node != nil { mset.node.UpdateKnownPeers(sa.Group.Peers) } @@ -4461,7 +4475,7 @@ func (mset *stream) internalLoop() { } } -// Used to break consumers out of their +// Used to break consumers out of their monitorConsumer go routines. 
func (mset *stream) resetAndWaitOnConsumers() { mset.mu.RLock() consumers := make([]*consumer, 0, len(mset.consumers)) @@ -4543,12 +4557,17 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { } mset.mu.Unlock() + isShuttingDown := js.isShuttingDown() for _, o := range obs { - // Third flag says do not broadcast a signal. - // TODO(dlc) - If we have an err here we don't want to stop - // but should we log? - o.stopWithFlags(deleteFlag, deleteFlag, false, advisory) - o.monitorWg.Wait() + if !o.isClosed() { + // Third flag says do not broadcast a signal. + // TODO(dlc) - If we have an err here we don't want to stop + // but should we log? + o.stopWithFlags(deleteFlag, deleteFlag, false, advisory) + if !isShuttingDown { + o.monitorWg.Wait() + } + } } mset.mu.Lock() diff --git a/server/websocket_test.go b/server/websocket_test.go index 6e1c9126..2e590e2c 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -2497,7 +2497,7 @@ func TestWSAdvertise(t *testing.T) { defer s.Shutdown() l := &captureFatalLogger{fatalCh: make(chan string, 1)} s.SetLogger(l, false, false) - go s.Start() + s.Start() select { case e := <-l.fatalCh: if !strings.Contains(e, "Unable to get websocket connect URLs") { diff --git a/test/cluster_test.go b/test/cluster_test.go index 477ec4db..a7f0121b 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -406,6 +406,8 @@ func TestClusterDoubleMsgs(t *testing.T) { sendB("PING\r\n") expectB(pongRe) + time.Sleep(10 * time.Millisecond) + matches = expectMsgsA2(2) checkMsg(t, matches[0], "foo", "", "", "2", "ok") checkForPubSids(t, matches, pSids)
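The Start() changes in this patch (the revised doc comment in server/server.go and the test call sites that no longer wrap s.Start() in a goroutine) mean Start() now returns once startup has been initiated instead of blocking. A minimal embedding sketch under the new semantics follows; the option values are illustrative assumptions, while NewServer, Start, ReadyForConnections, Shutdown and WaitForShutdown are the existing public server API:

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats-server/v2/server"
)

func main() {
	// Illustrative options only; a real embedding would configure these as needed.
	opts := &server.Options{Port: 4222}

	s, err := server.NewServer(opts)
	if err != nil {
		log.Fatal(err)
	}

	// Start returns once startup has been initiated; no goroutine wrapper needed.
	s.Start()

	// Block until the accept loop(s) are ready, or give up after the timeout.
	if !s.ReadyForConnections(5 * time.Second) {
		log.Fatal("server not ready for connections")
	}

	// ... application work ...

	// Shut down and wait for a clean exit.
	s.Shutdown()
	s.WaitForShutdown()
}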