Fix tests that made assumptions about single server processing.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-02-28 12:26:11 -08:00
parent 3807441fd7
commit 6bda358fa3

View File

@@ -207,12 +207,10 @@ func TestJetStreamAddStream(t *testing.T) {
}
defer mset.delete()
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, s)
defer nc.Close()
nc.Publish("foo", []byte("Hello World!"))
nc.Flush()
js.Publish("foo", []byte("Hello World!"))
state := mset.state()
if state.Msgs != 1 {
t.Fatalf("Expected 1 message, got %d", state.Msgs)
@@ -221,9 +219,7 @@ func TestJetStreamAddStream(t *testing.T) {
t.Fatalf("Expected non-zero bytes")
}
nc.Publish("foo", []byte("Hello World Again!"))
nc.Flush()
js.Publish("foo", []byte("Hello World Again!"))
state = mset.state()
if state.Msgs != 2 {
t.Fatalf("Expected 2 messages, got %d", state.Msgs)
@@ -5035,7 +5031,7 @@ func TestJetStreamDurableFilteredSubjectConsumerReconnect(t *testing.T) {
}
defer mset.delete()
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, s)
defer nc.Close()
sendMsgs := func(toSend int) {
@@ -5046,11 +5042,9 @@ func TestJetStreamDurableFilteredSubjectConsumerReconnect(t *testing.T) {
} else {
subj = "foo.ZZ"
}
if err := nc.Publish(subj, []byte("OK!")); err != nil {
return
}
_, err := js.Publish(subj, []byte("OK!"))
require_NoError(t, err)
}
nc.Flush()
}
// Send 50 msgs
@@ -5180,14 +5174,14 @@ func TestJetStreamConsumerInactiveNoDeadlock(t *testing.T) {
}
defer mset.delete()
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Send lots of msgs and have them queued up.
for i := 0; i < 10000; i++ {
nc.Publish("DC", []byte("OK!"))
js.Publish("DC", []byte("OK!"))
}
nc.Flush()
if state := mset.state(); state.Msgs != 10000 {
t.Fatalf("Expected %d messages, got %d", 10000, state.Msgs)
}
@@ -5339,14 +5333,13 @@ func TestJetStreamRedeliverCount(t *testing.T) {
}
defer mset.delete()
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Send 10 msgs
for i := 0; i < 10; i++ {
nc.Publish("DC", []byte("OK!"))
js.Publish("DC", []byte("OK!"))
}
nc.Flush()
if state := mset.state(); state.Msgs != 10 {
t.Fatalf("Expected %d messages, got %d", 10, state.Msgs)
}
@@ -5490,14 +5483,13 @@ func TestJetStreamCanNotNakAckd(t *testing.T) {
}
defer mset.delete()
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Send 10 msgs
for i := 0; i < 10; i++ {
nc.Publish("DC", []byte("OK!"))
js.Publish("DC", []byte("OK!"))
}
nc.Flush()
if state := mset.state(); state.Msgs != 10 {
t.Fatalf("Expected %d messages, got %d", 10, state.Msgs)
}
@@ -5564,14 +5556,13 @@ func TestJetStreamStreamPurge(t *testing.T) {
}
defer mset.delete()
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Send 100 msgs
for i := 0; i < 100; i++ {
nc.Publish("DC", []byte("OK!"))
js.Publish("DC", []byte("OK!"))
}
nc.Flush()
if state := mset.state(); state.Msgs != 100 {
t.Fatalf("Expected %d messages, got %d", 100, state.Msgs)
}
@@ -5586,8 +5577,7 @@ func TestJetStreamStreamPurge(t *testing.T) {
}
time.Sleep(10 * time.Millisecond)
now := time.Now()
nc.Publish("DC", []byte("OK!"))
nc.Flush()
js.Publish("DC", []byte("OK!"))
state = mset.state()
if state.Msgs != 1 {
@@ -5622,14 +5612,13 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) {
}
defer mset.delete()
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Send 100 msgs
for i := 0; i < 100; i++ {
nc.Publish("DC", []byte("OK!"))
js.Publish("DC", []byte("OK!"))
}
nc.Flush()
if state := mset.state(); state.Msgs != 100 {
t.Fatalf("Expected %d messages, got %d", 100, state.Msgs)
}
@@ -5684,7 +5673,7 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) {
t.Fatalf("Expected ackfloor for obsseq to be 75, got %d", state.AckFloor.Consumer)
}
// Also make sure we can get new messages correctly.
nc.Request("DC", []byte("OK-22"), time.Second)
js.Publish("DC", []byte("OK-22"))
if msg, err := nc.Request(nextSubj, nil, time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if string(msg.Data) != "OK-22" {
@@ -5713,14 +5702,13 @@ func TestJetStreamStreamPurgeWithConsumerAndRedelivery(t *testing.T) {
}
defer mset.delete()
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Send 100 msgs
for i := 0; i < 100; i++ {
nc.Publish("DC", []byte("OK!"))
js.Publish("DC", []byte("OK!"))
}
nc.Flush()
if state := mset.state(); state.Msgs != 100 {
t.Fatalf("Expected %d messages, got %d", 100, state.Msgs)
}
@@ -5768,7 +5756,7 @@ func TestJetStreamStreamPurgeWithConsumerAndRedelivery(t *testing.T) {
t.Fatalf("Expected ackfloor for obsseq to be 75, got %d", state.AckFloor.Consumer)
}
// Also make sure we can get new messages correctly.
nc.Request("DC", []byte("OK-22"), time.Second)
js.Publish("DC", []byte("OK-22"))
if msg, err := nc.Request(nextSubj, nil, time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if string(msg.Data) != "OK-22" {
@@ -5797,16 +5785,15 @@ func TestJetStreamInterestRetentionStream(t *testing.T) {
}
defer mset.delete()
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Send 100 msgs
totalMsgs := 100
for i := 0; i < totalMsgs; i++ {
nc.Publish("DC", []byte("OK!"))
js.Publish("DC", []byte("OK!"))
}
nc.Flush()
checkNumMsgs := func(numExpected int) {
t.Helper()
@@ -5842,9 +5829,8 @@ func TestJetStreamInterestRetentionStream(t *testing.T) {
mset.addConsumer(&ConsumerConfig{DeliverSubject: sub3.Subject, AckPolicy: AckNone})
for i := 0; i < totalMsgs; i++ {
nc.Publish("DC", []byte("OK!"))
js.Publish("DC", []byte("OK!"))
}
nc.Flush()
checkNumMsgs(totalMsgs)
@@ -8412,15 +8398,14 @@ func TestJetStreamDeleteMsg(t *testing.T) {
t.Fatalf("Unexpected error adding stream: %v", err)
}
nc := clientConnectToServer(t, s)
nc, js := jsClientConnect(t, s)
defer nc.Close()
pubTen := func() {
t.Helper()
for i := 0; i < 10; i++ {
nc.Publish("foo", []byte("Hello World!"))
js.Publish("foo", []byte("Hello World!"))
}
nc.Flush()
}
pubTen()