Merge JS Chaos tests into a single file

This commit is contained in:
Marco Primi
2023-04-26 16:01:09 -07:00
parent 7908d8c05c
commit 82eade93b4
5 changed files with 1281 additions and 1324 deletions

View File

@@ -1,76 +0,0 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build js_chaos_tests
// +build js_chaos_tests
package server
import (
"testing"
"time"
)
// Bounces the entire set of nodes, then brings them back up.
// Fail if some nodes don't come back online.
func TestJetStreamChaosClusterBounce(t *testing.T) {
const duration = 60 * time.Second
const clusterSize = 3
c := createJetStreamClusterExplicit(t, "R3", clusterSize)
defer c.shutdown()
chaos := createClusterChaosMonkeyController(
t,
c,
&clusterBouncerChaosMonkey{
minDowntime: 0 * time.Second,
maxDowntime: 2 * time.Second,
minDownServers: clusterSize,
maxDownServers: clusterSize,
pause: 3 * time.Second,
},
)
chaos.start()
defer chaos.stop()
<-time.After(duration)
}
// Bounces a subset of the nodes, then brings them back up.
// Fails if some nodes don't come back online.
func TestJetStreamChaosClusterBounceSubset(t *testing.T) {
const duration = 60 * time.Second
const clusterSize = 3
c := createJetStreamClusterExplicit(t, "R3", clusterSize)
defer c.shutdown()
chaos := createClusterChaosMonkeyController(
t,
c,
&clusterBouncerChaosMonkey{
minDowntime: 0 * time.Second,
maxDowntime: 2 * time.Second,
minDownServers: 1,
maxDownServers: clusterSize,
pause: 3 * time.Second,
},
)
chaos.start()
defer chaos.stop()
<-time.After(duration)
}

View File

@@ -1,615 +0,0 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build js_chaos_tests
// +build js_chaos_tests
package server
import (
"bytes"
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/nats-io/nats.go"
)
const (
chaosConsumerTestsClusterName = "CONSUMERS_CHAOS_TEST"
chaosConsumerTestsStreamName = "CONSUMER_CHAOS_TEST_STREAM"
chaosConsumerTestsSubject = "foo"
chaosConsumerTestsDebug = false
)
// Creates stream and fills it with the given number of messages.
// Each message is the string representation of the stream sequence number,
// e.g. the first message (seqno: 1) contains data "1".
// This allows consumers to verify the content of each message without tracking additional state
func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMessages int) {
t.Helper()
const publishBatchSize = 1_000
pubNc, pubJs := jsClientConnectCluster(t, c)
defer pubNc.Close()
_, err := pubJs.AddStream(&nats.StreamConfig{
Name: chaosConsumerTestsStreamName,
Subjects: []string{chaosConsumerTestsSubject},
Replicas: replicas,
})
if err != nil {
t.Fatalf("Error creating stream: %v", err)
}
ackFutures := make([]nats.PubAckFuture, 0, publishBatchSize)
for i := 1; i <= numMessages; i++ {
message := []byte(fmt.Sprintf("%d", i))
pubAckFuture, err := pubJs.PublishAsync(chaosConsumerTestsSubject, message, nats.ExpectLastSequence(uint64(i-1)))
if err != nil {
t.Fatalf("Publish error: %s", err)
}
ackFutures = append(ackFutures, pubAckFuture)
if (i > 0 && i%publishBatchSize == 0) || i == numMessages {
select {
case <-pubJs.PublishAsyncComplete():
for _, pubAckFuture := range ackFutures {
select {
case <-pubAckFuture.Ok():
// Noop
case pubAckErr := <-pubAckFuture.Err():
t.Fatalf("Error publishing: %s", pubAckErr)
case <-time.After(30 * time.Second):
t.Fatalf("Timeout verifying pubAck for message: %s", pubAckFuture.Msg().Data)
}
}
ackFutures = make([]nats.PubAckFuture, 0, publishBatchSize)
t.Logf("Published %d/%d messages", i, numMessages)
case <-time.After(30 * time.Second):
t.Fatalf("Publish timed out")
}
}
}
}
// Verify ordered delivery despite cluster-wide outages
func TestJetStreamChaosConsumerOrdered(t *testing.T) {
const numMessages = 30_000
const maxRetries = 100
const retryDelay = 500 * time.Millisecond
const fetchTimeout = 250 * time.Millisecond
const clusterSize = 3
const replicas = 3
c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize)
defer c.shutdown()
createStreamForConsumerChaosTest(t, c, replicas, numMessages)
chaos := createClusterChaosMonkeyController(
t,
c,
&clusterBouncerChaosMonkey{
minDowntime: 0 * time.Second,
maxDowntime: 2 * time.Second,
minDownServers: clusterSize, // Whole cluster outage
maxDownServers: clusterSize,
pause: 1 * time.Second,
},
)
subNc, subJs := jsClientConnectCluster(t, c)
defer subNc.Close()
sub, err := subJs.SubscribeSync(
chaosConsumerTestsSubject,
nats.OrderedConsumer(),
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
if chaosConsumerTestsDebug {
t.Logf("Initial subscription: %s", toIndentedJsonString(sub))
}
chaos.start()
defer chaos.stop()
for i := 1; i <= numMessages; i++ {
var msg *nats.Msg
var nextMsgErr error
var expectedMsgData = []byte(fmt.Sprintf("%d", i))
nextMsgRetryLoop:
for r := 0; r <= maxRetries; r++ {
msg, nextMsgErr = sub.NextMsg(fetchTimeout)
if nextMsgErr == nil {
break nextMsgRetryLoop
} else if r == maxRetries {
t.Fatalf("Exceeded max retries for NextMsg")
} else if nextMsgErr == nats.ErrBadSubscription {
t.Fatalf("Subscription is invalid: %s", toIndentedJsonString(sub))
} else {
time.Sleep(retryDelay)
}
}
metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}
if metadata.Sequence.Stream != uint64(i) {
t.Fatalf("Expecting stream sequence %d, got %d instead", i, metadata.Sequence.Stream)
}
if !bytes.Equal(msg.Data, expectedMsgData) {
t.Fatalf("Expecting message %s, got %s instead", expectedMsgData, msg.Data)
}
// Simulate application processing (and gives the monkey some time to brew chaos)
time.Sleep(10 * time.Millisecond)
if i%1000 == 0 {
t.Logf("Consumed %d/%d", i, numMessages)
}
}
}
// Verify ordered delivery despite cluster-wide outages
func TestJetStreamChaosConsumerAsync(t *testing.T) {
const numMessages = 30_000
const timeout = 30 * time.Second // No (new) messages for 30s => terminate
const maxRetries = 25
const retryDelay = 500 * time.Millisecond
const clusterSize = 3
const replicas = 3
c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize)
defer c.shutdown()
createStreamForConsumerChaosTest(t, c, replicas, numMessages)
chaos := createClusterChaosMonkeyController(
t,
c,
&clusterBouncerChaosMonkey{
minDowntime: 0 * time.Second,
maxDowntime: 2 * time.Second,
minDownServers: clusterSize,
maxDownServers: clusterSize,
pause: 2 * time.Second,
},
)
subNc, subJs := jsClientConnectCluster(t, c)
defer subNc.Close()
timeoutTimer := time.NewTimer(timeout)
deliveryCount := uint64(0)
received := NewBitset(numMessages)
handleMsg := func(msg *nats.Msg) {
deliveryCount += 1
metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}
seq := metadata.Sequence.Stream
var expectedMsgData = []byte(fmt.Sprintf("%d", seq))
if !bytes.Equal(msg.Data, expectedMsgData) {
t.Fatalf("Expecting message content '%s', got '%s' instead", expectedMsgData, msg.Data)
}
isDupe := received.get(seq - 1)
if isDupe {
if chaosConsumerTestsDebug {
t.Logf("Duplicate message delivery, seq: %d", seq)
}
return
}
// Mark this sequence as received
received.set(seq-1, true)
if received.count() < numMessages {
// Reset timeout
timeoutTimer.Reset(timeout)
} else {
// All received, speed up the shutdown
timeoutTimer.Reset(1 * time.Second)
}
if received.count()%1000 == 0 {
t.Logf("Consumed %d/%d", received.count(), numMessages)
}
// Simulate application processing (and gives the monkey some time to brew chaos)
time.Sleep(10 * time.Millisecond)
ackRetryLoop:
for i := 0; i <= maxRetries; i++ {
ackErr := msg.Ack()
if ackErr == nil {
break ackRetryLoop
} else if i == maxRetries {
t.Fatalf("Failed to ACK message %d (retried %d times)", seq, maxRetries)
} else {
time.Sleep(retryDelay)
}
}
}
subOpts := []nats.SubOpt{}
sub, err := subJs.Subscribe(chaosConsumerTestsSubject, handleMsg, subOpts...)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
chaos.start()
defer chaos.stop()
// Wait for long enough silence.
// Either a stall, or all messages received
<-timeoutTimer.C
// Shut down consumer
sub.Unsubscribe()
uniqueDeliveredCount := received.count()
t.Logf(
"Delivered %d/%d messages %d duplicate deliveries",
uniqueDeliveredCount,
numMessages,
deliveryCount-uniqueDeliveredCount,
)
if uniqueDeliveredCount != numMessages {
t.Fatalf("No new message delivered in the last %s, %d/%d messages never delivered", timeout, numMessages-uniqueDeliveredCount, numMessages)
}
}
// Verify durable consumer retains state despite cluster-wide outages
// The consumer connection is also periodically closed, and the consumer 'resumes' on a different one
func TestJetStreamChaosConsumerDurable(t *testing.T) {
const numMessages = 30_000
const timeout = 30 * time.Second // No (new) messages for 60s => terminate
const clusterSize = 3
const replicas = 3
const maxRetries = 25
const retryDelay = 500 * time.Millisecond
const durableConsumerName = "durable"
c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize)
defer c.shutdown()
createStreamForConsumerChaosTest(t, c, replicas, numMessages)
chaos := createClusterChaosMonkeyController(
t,
c,
&clusterBouncerChaosMonkey{
minDowntime: 0 * time.Second,
maxDowntime: 2 * time.Second,
minDownServers: 1,
maxDownServers: clusterSize,
pause: 3 * time.Second,
},
)
var nc *nats.Conn
var sub *nats.Subscription
var subLock sync.Mutex
var handleMsgFun func(msg *nats.Msg)
var natsURL string
{
var sb strings.Builder
for _, s := range c.servers {
sb.WriteString(s.ClientURL())
sb.WriteString(",")
}
natsURL = sb.String()
}
resetDurableConsumer := func() {
subLock.Lock()
defer subLock.Unlock()
if nc != nil {
nc.Close()
}
var newNc *nats.Conn
connectRetryLoop:
for r := 0; r <= maxRetries; r++ {
var connErr error
newNc, connErr = nats.Connect(natsURL)
if connErr == nil {
break connectRetryLoop
} else if r == maxRetries {
t.Fatalf("Failed to connect, exceeded max retries, last error: %s", connErr)
} else {
time.Sleep(retryDelay)
}
}
var newJs nats.JetStreamContext
jsRetryLoop:
for r := 0; r <= maxRetries; r++ {
var jsErr error
newJs, jsErr = newNc.JetStream(nats.MaxWait(10 * time.Second))
if jsErr == nil {
break jsRetryLoop
} else if r == maxRetries {
t.Fatalf("Failed to get JS, exceeded max retries, last error: %s", jsErr)
} else {
time.Sleep(retryDelay)
}
}
subOpts := []nats.SubOpt{
nats.Durable(durableConsumerName),
}
var newSub *nats.Subscription
subscribeRetryLoop:
for i := 0; i <= maxRetries; i++ {
var subErr error
newSub, subErr = newJs.Subscribe(chaosConsumerTestsSubject, handleMsgFun, subOpts...)
if subErr == nil {
ci, err := newJs.ConsumerInfo(chaosConsumerTestsStreamName, durableConsumerName)
if err == nil {
if chaosConsumerTestsDebug {
t.Logf("Consumer info:\n %s", toIndentedJsonString(ci))
}
} else {
t.Logf("Failed to retrieve consumer info: %s", err)
}
break subscribeRetryLoop
} else if i == maxRetries {
t.Fatalf("Exceeded max retries creating subscription: %v", subErr)
} else {
time.Sleep(retryDelay)
}
}
nc, sub = newNc, newSub
}
timeoutTimer := time.NewTimer(timeout)
deliveryCount := uint64(0)
received := NewBitset(numMessages)
handleMsgFun = func(msg *nats.Msg) {
subLock.Lock()
if msg.Sub != sub {
// Message from a previous instance of durable consumer, drop
defer subLock.Unlock()
return
}
subLock.Unlock()
deliveryCount += 1
metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}
seq := metadata.Sequence.Stream
var expectedMsgData = []byte(fmt.Sprintf("%d", seq))
if !bytes.Equal(msg.Data, expectedMsgData) {
t.Fatalf("Expecting message content '%s', got '%s' instead", expectedMsgData, msg.Data)
}
isDupe := received.get(seq - 1)
if isDupe {
if chaosConsumerTestsDebug {
t.Logf("Duplicate message delivery, seq: %d", seq)
}
return
}
// Mark this sequence as received
received.set(seq-1, true)
if received.count() < numMessages {
// Reset timeout
timeoutTimer.Reset(timeout)
} else {
// All received, speed up the shutdown
timeoutTimer.Reset(1 * time.Second)
}
// Simulate application processing (and gives the monkey some time to brew chaos)
time.Sleep(10 * time.Millisecond)
ackRetryLoop:
for i := 0; i <= maxRetries; i++ {
ackErr := msg.Ack()
if ackErr == nil {
break ackRetryLoop
} else if i == maxRetries {
t.Fatalf("Failed to ACK message %d (retried %d times)", seq, maxRetries)
} else {
time.Sleep(retryDelay)
}
}
if received.count()%1000 == 0 {
t.Logf("Consumed %d/%d, duplicate deliveries: %d", received.count(), numMessages, deliveryCount-received.count())
// Close connection and resume consuming on a different one
resetDurableConsumer()
}
}
resetDurableConsumer()
chaos.start()
defer chaos.stop()
// Wait for long enough silence.
// Either a stall, or all messages received
<-timeoutTimer.C
// Shut down consumer
if sub != nil {
sub.Unsubscribe()
}
uniqueDeliveredCount := received.count()
t.Logf(
"Delivered %d/%d messages %d duplicate deliveries",
uniqueDeliveredCount,
numMessages,
deliveryCount-uniqueDeliveredCount,
)
if uniqueDeliveredCount != numMessages {
t.Fatalf("No new message delivered in the last %s, %d/%d messages never delivered", timeout, numMessages-uniqueDeliveredCount, numMessages)
}
}
func TestJetStreamChaosConsumerPull(t *testing.T) {
const numMessages = 10_000
const maxRetries = 100
const retryDelay = 500 * time.Millisecond
const fetchTimeout = 250 * time.Millisecond
const fetchBatchSize = 100
const clusterSize = 3
const replicas = 3
const durableConsumerName = "durable"
c := createJetStreamClusterExplicit(t, chaosConsumerTestsClusterName, clusterSize)
defer c.shutdown()
createStreamForConsumerChaosTest(t, c, replicas, numMessages)
chaos := createClusterChaosMonkeyController(
t,
c,
&clusterBouncerChaosMonkey{
minDowntime: 0 * time.Second,
maxDowntime: 2 * time.Second,
minDownServers: clusterSize, // Whole cluster outage
maxDownServers: clusterSize,
pause: 1 * time.Second,
},
)
subNc, subJs := jsClientConnectCluster(t, c)
defer subNc.Close()
sub, err := subJs.PullSubscribe(
chaosConsumerTestsSubject,
durableConsumerName,
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
if chaosConsumerTestsDebug {
t.Logf("Initial subscription: %s", toIndentedJsonString(sub))
}
chaos.start()
defer chaos.stop()
fetchMaxWait := nats.MaxWait(fetchTimeout)
received := NewBitset(numMessages)
deliveredCount := uint64(0)
for received.count() < numMessages {
var msgs []*nats.Msg
var fetchErr error
fetchRetryLoop:
for r := 0; r <= maxRetries; r++ {
msgs, fetchErr = sub.Fetch(fetchBatchSize, fetchMaxWait)
if fetchErr == nil {
break fetchRetryLoop
} else if r == maxRetries {
t.Fatalf("Exceeded max retries for Fetch, last error: %s", fetchErr)
} else if fetchErr == nats.ErrBadSubscription {
t.Fatalf("Subscription is invalid: %s", toIndentedJsonString(sub))
} else {
// t.Logf("Fetch error: %v", fetchErr)
time.Sleep(retryDelay)
}
}
for _, msg := range msgs {
deliveredCount += 1
metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}
streamSeq := metadata.Sequence.Stream
expectedMsgData := []byte(fmt.Sprintf("%d", streamSeq))
if !bytes.Equal(msg.Data, expectedMsgData) {
t.Fatalf("Expecting message %s, got %s instead", expectedMsgData, msg.Data)
}
isDupe := received.get(streamSeq - 1)
received.set(streamSeq-1, true)
// Simulate application processing (and gives the monkey some time to brew chaos)
time.Sleep(10 * time.Millisecond)
ackRetryLoop:
for r := 0; r <= maxRetries; r++ {
ackErr := msg.Ack()
if ackErr == nil {
break ackRetryLoop
} else if r == maxRetries {
t.Fatalf("Failed to ACK message %d, last error: %s", streamSeq, ackErr)
} else {
time.Sleep(retryDelay)
}
}
if !isDupe && received.count()%1000 == 0 {
t.Logf("Consumed %d/%d (duplicates: %d)", received.count(), numMessages, deliveredCount-received.count())
}
}
}
}

View File

@@ -1,177 +0,0 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build js_chaos_tests
// +build js_chaos_tests
package server
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"
)
// Additional cluster helpers
func (c *cluster) waitOnClusterHealthz() {
c.t.Helper()
for _, cs := range c.servers {
c.waitOnServerHealthz(cs)
}
}
func (c *cluster) stopSubset(toStop []*Server) {
c.t.Helper()
for _, s := range toStop {
s.Shutdown()
}
}
func (c *cluster) selectRandomServers(numServers int) []*Server {
c.t.Helper()
if numServers > len(c.servers) {
panic(fmt.Sprintf("Can't select %d servers in a cluster of %d", numServers, len(c.servers)))
}
var selectedServers []*Server
selectedServers = append(selectedServers, c.servers...)
rand.Shuffle(len(selectedServers), func(x, y int) {
selectedServers[x], selectedServers[y] = selectedServers[y], selectedServers[x]
})
return selectedServers[0:numServers]
}
// Support functions for "chaos" testing (random injected failures)
type ChaosMonkeyController interface {
// Launch the monkey as background routine and return
start()
// Stop a monkey that was previously started
stop()
// Run the monkey synchronously, until it is manually stopped via stopCh
run()
}
type ClusterChaosMonkey interface {
// Set defaults and validates the monkey parameters
validate(t *testing.T, c *cluster)
// Run the monkey synchronously, until it is manually stopped via stopCh
run(t *testing.T, c *cluster, stopCh <-chan bool)
}
// Chaos Monkey Controller that acts on a cluster
type clusterChaosMonkeyController struct {
t *testing.T
cluster *cluster
wg sync.WaitGroup
stopCh chan bool
ccm ClusterChaosMonkey
}
func createClusterChaosMonkeyController(t *testing.T, c *cluster, ccm ClusterChaosMonkey) ChaosMonkeyController {
ccm.validate(t, c)
return &clusterChaosMonkeyController{
t: t,
cluster: c,
stopCh: make(chan bool, 3),
ccm: ccm,
}
}
func (m *clusterChaosMonkeyController) start() {
m.t.Logf("🐵 Starting monkey")
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.run()
}()
}
func (m *clusterChaosMonkeyController) stop() {
m.t.Logf("🐵 Stopping monkey")
m.stopCh <- true
m.wg.Wait()
m.t.Logf("🐵 Monkey stopped")
}
func (m *clusterChaosMonkeyController) run() {
m.ccm.run(m.t, m.cluster, m.stopCh)
}
// Cluster Chaos Monkey that selects a random subset of the nodes in a cluster (according to min/max provided),
// shuts them down for a given duration (according to min/max provided), then brings them back up.
// Then sleeps for a given time, and does it again until stopped.
type clusterBouncerChaosMonkey struct {
minDowntime time.Duration
maxDowntime time.Duration
minDownServers int
maxDownServers int
pause time.Duration
}
func (m *clusterBouncerChaosMonkey) validate(t *testing.T, c *cluster) {
if m.minDowntime > m.maxDowntime {
t.Fatalf("Min downtime %v cannot be larger than max downtime %v", m.minDowntime, m.maxDowntime)
}
if m.minDownServers > m.maxDownServers {
t.Fatalf("Min down servers %v cannot be larger than max down servers %v", m.minDownServers, m.maxDownServers)
}
}
func (m *clusterBouncerChaosMonkey) run(t *testing.T, c *cluster, stopCh <-chan bool) {
for {
// Pause between actions
select {
case <-stopCh:
return
case <-time.After(m.pause):
}
// Pick a random subset of servers
numServersDown := rand.Intn(1+m.maxDownServers-m.minDownServers) + m.minDownServers
servers := c.selectRandomServers(numServersDown)
serverNames := []string{}
for _, s := range servers {
serverNames = append(serverNames, s.info.Name)
}
// Pick a random outage interval
minOutageNanos := m.minDowntime.Nanoseconds()
maxOutageNanos := m.maxDowntime.Nanoseconds()
outageDurationNanos := rand.Int63n(1+maxOutageNanos-minOutageNanos) + minOutageNanos
outageDuration := time.Duration(outageDurationNanos)
// Take down selected servers
t.Logf("🐵 Taking down %d/%d servers for %v (%v)", numServersDown, len(c.servers), outageDuration, serverNames)
c.stopSubset(servers)
// Wait for the "outage" duration
select {
case <-stopCh:
return
case <-time.After(outageDuration):
}
// Restart servers and wait for cluster to be healthy
t.Logf("🐵 Restoring cluster")
c.restartAllSamePorts()
c.waitOnClusterHealthz()
c.waitOnClusterReady()
c.waitOnAllCurrent()
c.waitOnLeader()
}
}

View File

@@ -1,456 +0,0 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build js_chaos_tests
// +build js_chaos_tests
package server
import (
"fmt"
"math/rand"
"strings"
"sync"
"testing"
"time"
"github.com/nats-io/nats.go"
)
const (
chaosKvTestsClusterName = "KV_CHAOS_TEST"
chaosKvTestsBucketName = "KV_CHAOS_TEST_BUCKET"
chaosKvTestsSubject = "foo"
chaosKvTestsDebug = false
)
// Creates KV store (a.k.a. bucket).
func createBucketForKvChaosTest(t *testing.T, c *cluster, replicas int) {
t.Helper()
pubNc, pubJs := jsClientConnectCluster(t, c)
defer pubNc.Close()
config := nats.KeyValueConfig{
Bucket: chaosKvTestsBucketName,
Replicas: replicas,
Description: "Test bucket",
}
kvs, err := pubJs.CreateKeyValue(&config)
if err != nil {
t.Fatalf("Error creating bucket: %v", err)
}
status, err := kvs.Status()
if err != nil {
t.Fatalf("Error retrieving bucket status: %v", err)
}
t.Logf("Bucket created: %s", status.Bucket())
}
// Single client performs a set of PUT on a single key.
// If PUT is successful, perform a GET on the same key.
// If GET is successful, ensure key revision and value match the most recent successful write.
func TestJetStreamChaosKvPutGet(t *testing.T) {
const numOps = 100_000
const clusterSize = 3
const replicas = 3
const key = "key"
const staleReadsOk = true // Set to false to check for violations of 'read committed' consistency
c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize)
defer c.shutdown()
createBucketForKvChaosTest(t, c, replicas)
chaos := createClusterChaosMonkeyController(
t,
c,
&clusterBouncerChaosMonkey{
minDowntime: 0 * time.Second,
maxDowntime: 2 * time.Second,
minDownServers: clusterSize, // Whole cluster outage
maxDownServers: clusterSize,
pause: 1 * time.Second,
},
)
nc, js := jsClientConnectCluster(t, c)
defer nc.Close()
// Create KV bucket
kv, err := js.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
// Initialize the only key
firstRevision, err := kv.Create(key, []byte("INITIAL VALUE"))
if err != nil {
t.Fatalf("Failed to create key: %v", err)
} else if firstRevision != 1 {
t.Fatalf("Unexpected revision: %d", firstRevision)
}
// Start chaos
chaos.start()
defer chaos.stop()
staleReadsCount := uint64(0)
successCount := uint64(0)
previousRevision := firstRevision
putGetLoop:
for i := 1; i <= numOps; i++ {
if i%1000 == 0 {
t.Logf("Completed %d/%d PUT+GET operations", i, numOps)
}
// PUT a value
putValue := fmt.Sprintf("value-%d", i)
putRevision, err := kv.Put(key, []byte(putValue))
if err != nil {
t.Logf("PUT error: %v", err)
continue putGetLoop
}
// Check revision is monotonically increasing
if putRevision <= previousRevision {
t.Fatalf("PUT produced revision %d which is not greater than the previous successful PUT revision: %d", putRevision, previousRevision)
}
previousRevision = putRevision
// If PUT was successful, GET the same
kve, err := kv.Get(key)
if err == nats.ErrKeyNotFound {
t.Fatalf("GET key not found, but key does exists (last PUT revision: %d)", putRevision)
} else if err != nil {
t.Logf("GET error: %v", err)
continue putGetLoop
}
getValue := string(kve.Value())
getRevision := kve.Revision()
if putRevision > getRevision {
// Stale read, violates 'read committed' consistency criteria
if !staleReadsOk {
t.Fatalf("PUT value %s (rev: %d) then read value %s (rev: %d)", putValue, putRevision, getValue, getRevision)
} else {
staleReadsCount += 1
}
} else if putRevision < getRevision {
// Returned revision is higher than any ever written, this should never happen
t.Fatalf("GET returned revision %d, but most recent expected revision is %d", getRevision, putRevision)
} else if putValue != getValue {
// Returned revision matches latest, but values do not, this should never happen
t.Fatalf("GET returned revision %d with value %s, but value %s was just committed for that same revision", getRevision, getValue, putValue)
} else {
// Get returned the latest revision/value
successCount += 1
if chaosKvTestsDebug {
t.Logf("PUT+GET %s=%s (rev: %d)", key, putValue, putRevision)
}
}
}
t.Logf("Completed %d PUT+GET cycles of which %d successful, %d GETs returned a stale value", numOps, successCount, staleReadsCount)
}
// A variant TestJetStreamChaosKvPutGet where PUT is retried until successful, and GET is retried until it returns the latest known key revision.
// This validates than a confirmed PUT value is never lost, and becomes eventually visible.
func TestJetStreamChaosKvPutGetWithRetries(t *testing.T) {
const numOps = 10_000
const maxRetries = 20
const retryDelay = 100 * time.Millisecond
const clusterSize = 3
const replicas = 3
const key = "key"
c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize)
defer c.shutdown()
createBucketForKvChaosTest(t, c, replicas)
chaos := createClusterChaosMonkeyController(
t,
c,
&clusterBouncerChaosMonkey{
minDowntime: 0 * time.Second,
maxDowntime: 2 * time.Second,
minDownServers: clusterSize, // Whole cluster outage
maxDownServers: clusterSize,
pause: 1 * time.Second,
},
)
nc, js := jsClientConnectCluster(t, c)
defer nc.Close()
kv, err := js.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
// Initialize key value
firstRevision, err := kv.Create(key, []byte("INITIAL VALUE"))
if err != nil {
t.Fatalf("Failed to create key: %v", err)
} else if firstRevision != 1 {
t.Fatalf("Unexpected revision: %d", firstRevision)
}
// Start chaos
chaos.start()
defer chaos.stop()
staleReadCount := 0
previousRevision := firstRevision
putGetLoop:
for i := 1; i <= numOps; i++ {
if i%1000 == 0 {
t.Logf("Completed %d/%d PUT+GET operations", i, numOps)
}
putValue := fmt.Sprintf("value-%d", i)
putRevision := uint64(0)
// Put new value for key, retry until successful or out of retries
putRetryLoop:
for r := 0; r <= maxRetries; r++ {
var putErr error
putRevision, putErr = kv.Put(key, []byte(putValue))
if putErr == nil {
break putRetryLoop
} else if r == maxRetries {
t.Fatalf("Failed to PUT (retried %d times): %v", maxRetries, putErr)
} else {
if chaosKvTestsDebug {
t.Logf("PUT error: %v", putErr)
}
time.Sleep(retryDelay)
}
}
// Ensure key version is monotonically increasing
if putRevision <= previousRevision {
t.Fatalf("Latest PUT created revision %d which is not greater than the previous revision: %d", putRevision, previousRevision)
}
previousRevision = putRevision
// Read value for key, retry until successful, and validate corresponding version and value
getRetryLoop:
for r := 0; r <= maxRetries; r++ {
var getErr error
kve, getErr := kv.Get(key)
if getErr != nil && r == maxRetries {
t.Fatalf("Failed to GET (retried %d times): %v", maxRetries, getErr)
} else if getErr != nil {
if chaosKvTestsDebug {
t.Logf("GET error: %v", getErr)
}
time.Sleep(retryDelay)
continue getRetryLoop
}
// GET successful, check revision and value
getValue := string(kve.Value())
getRevision := kve.Revision()
if putRevision == getRevision {
if putValue != getValue {
t.Fatalf("Unexpected value %s for revision %d, expected: %s", getValue, getRevision, putValue)
}
if chaosKvTestsDebug {
t.Logf("PUT+GET %s=%s (rev: %d) (retry: %d)", key, putValue, putRevision, r)
}
continue putGetLoop
} else if getRevision > putRevision {
t.Fatalf("GET returned version that should not exist yet: %d, last created: %d", getRevision, putRevision)
} else { // get revision < put revision
staleReadCount += 1
if chaosKvTestsDebug {
t.Logf("GET got stale value: %v (rev: %d, latest: %d)", getValue, getRevision, putRevision)
}
time.Sleep(retryDelay)
continue getRetryLoop
}
}
}
t.Logf("Client completed %d PUT+GET cycles, %d GET returned a stale value", numOps, staleReadCount)
}
// Multiple clients updating a finite set of keys with CAS semantics.
// TODO check that revision is never lower than last one seen
// TODO check that KeyNotFound is never returned, as keys are initialized beforehand
func TestJetStreamChaosKvCAS(t *testing.T) {
const numOps = 10_000
const maxRetries = 50
const retryDelay = 300 * time.Millisecond
const clusterSize = 3
const replicas = 3
const numKeys = 15
const numClients = 5
c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize)
defer c.shutdown()
createBucketForKvChaosTest(t, c, replicas)
chaos := createClusterChaosMonkeyController(
t,
c,
&clusterBouncerChaosMonkey{
minDowntime: 0 * time.Second,
maxDowntime: 2 * time.Second,
minDownServers: clusterSize, // Whole cluster outage
maxDownServers: clusterSize,
pause: 1 * time.Second,
},
)
nc, js := jsClientConnectCluster(t, c)
defer nc.Close()
// Create bucket
kv, err := js.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
// Create set of keys and initialize them with dummy value
keys := make([]string, numKeys)
for k := 0; k < numKeys; k++ {
key := fmt.Sprintf("key-%d", k)
keys[k] = key
_, err := kv.Create(key, []byte("Initial value"))
if err != nil {
t.Fatalf("Failed to create key: %v", err)
}
}
wgStart := sync.WaitGroup{}
wgComplete := sync.WaitGroup{}
// Client routine
client := func(clientId int, kv nats.KeyValue) {
defer wgComplete.Done()
rng := rand.New(rand.NewSource(int64(clientId)))
successfulUpdates := 0
casRejectUpdates := 0
otherUpdateErrors := 0
// Map to track last known revision for each of the keys
knownRevisions := map[string]uint64{}
for _, key := range keys {
knownRevisions[key] = 0
}
// Wait for all clients to reach this point before proceeding
wgStart.Done()
wgStart.Wait()
for i := 1; i <= numOps; i++ {
if i%1000 == 0 {
t.Logf("Client %d completed %d/%d updates", clientId, i, numOps)
}
// Pick random key from the set
key := keys[rng.Intn(numKeys)]
// Prepare unique value to be written
value := fmt.Sprintf("client: %d operation %d", clientId, i)
// Try to update a key with CAS
newRevision, updateErr := kv.Update(key, []byte(value), knownRevisions[key])
if updateErr == nil {
// Update successful
knownRevisions[key] = newRevision
successfulUpdates += 1
if chaosKvTestsDebug {
t.Logf("Client %d updated key %s, new revision: %d", clientId, key, newRevision)
}
} else if updateErr != nil && strings.Contains(fmt.Sprint(updateErr), "wrong last sequence") {
// CAS rejected update, learn current revision for this key
casRejectUpdates += 1
for r := 0; r <= maxRetries; r++ {
kve, getErr := kv.Get(key)
if getErr == nil {
currentRevision := kve.Revision()
if currentRevision < knownRevisions[key] {
// Revision number moved backward, this should never happen
t.Fatalf("Current revision for key %s is %d, which is lower than the last known revision %d", key, currentRevision, knownRevisions[key])
}
knownRevisions[key] = currentRevision
if chaosKvTestsDebug {
t.Logf("Client %d learn key %s revision: %d", clientId, key, currentRevision)
}
break
} else if r == maxRetries {
t.Fatalf("Failed to GET (retried %d times): %v", maxRetries, getErr)
} else {
time.Sleep(retryDelay)
}
}
} else {
// Other update error
otherUpdateErrors += 1
if chaosKvTestsDebug {
t.Logf("Client %d update error for key %s: %v", clientId, key, updateErr)
}
time.Sleep(retryDelay)
}
}
t.Logf("Client %d done, %d kv updates, %d CAS rejected, %d other errors", clientId, successfulUpdates, casRejectUpdates, otherUpdateErrors)
}
// Launch all clients
for i := 1; i <= numClients; i++ {
cNc, cJs := jsClientConnectCluster(t, c)
defer cNc.Close()
cKv, err := cJs.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
wgStart.Add(1)
wgComplete.Add(1)
go client(i, cKv)
}
// Wait for clients to be connected and ready
wgStart.Wait()
// Start failures
chaos.start()
defer chaos.stop()
// Wait for all clients to be done
wgComplete.Wait()
}

File diff suppressed because it is too large Load Diff