diff --git a/server/jetstream_chaos_test.go b/server/jetstream_chaos_test.go index 2ef91f6a..b0873382 100644 --- a/server/jetstream_chaos_test.go +++ b/server/jetstream_chaos_test.go @@ -329,7 +329,8 @@ func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMes // Verify ordered delivery despite cluster-wide outages func TestJetStreamChaosConsumerOrdered(t *testing.T) { - const numMessages = 30_000 + const numMessages = 5_000 + const numBatch = 500 const maxRetries = 100 const retryDelay = 500 * time.Millisecond const fetchTimeout = 250 * time.Millisecond @@ -407,7 +408,7 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) { // Simulate application processing (and gives the monkey some time to brew chaos) time.Sleep(10 * time.Millisecond) - if i%1000 == 0 { + if i%numBatch == 0 { t.Logf("Consumed %d/%d", i, numMessages) } } @@ -416,7 +417,8 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) { // Verify ordered delivery despite cluster-wide outages func TestJetStreamChaosConsumerAsync(t *testing.T) { - const numMessages = 30_000 + const numMessages = 5_000 + const numBatch = 500 const timeout = 30 * time.Second // No (new) messages for 30s => terminate const maxRetries = 25 const retryDelay = 500 * time.Millisecond @@ -480,7 +482,7 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) { timeoutTimer.Reset(1 * time.Second) } - if received.count()%1000 == 0 { + if received.count()%numBatch == 0 { t.Logf("Consumed %d/%d", received.count(), numMessages) } @@ -535,7 +537,8 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) { // The consumer connection is also periodically closed, and the consumer 'resumes' on a different one func TestJetStreamChaosConsumerDurable(t *testing.T) { - const numMessages = 30_000 + const numMessages = 5_000 + const numBatch = 500 const timeout = 30 * time.Second // No (new) messages for 60s => terminate const clusterSize = 3 const replicas = 3 @@ -703,7 +706,7 @@ func TestJetStreamChaosConsumerDurable(t *testing.T) { } } - if received.count()%1000 == 0 { + if received.count()%numBatch == 0 { t.Logf("Consumed %d/%d, duplicate deliveries: %d", received.count(), numMessages, deliveryCount-received.count()) // Close connection and resume consuming on a different one resetDurableConsumer() @@ -740,7 +743,8 @@ func TestJetStreamChaosConsumerDurable(t *testing.T) { func TestJetStreamChaosConsumerPull(t *testing.T) { - const numMessages = 10_000 + const numMessages = 5_000 + const numBatch = 500 const maxRetries = 100 const retryDelay = 500 * time.Millisecond const fetchTimeout = 250 * time.Millisecond @@ -845,7 +849,7 @@ func TestJetStreamChaosConsumerPull(t *testing.T) { } } - if !isDupe && received.count()%1000 == 0 { + if !isDupe && received.count()%numBatch == 0 { t.Logf("Consumed %d/%d (duplicates: %d)", received.count(), numMessages, deliveredCount-received.count()) } }