diff --git a/server/events_test.go b/server/events_test.go index 434de770..3e2bee68 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -813,6 +813,11 @@ func TestSystemAccountSystemConnectionLimitsHonored(t *testing.T) { } defer ncb1.Close() tc++ + + // The account's connection count is exchanged between servers + // so that the local count on each server reflects the total count. + // Pause a bit to give a change for each server to process the update. + time.Sleep(15 * time.Millisecond) } if tc != 10 { t.Fatalf("Expected to get 10 external connections, got %d", tc) diff --git a/server/gateway_test.go b/server/gateway_test.go index a99f0d81..96a4a0e4 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -1344,8 +1344,7 @@ func setAccountUserPassInOptions(o *Options, accName, username, password string) func TestGatewayAccountInterest(t *testing.T) { o2 := testDefaultOptionsForGateway("B") - // Add users to cause s2 to require auth. Will add an account with user - // later. + // Add users to cause s2 to require auth. Will add an account with user later. o2.Users = append([]*User(nil), &User{Username: "test", Password: "pwd"}) s2 := runGatewayServer(o2) defer s2.Shutdown() @@ -1387,19 +1386,13 @@ func TestGatewayAccountInterest(t *testing.T) { gwcc := s1.getOutboundGatewayConnection("C") checkCount(t, gwcc, 1) - // S2 should have sent a protocol indicating no account interest. - checkFor(t, time.Second, 15*time.Millisecond, func() error { - if e, inMap := gwcb.gw.outsim.Load("$foo"); !inMap || e != nil { - return fmt.Errorf("Did not receive account no interest") - } - return nil - }) + // S2 and S3 should have sent a protocol indicating no account interest. + checkForAccountNoInterest(t, gwcb, "$foo", true, 2*time.Second) + checkForAccountNoInterest(t, gwcc, "$foo", true, 2*time.Second) // Second send should not go through to B natsPub(t, nc, "foo", []byte("hello")) natsFlush(t, nc) checkCount(t, gwcb, 1) - // it won't go to C, not because there is no account interest, - // but because there is no interest on the subject. checkCount(t, gwcc, 1) // Add account to S2 and a client, this should clear the no-interest @@ -1433,12 +1426,7 @@ func TestGatewayAccountInterest(t *testing.T) { // account will disappear and since S2 sent an A+, it will send // an A-. ncS2.Close() - checkFor(t, time.Second, 15*time.Millisecond, func() error { - if _, inMap := gwcb.gw.outsim.Load("$foo"); !inMap { - return fmt.Errorf("NoInterest should be set") - } - return nil - }) + checkForSubjectNoInterest(t, gwcb, "$foo", "foo", true, 2*time.Second) // Restart C and that should reset the no-interest s3.Shutdown() diff --git a/server/server_test.go b/server/server_test.go index 759d3a92..c15f437c 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -699,13 +699,22 @@ func TestLameDuckMode(t *testing.T) { checkClientsCount(t, srvB, total) // Check closed status on server A - cz := pollConz(t, srvA, 1, "", &ConnzOptions{State: ConnClosed}) - if n := len(cz.Conns); n != total { - t.Fatalf("Expected %v closed connections, got %v", total, n) - } - for _, c := range cz.Conns { - checkReason(t, c.Reason, ServerShutdown) - } + // Connections are saved in go routines, so although we have evaluated the number + // of connections in the server A to be 0, the polling of connection closed may + // need a bit more time. + checkFor(t, time.Second, 15*time.Millisecond, func() error { + cz := pollConz(t, srvA, 1, "", &ConnzOptions{State: ConnClosed}) + if n := len(cz.Conns); n != total { + return fmt.Errorf("expected %v closed connections, got %v", total, n) + } + for _, c := range cz.Conns { + if !strings.Contains(c.Reason, ServerShutdown.String()) { + return fmt.Errorf("expected closed connection with `%s` state, got `%s`", + ServerShutdown.String(), c.Reason) + } + } + return nil + }) stopClientsAndSrvB(ncs) diff --git a/test/client_cluster_test.go b/test/client_cluster_test.go index 347d8561..4047f3d3 100644 --- a/test/client_cluster_test.go +++ b/test/client_cluster_test.go @@ -142,32 +142,22 @@ func TestServerRestartAndQueueSubs(t *testing.T) { // Helper to wait on a reconnect. waitOnReconnect := func() { - var rcs int64 - for { - select { - case <-reconnectsDone: - atomic.AddInt64(&rcs, 1) - // Client that is connected to srvA will reconnect - // to srvB when srvA is shutdown, then when srvB is - // shutdown, both clients will reconnect to srvA. So - // it is a total of 3 reconnects. - if rcs >= 3 { - return - } - case <-time.After(2 * time.Second): - t.Fatalf("Expected a reconnect, timedout!\n") - } + t.Helper() + select { + case <-reconnectsDone: + case <-time.After(2 * time.Second): + t.Fatalf("Expected a reconnect, timedout!\n") } } // Create two clients.. - opts.Servers = []string{urlA} + opts.Servers = []string{urlA, urlB} nc1, err := opts.Connect() if err != nil { t.Fatalf("Failed to create connection for nc1: %v\n", err) } - opts.Servers = []string{urlB} + opts.Servers = []string{urlB, urlA} nc2, err := opts.Connect() if err != nil { t.Fatalf("Failed to create connection for nc2: %v\n", err) @@ -260,15 +250,20 @@ func TestServerRestartAndQueueSubs(t *testing.T) { //////////////////////////////////////////////////////////////////////////// srvA.Shutdown() + // Wait for client on A to reconnect to B. + waitOnReconnect() + srvA = RunServer(optsA) defer srvA.Shutdown() srvB.Shutdown() + // Now both clients should reconnect to A. + waitOnReconnect() + waitOnReconnect() + srvB = RunServer(optsB) defer srvB.Shutdown() - waitOnReconnect() - // Make sure the cluster is reformed checkClusterFormed(t, srvA, srvB) @@ -325,7 +320,9 @@ func TestRequestsAcrossRoutes(t *testing.T) { // Make sure the route and the subscription are propagated. nc1.Flush() - checkExpectedSubs(1, srvA, srvB) + if err := checkExpectedSubs(1, srvA, srvB); err != nil { + t.Fatalf(err.Error()) + } var resp string @@ -374,7 +371,9 @@ func TestRequestsAcrossRoutesToQueues(t *testing.T) { nc2.Publish(m.Reply, response) }) - checkExpectedSubs(2, srvA, srvB) + if err := checkExpectedSubs(2, srvA, srvB); err != nil { + t.Fatalf(err.Error()) + } var resp string diff --git a/test/new_routes_test.go b/test/new_routes_test.go index 580fd180..b882e3ae 100644 --- a/test/new_routes_test.go +++ b/test/new_routes_test.go @@ -869,6 +869,10 @@ func TestNewRouteSinglePublishOnNewAccount(t *testing.T) { sendA("SUB foo 1\r\nPING\r\n") expectA(pongRe) + if err := checkExpectedSubs(1, srvB); err != nil { + t.Fatalf(err.Error()) + } + clientB := createClientConn(t, optsB.Host, optsB.Port) defer clientB.Close() @@ -1063,7 +1067,9 @@ func TestNewRouteStreamImport(t *testing.T) { // The subscription on "foo" for account $bar will also become // a subscription on "foo" for account $foo due to import. // So total of 2 subs. - checkExpectedSubs(2, srvA) + if err := checkExpectedSubs(2, srvA); err != nil { + t.Fatalf(err.Error()) + } // Send on clientA sendA("PING\r\n") @@ -1086,7 +1092,9 @@ func TestNewRouteStreamImport(t *testing.T) { sendB("UNSUB 1\r\nPING\r\n") expectB(pongRe) - checkExpectedSubs(0, srvA) + if err := checkExpectedSubs(0, srvA); err != nil { + t.Fatalf(err.Error()) + } sendA("PUB foo 2\r\nok\r\nPING\r\n") expectA(pongRe)