From b69ffe244e6cc6bacca523f47100086eff051523 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 8 Sep 2022 11:28:23 -0600 Subject: [PATCH] Fixed some tests Code change: - Do not start the processMirrorMsgs and processSourceMsgs go routine if the server has been detected to be shutdown. This would otherwise leave some go routine running at the end of some tests. - Pass the fch and qch to the consumerFileStore's flushLoop otherwise in some tests this routine could be left running. Tests changes: - Added missing defer NATS connection close - Added missing defer server shutdown Signed-off-by: Ivan Kozlovic --- server/accounts_test.go | 5 +++-- server/closed_conns_test.go | 1 + server/events_test.go | 9 +++++++-- server/filestore.go | 7 ++----- server/filestore_test.go | 10 ++++++++++ server/gateway_test.go | 1 + server/jetstream_cluster_test.go | 4 +++- server/jetstream_leafnode_test.go | 2 +- server/jetstream_super_cluster_test.go | 4 ++++ server/jetstream_test.go | 6 +++++- server/jwt_test.go | 6 +++--- server/norace_test.go | 9 ++++++--- server/reload_test.go | 1 + server/routes_test.go | 3 +++ server/stream.go | 8 ++++++-- test/norace_test.go | 2 ++ test/ocsp_test.go | 7 +++++++ test/service_latency_test.go | 1 + 18 files changed, 66 insertions(+), 20 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index 3d715ed9..19dacfb5 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -308,9 +308,9 @@ func TestAccountIsolationExportImport(t *testing.T) { // Connect with different accounts. ncExp := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, accExpPair), nats.Name(fmt.Sprintf("nc-exporter-%s", c.exp))) + defer ncExp.Close() ncImp := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, accImpPair), nats.Name(fmt.Sprintf("nc-importer-%s", c.imp))) - defer ncExp.Close() defer ncImp.Close() checkIsolation(t, c.pubSubj, ncExp, ncImp) @@ -344,9 +344,9 @@ func TestAccountIsolationExportImport(t *testing.T) { // Connect with different accounts. ncExp := natsConnect(t, s.ClientURL(), nats.UserInfo("accExp", "accExp"), nats.Name(fmt.Sprintf("nc-exporter-%s", c.exp))) + defer ncExp.Close() ncImp := natsConnect(t, s.ClientURL(), nats.UserInfo("accImp", "accImp"), nats.Name(fmt.Sprintf("nc-importer-%s", c.imp))) - defer ncExp.Close() defer ncImp.Close() checkIsolation(t, c.pubSubj, ncExp, ncImp) @@ -3512,6 +3512,7 @@ func TestAccountUserSubPermsWithQueueGroups(t *testing.T) { nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("user", "pass")) require_NoError(t, err) + defer nc.Close() // qsub solo. qsub, err := nc.QueueSubscribeSync("foo.>", "qg") diff --git a/server/closed_conns_test.go b/server/closed_conns_test.go index cd8954c6..273b58fa 100644 --- a/server/closed_conns_test.go +++ b/server/closed_conns_test.go @@ -110,6 +110,7 @@ func TestClosedConnsSubsAccounting(t *testing.T) { if err != nil { t.Fatalf("Error on subscribe: %v", err) } + defer nc.Close() // Now create some subscriptions numSubs := 10 diff --git a/server/events_test.go b/server/events_test.go index ee02afab..e5ef65b4 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -783,6 +783,7 @@ func TestSystemAccountConnectionUpdatesStopAfterNoLocal(t *testing.T) { if err != nil { t.Fatalf("Error on connect: %v", err) } + defer nc.Close() clients = append(clients, nc) } @@ -1589,11 +1590,15 @@ func TestAccountConnsLimitExceededAfterUpdateDisconnectNewOnly(t *testing.T) { newConns := make([]*nats.Conn, 0, 5) url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) for i := 0; i < 5; i++ { - nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp)) + nc, err := nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp)) + require_NoError(t, err) + defer nc.Close() } time.Sleep(500 * time.Millisecond) for i := 0; i < 5; i++ { - nc, _ := nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp)) + nc, err := nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp)) + require_NoError(t, err) + defer nc.Close() newConns = append(newConns, nc) } diff --git a/server/filestore.go b/server/filestore.go index 925ed368..fb63c026 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5834,7 +5834,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt // Create channels to control our flush go routine. o.fch = make(chan struct{}, 1) o.qch = make(chan struct{}) - go o.flushLoop() + go o.flushLoop(o.fch, o.qch) fs.AddConsumer(o) @@ -5932,10 +5932,7 @@ func (o *consumerFileStore) inFlusher() bool { } // flushLoop watches for consumer updates and the quit channel. -func (o *consumerFileStore) flushLoop() { - o.mu.Lock() - fch, qch := o.fch, o.qch - o.mu.Unlock() +func (o *consumerFileStore) flushLoop(fch, qch chan struct{}) { o.setInFlusher() defer o.clearInFlusher() diff --git a/server/filestore_test.go b/server/filestore_test.go index d23da22a..2d5c8451 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1544,6 +1544,7 @@ func TestFileStoreAndRetrieveMultiBlock(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() for i := 0; i < 20; i++ { fs.StoreMsg(subj, nil, msg) @@ -2249,6 +2250,7 @@ func TestFileStorePerf(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() start := time.Now() for i := 0; i < int(toStore); i++ { @@ -2273,6 +2275,7 @@ func TestFileStorePerf(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() fmt.Printf("time to restore is %v\n\n", time.Since(start)) fmt.Printf("LOAD: reading %d msgs of %s each, totalling %s\n", @@ -2322,6 +2325,7 @@ func TestFileStorePerf(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() fmt.Printf("\nremoving [in order] %d msgs of %s each, totalling %s\n", toStore, @@ -2388,6 +2392,7 @@ func TestFileStoreReadBackMsgPerf(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() start := time.Now() for i := 0; i < int(toStore); i++ { @@ -2437,6 +2442,7 @@ func TestFileStoreStoreLimitRemovePerf(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() fs.RegisterStorageUpdates(func(md, bd int64, seq uint64, subj string) {}) @@ -2493,6 +2499,7 @@ func TestFileStorePubPerfWithSmallBlkSize(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() start := time.Now() for i := 0; i < int(toStore); i++ { @@ -3336,6 +3343,7 @@ func TestFileStoreSparseCompaction(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes loadMsgs := func(n int) { @@ -3443,6 +3451,7 @@ func TestFileStoreSparseCompaction(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() loadMsgs(1000) checkState(1000, 1, 1000) @@ -3469,6 +3478,7 @@ func TestFileStoreSparseCompactionWithInteriorDeletes(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() for i := 1; i <= 1000; i++ { if _, _, err := fs.StoreMsg(fmt.Sprintf("kv.%d", i%10), nil, []byte("OK")); err != nil { diff --git a/server/gateway_test.go b/server/gateway_test.go index a983f44e..bd720f6b 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -3614,6 +3614,7 @@ func TestGatewayRaceOnClose(t *testing.T) { bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) ncB := natsConnect(t, bURL, nats.NoReconnect()) + defer ncB.Close() wg := sync.WaitGroup{} wg.Add(1) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 8287f9e5..a914bb07 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3671,6 +3671,7 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { changeCurrentConfigContentWithNewContent(t, srv.configFile, []byte(newContent)) ncSys := natsConnect(t, c.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + defer ncSys.Close() sub, err := ncSys.SubscribeSync(fmt.Sprintf("$SYS.SERVER.%s.STATSZ", srv.ID())) require_NoError(t, err) @@ -5026,7 +5027,8 @@ func TestJetStreamClusterStreamPerf(t *testing.T) { var conns []nats.JetStream for i := 0; i < numConnections; i++ { s := c.randomServer() - _, js := jsClientConnect(t, s) + nc, js := jsClientConnect(t, s) + defer nc.Close() conns = append(conns, js) } diff --git a/server/jetstream_leafnode_test.go b/server/jetstream_leafnode_test.go index 6277c1fc..fa1664a0 100644 --- a/server/jetstream_leafnode_test.go +++ b/server/jetstream_leafnode_test.go @@ -1206,7 +1206,7 @@ default_js_domain: {B:"DHUB"} require_Equal(t, si.Cluster.Name, "HUB") } -func TestLeafNodeSvcImportExportCycle(t *testing.T) { +func TestJetStreamLeafNodeSvcImportExportCycle(t *testing.T) { accounts := ` accounts { SYS: { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 668d0e38..d56a08d1 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -2357,6 +2357,7 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) { defer nc.Close() ncSys := natsConnect(t, sc.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + defer ncSys.Close() statszSub, err := ncSys.SubscribeSync(fmt.Sprintf(serverStatsSubj, "*")) require_NoError(t, err) require_NoError(t, ncSys.Flush()) @@ -2482,10 +2483,13 @@ func TestJetStreamSuperClusterStreamAlternates(t *testing.T) { // Connect to different clusters to check ordering. nc, _ = jsClientConnect(t, sc.clusterForName("C1").randomServer()) + defer nc.Close() getStreamInfo(nc, "C1") nc, _ = jsClientConnect(t, sc.clusterForName("C2").randomServer()) + defer nc.Close() getStreamInfo(nc, "C2") nc, _ = jsClientConnect(t, sc.clusterForName("C3").randomServer()) + defer nc.Close() getStreamInfo(nc, "C3") } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index a2238223..f3325d4e 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -16619,6 +16619,7 @@ func TestJetStreamCrossAccounts(t *testing.T) { func TestJetStreamInvalidRestoreRequests(t *testing.T) { test := func(t *testing.T, s *Server, replica int) { nc := natsConnect(t, s.ClientURL()) + defer nc.Close() // test invalid stream config in restore request require_fail := func(cfg StreamConfig, errDesc string) { t.Helper() @@ -17041,6 +17042,7 @@ func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) { nc2, err := nats.Connect(s.ClientURL(), nats.UserInfo("im", "pwd")) require_NoError(t, err) + defer nc2.Close() var sub *nats.Subscription if queue { @@ -18544,6 +18546,7 @@ func benchJetStreamWorkersAndBatch(b *testing.B, numWorkers, batchSize int) { if err != nil { b.Fatalf("Failed to create client: %v", err) } + defer nc.Close() deliverTo := nats.NewInbox() nc.Subscribe(deliverTo, func(m *nats.Msg) { @@ -18708,7 +18711,8 @@ func TestJetStreamMultiplePullPerf(t *testing.T) { count := 0 for i := 0; i < np; i++ { - _, js := jsClientConnect(t, s) + nc, js := jsClientConnect(t, s) + defer nc.Close() sub, err := js.PullSubscribe("mp22", "d") require_NoError(t, err) diff --git a/server/jwt_test.go b/server/jwt_test.go index 15fa4f71..54e82a5a 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -4024,11 +4024,11 @@ func TestJWTTimeExpiration(t *testing.T) { errChan <- struct{}{} } })) + defer c.Close() chanRecv(t, errChan, 10*time.Second) chanRecv(t, disconnectChan, 10*time.Second) require_True(t, c.IsReconnecting()) require_False(t, c.IsConnected()) - c.Close() }) } t.Run("double expiration", func(t *testing.T) { @@ -4064,12 +4064,12 @@ func TestJWTTimeExpiration(t *testing.T) { errChan <- struct{}{} } })) + defer c.Close() chanRecv(t, errChan, 10*time.Second) chanRecv(t, reConnectChan, 10*time.Second) require_False(t, c.IsReconnecting()) require_True(t, c.IsConnected()) chanRecv(t, errChan, 10*time.Second) - c.Close() }) t.Run("lower jwt expiration overwrites time", func(t *testing.T) { start := time.Now() @@ -4098,11 +4098,11 @@ func TestJWTTimeExpiration(t *testing.T) { errChan <- struct{}{} } })) + defer c.Close() chanRecv(t, errChan, 10*time.Second) chanRecv(t, disconnectChan, 10*time.Second) require_True(t, c.IsReconnecting()) require_False(t, c.IsConnected()) - c.Close() }) } diff --git a/server/norace_test.go b/server/norace_test.go index ecf36fd4..217cbc69 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -602,6 +602,9 @@ func TestNoRaceRouteMemUsage(t *testing.T) { for i := 0; i < 100; i++ { requestor := natsConnect(t, bURL) + // Don't use a defer here otherwise that will make the memory check fail! + // We are closing the connection just after these few instructions that + // are not calling t.Fatal() anyway. inbox := nats.NewInbox() sub := natsSubSync(t, requestor, inbox) natsPubReq(t, requestor, "foo", inbox, payload) @@ -2761,13 +2764,12 @@ func TestNoRaceJetStreamSuperClusterAccountConnz(t *testing.T) { } nc, js := jsClientConnect(t, sc.randomServer(), nats.UserInfo("two", "p"), nats.Name("two")) + defer nc.Close() nc.SubscribeSync("baz") nc.SubscribeSync("foo.bar.*") nc.SubscribeSync(fmt.Sprintf("id.%d", i+1)) js.AddStream(&nats.StreamConfig{Name: fmt.Sprintf("TEST:%d", i+1)}) - - defer nc.Close() } type czapi struct { @@ -4792,6 +4794,7 @@ func TestNoRaceJetStreamOrderedConsumerLongRTTPerformance(t *testing.T) { nc, err = nats.Connect(proxy.clientURL()) require_NoError(t, err) + defer nc.Close() js, err = nc.JetStream() require_NoError(t, err) @@ -5254,8 +5257,8 @@ func TestNoRaceJetStreamClusterDirectAccessAllPeersSubs(t *testing.T) { go func() { defer wg.Done() nc, _ := jsClientConnect(t, c.randomServer()) - js, _ := nc.JetStream(nats.MaxWait(500 * time.Millisecond)) defer nc.Close() + js, _ := nc.JetStream(nats.MaxWait(500 * time.Millisecond)) for { select { case <-qch: diff --git a/server/reload_test.go b/server/reload_test.go index 48a7d2e8..fdb28916 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -4186,6 +4186,7 @@ func TestLoggingReload(t *testing.T) { if err != nil { t.Fatalf("Error creating client: %v\n", err) } + defer nc.Close() msgs := make(chan *nats.Msg, 1) defer close(msgs) diff --git a/server/routes_test.go b/server/routes_test.go index 3c2beab9..c5465b5b 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -794,6 +794,7 @@ func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) { if err != nil { t.Fatalf("Error on connect") } + defer nc.Close() s2Opts := DefaultOptions() s2Opts.ServerName = "B" @@ -1512,12 +1513,14 @@ func testTLSRoutesCertificateImplicitAllow(t *testing.T, pass bool) { func TestSubjectRenameViaJetStreamAck(t *testing.T) { s := RunRandClientPortServer() + defer s.Shutdown() errChan := make(chan error) defer close(errChan) ncPub := natsConnect(t, s.ClientURL(), nats.UserInfo("client", "pwd"), nats.ErrorHandler(func(conn *nats.Conn, s *nats.Subscription, err error) { errChan <- err })) + defer ncPub.Close() require_NoError(t, ncPub.PublishRequest("SVC.ALLOWED", "$JS.ACK.whatever@ADMIN", nil)) select { case err := <-errChan: diff --git a/server/stream.go b/server/stream.go index 20c47ec1..ac4b25a1 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2278,7 +2278,9 @@ func (mset *stream) setupMirrorConsumer() error { mirror.qch = make(chan struct{}) mirror.wg.Add(1) ready.Add(1) - mset.srv.startGoRoutine(func() { mset.processMirrorMsgs(mirror, &ready) }) + if !mset.srv.startGoRoutine(func() { mset.processMirrorMsgs(mirror, &ready) }) { + ready.Done() + } } mset.mu.Unlock() ready.Wait() @@ -2554,7 +2556,9 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T si.qch = make(chan struct{}) si.wg.Add(1) ready.Add(1) - mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si, &ready) }) + if !mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si, &ready) }) { + ready.Done() + } } } mset.mu.Unlock() diff --git a/test/norace_test.go b/test/norace_test.go index 07b17fec..5e677911 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -451,8 +451,10 @@ func TestNoRaceClusterLeaksSubscriptions(t *testing.T) { // Create 100 repliers for i := 0; i < 50; i++ { nc1, _ := nats.Connect(urlA) + defer nc1.Close() nc1.SetErrorHandler(noOpErrHandler) nc2, _ := nats.Connect(urlB) + defer nc2.Close() nc2.SetErrorHandler(noOpErrHandler) repliers = append(repliers, nc1, nc2) nc1.Subscribe("test.reply", func(m *nats.Msg) { diff --git a/test/ocsp_test.go b/test/ocsp_test.go index cb77733c..c08cabc8 100644 --- a/test/ocsp_test.go +++ b/test/ocsp_test.go @@ -102,6 +102,7 @@ func TestOCSPAlwaysMustStapleAndShutdown(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -199,6 +200,7 @@ func TestOCSPMustStapleShutdown(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -616,6 +618,7 @@ func TestOCSPReloadRotateTLSCertWithNoURL(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -727,6 +730,7 @@ func TestOCSPReloadRotateTLSCertDisableMustStaple(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -914,6 +918,7 @@ func TestOCSPReloadRotateTLSCertEnableMustStaple(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -2214,6 +2219,7 @@ func TestOCSPCustomConfigReloadDisable(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -2316,6 +2322,7 @@ func TestOCSPCustomConfigReloadEnable(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) diff --git a/test/service_latency_test.go b/test/service_latency_test.go index 6da77acd..7c7e8715 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -584,6 +584,7 @@ func TestServiceLatencyNoSubsLeak(t *testing.T) { for i := 0; i < 100; i++ { nc := clientConnect(t, sc.clusters[1].opts[1], "bar") + defer nc.Close() if _, err := nc.Request("ngs.usage", []byte("1h"), time.Second); err != nil { t.Fatalf("Error on request: %v", err) }