diff --git a/server/accounts_test.go b/server/accounts_test.go index 3d715ed9..19dacfb5 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -308,9 +308,9 @@ func TestAccountIsolationExportImport(t *testing.T) { // Connect with different accounts. ncExp := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, accExpPair), nats.Name(fmt.Sprintf("nc-exporter-%s", c.exp))) + defer ncExp.Close() ncImp := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, accImpPair), nats.Name(fmt.Sprintf("nc-importer-%s", c.imp))) - defer ncExp.Close() defer ncImp.Close() checkIsolation(t, c.pubSubj, ncExp, ncImp) @@ -344,9 +344,9 @@ func TestAccountIsolationExportImport(t *testing.T) { // Connect with different accounts. ncExp := natsConnect(t, s.ClientURL(), nats.UserInfo("accExp", "accExp"), nats.Name(fmt.Sprintf("nc-exporter-%s", c.exp))) + defer ncExp.Close() ncImp := natsConnect(t, s.ClientURL(), nats.UserInfo("accImp", "accImp"), nats.Name(fmt.Sprintf("nc-importer-%s", c.imp))) - defer ncExp.Close() defer ncImp.Close() checkIsolation(t, c.pubSubj, ncExp, ncImp) @@ -3512,6 +3512,7 @@ func TestAccountUserSubPermsWithQueueGroups(t *testing.T) { nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("user", "pass")) require_NoError(t, err) + defer nc.Close() // qsub solo. qsub, err := nc.QueueSubscribeSync("foo.>", "qg") diff --git a/server/closed_conns_test.go b/server/closed_conns_test.go index cd8954c6..273b58fa 100644 --- a/server/closed_conns_test.go +++ b/server/closed_conns_test.go @@ -110,6 +110,7 @@ func TestClosedConnsSubsAccounting(t *testing.T) { if err != nil { t.Fatalf("Error on subscribe: %v", err) } + defer nc.Close() // Now create some subscriptions numSubs := 10 diff --git a/server/events_test.go b/server/events_test.go index ee02afab..e5ef65b4 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -783,6 +783,7 @@ func TestSystemAccountConnectionUpdatesStopAfterNoLocal(t *testing.T) { if err != nil { t.Fatalf("Error on connect: %v", err) } + defer nc.Close() clients = append(clients, nc) } @@ -1589,11 +1590,15 @@ func TestAccountConnsLimitExceededAfterUpdateDisconnectNewOnly(t *testing.T) { newConns := make([]*nats.Conn, 0, 5) url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) for i := 0; i < 5; i++ { - nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp)) + nc, err := nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp)) + require_NoError(t, err) + defer nc.Close() } time.Sleep(500 * time.Millisecond) for i := 0; i < 5; i++ { - nc, _ := nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp)) + nc, err := nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp)) + require_NoError(t, err) + defer nc.Close() newConns = append(newConns, nc) } diff --git a/server/filestore.go b/server/filestore.go index 925ed368..fb63c026 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5834,7 +5834,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt // Create channels to control our flush go routine. o.fch = make(chan struct{}, 1) o.qch = make(chan struct{}) - go o.flushLoop() + go o.flushLoop(o.fch, o.qch) fs.AddConsumer(o) @@ -5932,10 +5932,7 @@ func (o *consumerFileStore) inFlusher() bool { } // flushLoop watches for consumer updates and the quit channel. -func (o *consumerFileStore) flushLoop() { - o.mu.Lock() - fch, qch := o.fch, o.qch - o.mu.Unlock() +func (o *consumerFileStore) flushLoop(fch, qch chan struct{}) { o.setInFlusher() defer o.clearInFlusher() diff --git a/server/filestore_test.go b/server/filestore_test.go index d23da22a..2d5c8451 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1544,6 +1544,7 @@ func TestFileStoreAndRetrieveMultiBlock(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() for i := 0; i < 20; i++ { fs.StoreMsg(subj, nil, msg) @@ -2249,6 +2250,7 @@ func TestFileStorePerf(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() start := time.Now() for i := 0; i < int(toStore); i++ { @@ -2273,6 +2275,7 @@ func TestFileStorePerf(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() fmt.Printf("time to restore is %v\n\n", time.Since(start)) fmt.Printf("LOAD: reading %d msgs of %s each, totalling %s\n", @@ -2322,6 +2325,7 @@ func TestFileStorePerf(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() fmt.Printf("\nremoving [in order] %d msgs of %s each, totalling %s\n", toStore, @@ -2388,6 +2392,7 @@ func TestFileStoreReadBackMsgPerf(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() start := time.Now() for i := 0; i < int(toStore); i++ { @@ -2437,6 +2442,7 @@ func TestFileStoreStoreLimitRemovePerf(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() fs.RegisterStorageUpdates(func(md, bd int64, seq uint64, subj string) {}) @@ -2493,6 +2499,7 @@ func TestFileStorePubPerfWithSmallBlkSize(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() start := time.Now() for i := 0; i < int(toStore); i++ { @@ -3336,6 +3343,7 @@ func TestFileStoreSparseCompaction(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes loadMsgs := func(n int) { @@ -3443,6 +3451,7 @@ func TestFileStoreSparseCompaction(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() loadMsgs(1000) checkState(1000, 1, 1000) @@ -3469,6 +3478,7 @@ func TestFileStoreSparseCompactionWithInteriorDeletes(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer fs.Stop() for i := 1; i <= 1000; i++ { if _, _, err := fs.StoreMsg(fmt.Sprintf("kv.%d", i%10), nil, []byte("OK")); err != nil { diff --git a/server/gateway_test.go b/server/gateway_test.go index a983f44e..bd720f6b 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -3614,6 +3614,7 @@ func TestGatewayRaceOnClose(t *testing.T) { bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) ncB := natsConnect(t, bURL, nats.NoReconnect()) + defer ncB.Close() wg := sync.WaitGroup{} wg.Add(1) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 8287f9e5..a914bb07 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3671,6 +3671,7 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) { changeCurrentConfigContentWithNewContent(t, srv.configFile, []byte(newContent)) ncSys := natsConnect(t, c.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + defer ncSys.Close() sub, err := ncSys.SubscribeSync(fmt.Sprintf("$SYS.SERVER.%s.STATSZ", srv.ID())) require_NoError(t, err) @@ -5026,7 +5027,8 @@ func TestJetStreamClusterStreamPerf(t *testing.T) { var conns []nats.JetStream for i := 0; i < numConnections; i++ { s := c.randomServer() - _, js := jsClientConnect(t, s) + nc, js := jsClientConnect(t, s) + defer nc.Close() conns = append(conns, js) } diff --git a/server/jetstream_leafnode_test.go b/server/jetstream_leafnode_test.go index 6277c1fc..fa1664a0 100644 --- a/server/jetstream_leafnode_test.go +++ b/server/jetstream_leafnode_test.go @@ -1206,7 +1206,7 @@ default_js_domain: {B:"DHUB"} require_Equal(t, si.Cluster.Name, "HUB") } -func TestLeafNodeSvcImportExportCycle(t *testing.T) { +func TestJetStreamLeafNodeSvcImportExportCycle(t *testing.T) { accounts := ` accounts { SYS: { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 668d0e38..d56a08d1 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -2357,6 +2357,7 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) { defer nc.Close() ncSys := natsConnect(t, sc.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + defer ncSys.Close() statszSub, err := ncSys.SubscribeSync(fmt.Sprintf(serverStatsSubj, "*")) require_NoError(t, err) require_NoError(t, ncSys.Flush()) @@ -2482,10 +2483,13 @@ func TestJetStreamSuperClusterStreamAlternates(t *testing.T) { // Connect to different clusters to check ordering. nc, _ = jsClientConnect(t, sc.clusterForName("C1").randomServer()) + defer nc.Close() getStreamInfo(nc, "C1") nc, _ = jsClientConnect(t, sc.clusterForName("C2").randomServer()) + defer nc.Close() getStreamInfo(nc, "C2") nc, _ = jsClientConnect(t, sc.clusterForName("C3").randomServer()) + defer nc.Close() getStreamInfo(nc, "C3") } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index a2238223..f3325d4e 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -16619,6 +16619,7 @@ func TestJetStreamCrossAccounts(t *testing.T) { func TestJetStreamInvalidRestoreRequests(t *testing.T) { test := func(t *testing.T, s *Server, replica int) { nc := natsConnect(t, s.ClientURL()) + defer nc.Close() // test invalid stream config in restore request require_fail := func(cfg StreamConfig, errDesc string) { t.Helper() @@ -17041,6 +17042,7 @@ func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) { nc2, err := nats.Connect(s.ClientURL(), nats.UserInfo("im", "pwd")) require_NoError(t, err) + defer nc2.Close() var sub *nats.Subscription if queue { @@ -18544,6 +18546,7 @@ func benchJetStreamWorkersAndBatch(b *testing.B, numWorkers, batchSize int) { if err != nil { b.Fatalf("Failed to create client: %v", err) } + defer nc.Close() deliverTo := nats.NewInbox() nc.Subscribe(deliverTo, func(m *nats.Msg) { @@ -18708,7 +18711,8 @@ func TestJetStreamMultiplePullPerf(t *testing.T) { count := 0 for i := 0; i < np; i++ { - _, js := jsClientConnect(t, s) + nc, js := jsClientConnect(t, s) + defer nc.Close() sub, err := js.PullSubscribe("mp22", "d") require_NoError(t, err) diff --git a/server/jwt_test.go b/server/jwt_test.go index 15fa4f71..54e82a5a 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -4024,11 +4024,11 @@ func TestJWTTimeExpiration(t *testing.T) { errChan <- struct{}{} } })) + defer c.Close() chanRecv(t, errChan, 10*time.Second) chanRecv(t, disconnectChan, 10*time.Second) require_True(t, c.IsReconnecting()) require_False(t, c.IsConnected()) - c.Close() }) } t.Run("double expiration", func(t *testing.T) { @@ -4064,12 +4064,12 @@ func TestJWTTimeExpiration(t *testing.T) { errChan <- struct{}{} } })) + defer c.Close() chanRecv(t, errChan, 10*time.Second) chanRecv(t, reConnectChan, 10*time.Second) require_False(t, c.IsReconnecting()) require_True(t, c.IsConnected()) chanRecv(t, errChan, 10*time.Second) - c.Close() }) t.Run("lower jwt expiration overwrites time", func(t *testing.T) { start := time.Now() @@ -4098,11 +4098,11 @@ func TestJWTTimeExpiration(t *testing.T) { errChan <- struct{}{} } })) + defer c.Close() chanRecv(t, errChan, 10*time.Second) chanRecv(t, disconnectChan, 10*time.Second) require_True(t, c.IsReconnecting()) require_False(t, c.IsConnected()) - c.Close() }) } diff --git a/server/norace_test.go b/server/norace_test.go index ecf36fd4..217cbc69 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -602,6 +602,9 @@ func TestNoRaceRouteMemUsage(t *testing.T) { for i := 0; i < 100; i++ { requestor := natsConnect(t, bURL) + // Don't use a defer here otherwise that will make the memory check fail! + // We are closing the connection just after these few instructions that + // are not calling t.Fatal() anyway. inbox := nats.NewInbox() sub := natsSubSync(t, requestor, inbox) natsPubReq(t, requestor, "foo", inbox, payload) @@ -2761,13 +2764,12 @@ func TestNoRaceJetStreamSuperClusterAccountConnz(t *testing.T) { } nc, js := jsClientConnect(t, sc.randomServer(), nats.UserInfo("two", "p"), nats.Name("two")) + defer nc.Close() nc.SubscribeSync("baz") nc.SubscribeSync("foo.bar.*") nc.SubscribeSync(fmt.Sprintf("id.%d", i+1)) js.AddStream(&nats.StreamConfig{Name: fmt.Sprintf("TEST:%d", i+1)}) - - defer nc.Close() } type czapi struct { @@ -4792,6 +4794,7 @@ func TestNoRaceJetStreamOrderedConsumerLongRTTPerformance(t *testing.T) { nc, err = nats.Connect(proxy.clientURL()) require_NoError(t, err) + defer nc.Close() js, err = nc.JetStream() require_NoError(t, err) @@ -5254,8 +5257,8 @@ func TestNoRaceJetStreamClusterDirectAccessAllPeersSubs(t *testing.T) { go func() { defer wg.Done() nc, _ := jsClientConnect(t, c.randomServer()) - js, _ := nc.JetStream(nats.MaxWait(500 * time.Millisecond)) defer nc.Close() + js, _ := nc.JetStream(nats.MaxWait(500 * time.Millisecond)) for { select { case <-qch: diff --git a/server/reload_test.go b/server/reload_test.go index 48a7d2e8..fdb28916 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -4186,6 +4186,7 @@ func TestLoggingReload(t *testing.T) { if err != nil { t.Fatalf("Error creating client: %v\n", err) } + defer nc.Close() msgs := make(chan *nats.Msg, 1) defer close(msgs) diff --git a/server/routes_test.go b/server/routes_test.go index 3c2beab9..c5465b5b 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -794,6 +794,7 @@ func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) { if err != nil { t.Fatalf("Error on connect") } + defer nc.Close() s2Opts := DefaultOptions() s2Opts.ServerName = "B" @@ -1512,12 +1513,14 @@ func testTLSRoutesCertificateImplicitAllow(t *testing.T, pass bool) { func TestSubjectRenameViaJetStreamAck(t *testing.T) { s := RunRandClientPortServer() + defer s.Shutdown() errChan := make(chan error) defer close(errChan) ncPub := natsConnect(t, s.ClientURL(), nats.UserInfo("client", "pwd"), nats.ErrorHandler(func(conn *nats.Conn, s *nats.Subscription, err error) { errChan <- err })) + defer ncPub.Close() require_NoError(t, ncPub.PublishRequest("SVC.ALLOWED", "$JS.ACK.whatever@ADMIN", nil)) select { case err := <-errChan: diff --git a/server/stream.go b/server/stream.go index 20c47ec1..ac4b25a1 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2278,7 +2278,9 @@ func (mset *stream) setupMirrorConsumer() error { mirror.qch = make(chan struct{}) mirror.wg.Add(1) ready.Add(1) - mset.srv.startGoRoutine(func() { mset.processMirrorMsgs(mirror, &ready) }) + if !mset.srv.startGoRoutine(func() { mset.processMirrorMsgs(mirror, &ready) }) { + ready.Done() + } } mset.mu.Unlock() ready.Wait() @@ -2554,7 +2556,9 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T si.qch = make(chan struct{}) si.wg.Add(1) ready.Add(1) - mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si, &ready) }) + if !mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si, &ready) }) { + ready.Done() + } } } mset.mu.Unlock() diff --git a/test/norace_test.go b/test/norace_test.go index 07b17fec..5e677911 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -451,8 +451,10 @@ func TestNoRaceClusterLeaksSubscriptions(t *testing.T) { // Create 100 repliers for i := 0; i < 50; i++ { nc1, _ := nats.Connect(urlA) + defer nc1.Close() nc1.SetErrorHandler(noOpErrHandler) nc2, _ := nats.Connect(urlB) + defer nc2.Close() nc2.SetErrorHandler(noOpErrHandler) repliers = append(repliers, nc1, nc2) nc1.Subscribe("test.reply", func(m *nats.Msg) { diff --git a/test/ocsp_test.go b/test/ocsp_test.go index cb77733c..c08cabc8 100644 --- a/test/ocsp_test.go +++ b/test/ocsp_test.go @@ -102,6 +102,7 @@ func TestOCSPAlwaysMustStapleAndShutdown(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -199,6 +200,7 @@ func TestOCSPMustStapleShutdown(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -616,6 +618,7 @@ func TestOCSPReloadRotateTLSCertWithNoURL(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -727,6 +730,7 @@ func TestOCSPReloadRotateTLSCertDisableMustStaple(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -914,6 +918,7 @@ func TestOCSPReloadRotateTLSCertEnableMustStaple(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -2214,6 +2219,7 @@ func TestOCSPCustomConfigReloadDisable(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -2316,6 +2322,7 @@ func TestOCSPCustomConfigReloadEnable(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) diff --git a/test/service_latency_test.go b/test/service_latency_test.go index 6da77acd..7c7e8715 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -584,6 +584,7 @@ func TestServiceLatencyNoSubsLeak(t *testing.T) { for i := 0; i < 100; i++ { nc := clientConnect(t, sc.clusters[1].opts[1], "bar") + defer nc.Close() if _, err := nc.Request("ngs.usage", []byte("1h"), time.Second); err != nil { t.Fatalf("Error on request: %v", err) }