From d83e0e2b25d989eb4e6d88f871b9374de62540f9 Mon Sep 17 00:00:00 2001 From: Marco Primi Date: Mon, 1 Aug 2022 16:40:09 -0700 Subject: [PATCH] Add 'chaos' test utility and 2 example tests 'Chaos' is a new group of tests that validates behavior in the presence of random failures. Overview: - Introduce a 'Chaos Monkey' controller which can unleash a monkey against a test cluster. - Introduce a monkey of type 'ClusterBouncer' which stops and restarts nodes according to some configuration - Add 2 example tests which ensure a cluster can survive some amount of nodes bouncing - Configure the build to skip chaos tests unless explicitly requested - Add some test utility functions --- scripts/runTestsOnTravis.sh | 9 +- server/jetstream_chaos_cluster_test.go | 76 +++++++++++++ server/jetstream_chaos_helpers_test.go | 144 +++++++++++++++++++++++++ server/jetstream_helpers_test.go | 27 +++++ 4 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 server/jetstream_chaos_cluster_test.go create mode 100644 server/jetstream_chaos_helpers_test.go diff --git a/scripts/runTestsOnTravis.sh b/scripts/runTestsOnTravis.sh index ffd1cadb..09d64bc9 100755 --- a/scripts/runTestsOnTravis.sh +++ b/scripts/runTestsOnTravis.sh @@ -35,7 +35,7 @@ elif [ "$1" = "js_tests" ]; then # tests by using the `skip_js_cluster_tests` and `skip_js_super_cluster_tests` # build tags. - go test -race -v -run=TestJetStream ./server -tags=skip_js_cluster_tests,skip_js_super_cluster_tests -count=1 -vet=off -timeout=30m -failfast + go test -race -v -run=TestJetStream ./server -tags=skip_js_cluster_tests,skip_js_super_cluster_tests,skip_js_cluster_chaos_tests -count=1 -vet=off -timeout=30m -failfast elif [ "$1" = "js_cluster_tests" ]; then @@ -51,6 +51,13 @@ elif [ "$1" = "js_super_cluster_tests" ]; then go test -race -v -run=TestJetStreamSuperCluster ./server -count=1 -vet=off -timeout=30m -failfast +elif [ "$1" = "js_chaos_tests" ]; then + + # Run JetStream chaos tests. 
By convention, all JS cluster chaos + # tests start with `TestJetStreamChaos`. + + go test -race -v -p=1 -run=TestJetStreamChaos ./server -count=1 -vet=off -timeout=30m -failfast + elif [ "$1" = "srv_pkg_non_js_tests" ]; then # Run all non JetStream tests in the server package. We exclude the diff --git a/server/jetstream_chaos_cluster_test.go b/server/jetstream_chaos_cluster_test.go new file mode 100644 index 00000000..c4065696 --- /dev/null +++ b/server/jetstream_chaos_cluster_test.go @@ -0,0 +1,76 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !skip_js_tests && !skip_js_cluster_tests && !skip_js_chaos_tests +// +build !skip_js_tests,!skip_js_cluster_tests,!skip_js_chaos_tests + +package server + +import ( + "testing" + "time" +) + +// Bounces the entire set of nodes, then brings them back up. +// Fails if some nodes don't come back online. +func TestJetStreamChaosClusterBounce(t *testing.T) { + + const duration = 60 * time.Second + const clusterSize = 3 + + c := createJetStreamClusterExplicit(t, "R3", clusterSize) + defer c.shutdown() + + chaos := createClusterChaosMonkeyController( + t, + c, + &clusterBouncerChaosMonkey{ + minDowntime: 0 * time.Second, + maxDowntime: 2 * time.Second, + minDownServers: clusterSize, + maxDownServers: clusterSize, + pause: 3 * time.Second, + }, + ) + chaos.start() + defer chaos.stop() + + <-time.After(duration) +} + +// Bounces a subset of the nodes, then brings them back up. 
+// Fails if some nodes don't come back online. +func TestJetStreamChaosClusterBounceSubset(t *testing.T) { + + const duration = 60 * time.Second + const clusterSize = 3 + + c := createJetStreamClusterExplicit(t, "R3", clusterSize) + defer c.shutdown() + + chaos := createClusterChaosMonkeyController( + t, + c, + &clusterBouncerChaosMonkey{ + minDowntime: 0 * time.Second, + maxDowntime: 2 * time.Second, + minDownServers: 1, + maxDownServers: clusterSize, + pause: 3 * time.Second, + }, + ) + chaos.start() + defer chaos.stop() + + <-time.After(duration) +} diff --git a/server/jetstream_chaos_helpers_test.go b/server/jetstream_chaos_helpers_test.go new file mode 100644 index 00000000..ca32bd4f --- /dev/null +++ b/server/jetstream_chaos_helpers_test.go @@ -0,0 +1,144 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package server + +import ( + "math/rand" + "sync" + "testing" + "time" +) + +// Support functions for "chaos" testing (random injected failures) + +type ChaosMonkeyController interface { + // Launch the monkey as background routine and return + start() + // Stop a monkey that was previously started + stop() + // Run the monkey synchronously, until it is manually stopped via stopCh + run() +} + +type ClusterChaosMonkey interface { + // Set defaults and validates the monkey parameters + validate(t *testing.T, c *cluster) + // Run the monkey synchronously, until it is manually stopped via stopCh + run(t *testing.T, c *cluster, stopCh <-chan bool) +} + +// Chaos Monkey Controller that acts on a cluster +type clusterChaosMonkeyController struct { + t *testing.T + cluster *cluster + wg sync.WaitGroup + stopCh chan bool + ccm ClusterChaosMonkey +} + +func createClusterChaosMonkeyController(t *testing.T, c *cluster, ccm ClusterChaosMonkey) ChaosMonkeyController { + ccm.validate(t, c) + return &clusterChaosMonkeyController{ + t: t, + cluster: c, + stopCh: make(chan bool, 3), + ccm: ccm, + } +} + +func (m *clusterChaosMonkeyController) start() { + m.t.Logf("🐵 Starting monkey") + m.wg.Add(1) + go func() { + defer m.wg.Done() + m.run() + }() +} + +func (m *clusterChaosMonkeyController) stop() { + m.t.Logf("🐵 Stopping monkey") + m.stopCh <- true + m.wg.Wait() + m.t.Logf("🐵 Monkey stopped") +} + +func (m *clusterChaosMonkeyController) run() { + m.ccm.run(m.t, m.cluster, m.stopCh) +} + +// Cluster Chaos Monkey that selects a random subset of the nodes in a cluster (according to min/max provided), +// shuts them down for a given duration (according to min/max provided), then brings them back up. +// Then sleeps for a given time, and does it again until stopped. 
+type clusterBouncerChaosMonkey struct { + minDowntime time.Duration + maxDowntime time.Duration + minDownServers int + maxDownServers int + pause time.Duration +} + +func (m *clusterBouncerChaosMonkey) validate(t *testing.T, c *cluster) { + if m.minDowntime > m.maxDowntime { + t.Fatalf("Min downtime %v cannot be larger than max downtime %v", m.minDowntime, m.maxDowntime) + } + + if m.minDownServers > m.maxDownServers { + t.Fatalf("Min down servers %v cannot be larger than max down servers %v", m.minDownServers, m.maxDownServers) + } +} + +func (m *clusterBouncerChaosMonkey) run(t *testing.T, c *cluster, stopCh <-chan bool) { + for { + // Pause between actions + select { + case <-stopCh: + return + case <-time.After(m.pause): + } + + // Pick a random subset of servers + numServersDown := rand.Intn(1+m.maxDownServers-m.minDownServers) + m.minDownServers + servers := c.selectRandomServers(numServersDown) + serverNames := []string{} + for _, s := range servers { + serverNames = append(serverNames, s.info.Name) + } + + // Pick a random outage interval + minOutageNanos := m.minDowntime.Nanoseconds() + maxOutageNanos := m.maxDowntime.Nanoseconds() + outageDurationNanos := rand.Int63n(1+maxOutageNanos-minOutageNanos) + minOutageNanos + outageDuration := time.Duration(outageDurationNanos) + + // Take down selected servers + t.Logf("🐵 Taking down %d/%d servers for %v (%v)", numServersDown, len(c.servers), outageDuration, serverNames) + c.stopSubset(servers) + + // Wait for the "outage" duration + select { + case <-stopCh: + return + case <-time.After(outageDuration): + } + + // Restart servers and wait for cluster to be healthy + t.Logf("🐵 Restoring cluster") + c.restartAllSamePorts() + c.waitOnClusterHealthz() + + c.waitOnClusterReady() + c.waitOnAllCurrent() + c.waitOnLeader() + } +} diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 325aa698..a6325fc7 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go 
@@ -1315,6 +1315,13 @@ func (c *cluster) waitOnClusterReadyWithNumPeers(numPeersExpected int) { } } +func (c *cluster) waitOnClusterHealthz() { + c.t.Helper() + for _, cs := range c.servers { + c.waitOnServerHealthz(cs) + } +} + // Helper function to remove JetStream from a server. func (c *cluster) removeJetStream(s *Server) { c.t.Helper() @@ -1349,6 +1356,13 @@ func (c *cluster) stopAll() { } } +func (c *cluster) stopSubset(toStop []*Server) { + c.t.Helper() + for _, s := range toStop { + s.Shutdown() + } +} + func (c *cluster) restartAll() { c.t.Helper() for i, s := range c.servers { @@ -1396,6 +1410,19 @@ func (c *cluster) stableTotalSubs() (total int) { } +func (c *cluster) selectRandomServers(numServers int) []*Server { + c.t.Helper() + if numServers > len(c.servers) { + panic(fmt.Sprintf("Can't select %d servers in a cluster of %d", numServers, len(c.servers))) + } + var selectedServers []*Server + selectedServers = append(selectedServers, c.servers...) + rand.Shuffle(len(selectedServers), func(x, y int) { + selectedServers[x], selectedServers[y] = selectedServers[y], selectedServers[x] + }) + return selectedServers[0:numServers] +} + func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo { t.Helper() req, err := json.Marshal(cfg)