From 6bda358fa3ac292e98c72ff1cb3844ae8a05c5bf Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 28 Feb 2023 12:26:11 -0800 Subject: [PATCH] Fix tests that made assumptions about single server processing. Signed-off-by: Derek Collison --- server/jetstream_test.go | 69 ++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 42 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 7cbe543e..eab8a4f5 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -207,12 +207,10 @@ func TestJetStreamAddStream(t *testing.T) { } defer mset.delete() - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() - nc.Publish("foo", []byte("Hello World!")) - nc.Flush() - + js.Publish("foo", []byte("Hello World!")) state := mset.state() if state.Msgs != 1 { t.Fatalf("Expected 1 message, got %d", state.Msgs) @@ -221,9 +219,7 @@ func TestJetStreamAddStream(t *testing.T) { t.Fatalf("Expected non-zero bytes") } - nc.Publish("foo", []byte("Hello World Again!")) - nc.Flush() - + js.Publish("foo", []byte("Hello World Again!")) state = mset.state() if state.Msgs != 2 { t.Fatalf("Expected 2 messages, got %d", state.Msgs) @@ -5035,7 +5031,7 @@ func TestJetStreamDurableFilteredSubjectConsumerReconnect(t *testing.T) { } defer mset.delete() - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() sendMsgs := func(toSend int) { @@ -5046,11 +5042,9 @@ func TestJetStreamDurableFilteredSubjectConsumerReconnect(t *testing.T) { } else { subj = "foo.ZZ" } - if err := nc.Publish(subj, []byte("OK!")); err != nil { - return - } + _, err := js.Publish(subj, []byte("OK!")) + require_NoError(t, err) } - nc.Flush() } // Send 50 msgs @@ -5180,14 +5174,14 @@ func TestJetStreamConsumerInactiveNoDeadlock(t *testing.T) { } defer mset.delete() - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() // Send lots of msgs and have them queued up. for i := 0; i < 10000; i++ { - nc.Publish("DC", []byte("OK!")) + js.Publish("DC", []byte("OK!")) } - nc.Flush() + if state := mset.state(); state.Msgs != 10000 { t.Fatalf("Expected %d messages, got %d", 10000, state.Msgs) } @@ -5339,14 +5333,13 @@ func TestJetStreamRedeliverCount(t *testing.T) { } defer mset.delete() - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() // Send 10 msgs for i := 0; i < 10; i++ { - nc.Publish("DC", []byte("OK!")) + js.Publish("DC", []byte("OK!")) } - nc.Flush() if state := mset.state(); state.Msgs != 10 { t.Fatalf("Expected %d messages, got %d", 10, state.Msgs) } @@ -5490,14 +5483,13 @@ func TestJetStreamCanNotNakAckd(t *testing.T) { } defer mset.delete() - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() // Send 10 msgs for i := 0; i < 10; i++ { - nc.Publish("DC", []byte("OK!")) + js.Publish("DC", []byte("OK!")) } - nc.Flush() if state := mset.state(); state.Msgs != 10 { t.Fatalf("Expected %d messages, got %d", 10, state.Msgs) } @@ -5564,14 +5556,13 @@ func TestJetStreamStreamPurge(t *testing.T) { } defer mset.delete() - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() // Send 100 msgs for i := 0; i < 100; i++ { - nc.Publish("DC", []byte("OK!")) + js.Publish("DC", []byte("OK!")) } - nc.Flush() if state := mset.state(); state.Msgs != 100 { t.Fatalf("Expected %d messages, got %d", 100, state.Msgs) } @@ -5586,8 +5577,7 @@ func TestJetStreamStreamPurge(t *testing.T) { } time.Sleep(10 * time.Millisecond) now := time.Now() - nc.Publish("DC", []byte("OK!")) - nc.Flush() + js.Publish("DC", []byte("OK!")) state = mset.state() if state.Msgs != 1 { @@ -5622,14 +5612,13 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) { } defer mset.delete() - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() // Send 100 msgs for i := 0; i < 100; i++ { - nc.Publish("DC", []byte("OK!")) + js.Publish("DC", []byte("OK!")) } - nc.Flush() if state := mset.state(); state.Msgs != 100 { t.Fatalf("Expected %d messages, got %d", 100, state.Msgs) } @@ -5684,7 +5673,7 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) { t.Fatalf("Expected ackfloor for obsseq to be 75, got %d", state.AckFloor.Consumer) } // Also make sure we can get new messages correctly. - nc.Request("DC", []byte("OK-22"), time.Second) + js.Publish("DC", []byte("OK-22")) if msg, err := nc.Request(nextSubj, nil, time.Second); err != nil { t.Fatalf("Unexpected error: %v", err) } else if string(msg.Data) != "OK-22" { @@ -5713,14 +5702,13 @@ func TestJetStreamStreamPurgeWithConsumerAndRedelivery(t *testing.T) { } defer mset.delete() - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() // Send 100 msgs for i := 0; i < 100; i++ { - nc.Publish("DC", []byte("OK!")) + js.Publish("DC", []byte("OK!")) } - nc.Flush() if state := mset.state(); state.Msgs != 100 { t.Fatalf("Expected %d messages, got %d", 100, state.Msgs) } @@ -5768,7 +5756,7 @@ func TestJetStreamStreamPurgeWithConsumerAndRedelivery(t *testing.T) { t.Fatalf("Expected ackfloor for obsseq to be 75, got %d", state.AckFloor.Consumer) } // Also make sure we can get new messages correctly. - nc.Request("DC", []byte("OK-22"), time.Second) + js.Publish("DC", []byte("OK-22")) if msg, err := nc.Request(nextSubj, nil, time.Second); err != nil { t.Fatalf("Unexpected error: %v", err) } else if string(msg.Data) != "OK-22" { @@ -5797,16 +5785,15 @@ func TestJetStreamInterestRetentionStream(t *testing.T) { } defer mset.delete() - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() // Send 100 msgs totalMsgs := 100 for i := 0; i < totalMsgs; i++ { - nc.Publish("DC", []byte("OK!")) + js.Publish("DC", []byte("OK!")) } - nc.Flush() checkNumMsgs := func(numExpected int) { t.Helper() @@ -5842,9 +5829,8 @@ func TestJetStreamInterestRetentionStream(t *testing.T) { mset.addConsumer(&ConsumerConfig{DeliverSubject: sub3.Subject, AckPolicy: AckNone}) for i := 0; i < totalMsgs; i++ { - nc.Publish("DC", []byte("OK!")) + js.Publish("DC", []byte("OK!")) } - nc.Flush() checkNumMsgs(totalMsgs) @@ -8412,15 +8398,14 @@ func TestJetStreamDeleteMsg(t *testing.T) { t.Fatalf("Unexpected error adding stream: %v", err) } - nc := clientConnectToServer(t, s) + nc, js := jsClientConnect(t, s) defer nc.Close() pubTen := func() { t.Helper() for i := 0; i < 10; i++ { - nc.Publish("foo", []byte("Hello World!")) + js.Publish("foo", []byte("Hello World!")) } - nc.Flush() } pubTen()