From 8de83bc2ef36526b2df298aba9ac864ee5b70ae2 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 4 Sep 2023 16:10:25 +0100 Subject: [PATCH] Use `TempDir` in more tests Signed-off-by: Neil Twigg --- server/jetstream_test.go | 208 +++++++++++++-------------------------- server/mqtt_test.go | 124 ++++++++++++++--------- server/norace_test.go | 16 +-- server/routes_test.go | 2 +- server/test_test.go | 3 +- 5 files changed, 160 insertions(+), 193 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 01b96407..f7253a31 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -50,10 +50,7 @@ import ( ) func TestJetStreamBasicNilConfig(t *testing.T) { - s := RunRandClientPortServer() - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } + s := RunRandClientPortServer(t) defer s.Shutdown() if err := s.EnableJetStream(nil); err != nil { @@ -301,18 +298,14 @@ func TestJetStreamAddStreamDiscardNew(t *testing.T) { } func TestJetStreamAutoTuneFSConfig(t *testing.T) { - s := RunRandClientPortServer() + s := RunRandClientPortServer(t) defer s.Shutdown() - jsconfig := &JetStreamConfig{MaxMemory: -1, MaxStore: 128 * 1024 * 1024} + jsconfig := &JetStreamConfig{MaxMemory: -1, MaxStore: 128 * 1024 * 1024, StoreDir: t.TempDir()} if err := s.EnableJetStream(jsconfig); err != nil { t.Fatalf("Expected no error, got %v", err) } - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } - maxMsgSize := int32(512) streamConfig := func(name string, maxMsgs, maxBytes int64) *StreamConfig { t.Helper() @@ -2974,10 +2967,6 @@ func TestJetStreamConsumerAckAck(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } - mname := "ACK-ACK" mset, err := s.GlobalAccount().addStream(&StreamConfig{Name: mname, Storage: MemoryStorage}) if err != nil { @@ -4023,10 +4012,6 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { checkLeafNodeConnected(t, s) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } - mname := "MY-STREAM" subjects := []string{"foo", "bar", "baz"} cfg := StreamConfig{ @@ -6353,10 +6338,7 @@ func TestJetStreamConsumerReplayQuit(t *testing.T) { } func TestJetStreamSystemLimits(t *testing.T) { - s := RunRandClientPortServer() - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } + s := RunRandClientPortServer(t) defer s.Shutdown() if _, _, err := s.JetStreamReservedResources(); err == nil { @@ -6367,7 +6349,7 @@ func TestJetStreamSystemLimits(t *testing.T) { bacc, _ := s.LookupOrRegisterAccount("BAR") zacc, _ := s.LookupOrRegisterAccount("BAZ") - jsconfig := &JetStreamConfig{MaxMemory: 1024, MaxStore: 8192} + jsconfig := &JetStreamConfig{MaxMemory: 1024, MaxStore: 8192, StoreDir: t.TempDir()} if err := s.EnableJetStream(jsconfig); err != nil { t.Fatalf("Expected no error, got %v", err) } @@ -8893,9 +8875,10 @@ func TestJetStreamCanNotEnableOnSystemAccount(t *testing.T) { } func TestJetStreamMultipleAccountsBasics(t *testing.T) { - conf := createConfFile(t, []byte(` + tdir := t.TempDir() + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 64GB, max_file_store: 10TB} + jetstream: {max_mem_store: 64GB, max_file_store: 10TB, store_dir: %q} accounts: { A: { jetstream: enabled @@ -8909,12 +8892,9 @@ func TestJetStreamMultipleAccountsBasics(t *testing.T) { users: [ {user: uc, password: pwd} ] }, } - `)) + `, tdir))) s, opts := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() if !s.JetStreamEnabled() { @@ -8972,9 +8952,9 @@ func TestJetStreamMultipleAccountsBasics(t *testing.T) { expectNotEnabled(ncc.Request(JSApiAccountInfo, nil, 250*time.Millisecond)) // Now do simple reload and check that we do the right thing. Testing enable and disable and also change in limits - newConf := []byte(` + newConf := []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 64GB, max_file_store: 10TB} + jetstream: {max_mem_store: 64GB, max_file_store: 10TB, store_dir: %q} accounts: { A: { jetstream: disabled @@ -8989,7 +8969,7 @@ func TestJetStreamMultipleAccountsBasics(t *testing.T) { users: [ {user: uc, password: pwd} ] }, } - `) + `, tdir)) if err := os.WriteFile(conf, newConf, 0600); err != nil { t.Fatalf("Error rewriting server's config file: %v", err) } @@ -9057,10 +9037,10 @@ func TestJetStreamMultipleAccountsBasics(t *testing.T) { } func TestJetStreamServerResourcesConfig(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 2GB, max_file_store: 1TB} - `)) + jetstream: {max_mem_store: 2GB, max_file_store: 1TB, store_dir: %q} + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) defer s.Shutdown() @@ -9069,10 +9049,6 @@ func TestJetStreamServerResourcesConfig(t *testing.T) { t.Fatalf("Expected JetStream to be enabled") } - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } - gb := int64(1024 * 1024 * 1024) jsc := s.JetStreamConfig() if jsc.MaxMemory != 2*gb { @@ -10322,10 +10298,8 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) { opts := DefaultTestOptions opts.Port = -1 opts.JetStream = true + opts.StoreDir = t.TempDir() s := RunServer(&opts) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() mset, err := s.GlobalAccount().addStream(&StreamConfig{ @@ -10424,10 +10398,10 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) { // This is for the basics of importing the ability to send to a stream and consume // from a consumer that is pull based on push based on a well known delivery subject. func TestJetStreamAccountImportBasics(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 no_auth_user: rip - jetstream: {max_mem_store: 64GB, max_file_store: 10TB} + jetstream: {max_mem_store: 64GB, max_file_store: 10TB, store_dir: %q} accounts: { JS: { jetstream: enabled @@ -10453,12 +10427,9 @@ func TestJetStreamAccountImportBasics(t *testing.T) { ] }, } - `)) + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() acc, err := s.LookupAccount("JS") @@ -10568,10 +10539,10 @@ func TestJetStreamAccountImportBasics(t *testing.T) { // for each account in aggregate account config. // This test fails as it is not receiving the api audit event ($JS.EVENT.ADVISORY.API). func TestJetStreamAccountImportJSAdvisoriesAsService(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen=127.0.0.1:-1 no_auth_user: pp - jetstream: {max_mem_store: 64GB, max_file_store: 10TB} + jetstream: {max_mem_store: 64GB, max_file_store: 10TB, store_dir: %q} accounts { JS { jetstream: enabled @@ -10587,12 +10558,9 @@ func TestJetStreamAccountImportJSAdvisoriesAsService(t *testing.T) { ] } } - `)) + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() // This should be the pp user, one which manages JetStream assets @@ -10675,10 +10643,10 @@ func TestJetStreamAccountImportJSAdvisoriesAsService(t *testing.T) { // as long as there is a separate stream import entry for each account // in aggregate account config. func TestJetStreamAccountImportJSAdvisoriesAsStream(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen=127.0.0.1:-1 no_auth_user: pp - jetstream: {max_mem_store: 64GB, max_file_store: 10TB} + jetstream: {max_mem_store: 64GB, max_file_store: 10TB, store_dir: %q} accounts { JS { jetstream: enabled @@ -10694,12 +10662,9 @@ func TestJetStreamAccountImportJSAdvisoriesAsStream(t *testing.T) { ] } } - `)) + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() // This should be the pp user, one which manages JetStream assets @@ -10779,10 +10744,10 @@ func TestJetStreamAccountImportJSAdvisoriesAsStream(t *testing.T) { // This is for importing all of JetStream into another account for admin purposes. func TestJetStreamAccountImportAll(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 no_auth_user: rip - jetstream: {max_mem_store: 64GB, max_file_store: 10TB} + jetstream: {max_mem_store: 64GB, max_file_store: 10TB, store_dir: %q} accounts: { JS: { jetstream: enabled @@ -10794,12 +10759,9 @@ func TestJetStreamAccountImportAll(t *testing.T) { imports [ { service: { subject: "$JS.API.>", account: JS }, to: "jsapi.>"} ] }, } - `)) + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() acc, err := s.LookupAccount("JS") @@ -10845,9 +10807,9 @@ func TestJetStreamAccountImportAll(t *testing.T) { // https://github.com/nats-io/nats-server/issues/1736 func TestJetStreamServerReload(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 64GB, max_file_store: 10TB } + jetstream: {max_mem_store: 64GB, max_file_store: 10TB, store_dir: %q } accounts: { A: { users: [ {user: ua, password: pwd} ] }, B: { @@ -10858,12 +10820,9 @@ func TestJetStreamServerReload(t *testing.T) { } no_auth_user: ub system_account: SYS - `)) + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() if !s.JetStreamEnabled() { @@ -10920,6 +10879,7 @@ func TestJetStreamServerReload(t *testing.T) { } func TestJetStreamConfigReloadWithGlobalAccount(t *testing.T) { + tdir := t.TempDir() template := ` listen: 127.0.0.1:-1 authorization { @@ -10929,14 +10889,13 @@ func TestJetStreamConfigReloadWithGlobalAccount(t *testing.T) { ] } no_auth_user: anonymous - jetstream: enabled + jetstream { + store_dir = %q + } ` - conf := createConfFile(t, []byte(fmt.Sprintf(template, "pwd"))) + conf := createConfFile(t, []byte(fmt.Sprintf(template, "pwd", tdir))) s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() // Client for API requests. @@ -10968,7 +10927,7 @@ func TestJetStreamConfigReloadWithGlobalAccount(t *testing.T) { t.Fatalf("Expected %d msgs after restart, got %d", toSend, si.State.Msgs) } - if err := os.WriteFile(conf, []byte(fmt.Sprintf(template, "pwd2")), 0666); err != nil { + if err := os.WriteFile(conf, []byte(fmt.Sprintf(template, "pwd2", tdir)), 0666); err != nil { t.Fatalf("Error writing config: %v", err) } @@ -11975,10 +11934,10 @@ func TestJetStreamServerDomainBadConfig(t *testing.T) { } func TestJetStreamServerDomainConfig(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {domain: "HUB"} - `)) + jetstream: {domain: "HUB", store_dir: %q} + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) defer s.Shutdown() @@ -11988,9 +11947,6 @@ func TestJetStreamServerDomainConfig(t *testing.T) { } config := s.JetStreamConfig() - if config != nil { - defer removeDir(t, config.StoreDir) - } if config.Domain != "HUB" { t.Fatalf("Expected %q as domain name, got %q", "HUB", config.Domain) } @@ -12016,16 +11972,12 @@ func TestJetStreamServerDomainConfigButDisabled(t *testing.T) { } func TestJetStreamDomainInPubAck(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {domain: "HUB"} - `)) + jetstream: {domain: "HUB", store_dir: %q} + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) - config := s.JetStreamConfig() - if config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() nc, js := jsClientConnect(t, s) @@ -13789,9 +13741,9 @@ func TestJetStreamRecoverBadMirrorConfigWithSubjects(t *testing.T) { } func TestJetStreamCrossAccountsDeliverSubjectInterest(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 4GB, max_file_store: 1TB} + jetstream: {max_mem_store: 4GB, max_file_store: 1TB, store_dir: %q} accounts: { A: { jetstream: enabled @@ -13807,12 +13759,9 @@ func TestJetStreamCrossAccountsDeliverSubjectInterest(t *testing.T) { ] }, } - `)) + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.UserInfo("a", "pwd")) @@ -14055,9 +14004,9 @@ func TestJetStreamEphemeralPullConsumersInactiveThresholdAndNoWait(t *testing.T) } func TestJetStreamPullConsumerCrossAccountExpires(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 4GB, max_file_store: 1TB} + jetstream: {max_mem_store: 4GB, max_file_store: 1TB, store_dir: %q} accounts: { JS: { jetstream: enabled @@ -14075,12 +14024,9 @@ func TestJetStreamPullConsumerCrossAccountExpires(t *testing.T) { imports [ { service: { subject: "$JS.API.CONSUMER.MSG.NEXT.*.*", account: IU } } ] }, } - `)) + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() // Connect to JS account and create stream, put some messages into it. @@ -14261,9 +14207,9 @@ func TestJetStreamPullConsumerCrossAccountExpires(t *testing.T) { } func TestJetStreamPullConsumerCrossAccountExpiresNoDataRace(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 4GB, max_file_store: 1TB} + jetstream: {max_mem_store: 4GB, max_file_store: 1TB, store_dir: %q} accounts: { JS: { jetstream: enabled @@ -14276,13 +14222,10 @@ func TestJetStreamPullConsumerCrossAccountExpiresNoDataRace(t *testing.T) { imports [ { service: { subject: "$JS.API.CONSUMER.MSG.NEXT.*.*", account: JS } }] }, } - `)) + `, t.TempDir()))) test := func() { s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() // Connect to JS account and create stream, put some messages into it. @@ -14335,10 +14278,10 @@ func TestJetStreamPullConsumerCrossAccountExpiresNoDataRace(t *testing.T) { // This tests account export/import replies across a LN connection with account import/export // on both sides of the LN. func TestJetStreamPullConsumerCrossAccountsAndLeafNodes(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` server_name: SJS listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 4GB, max_file_store: 1TB, domain: JSD } + jetstream: {max_mem_store: 4GB, max_file_store: 1TB, domain: JSD, store_dir: %q } accounts: { JS: { jetstream: enabled @@ -14351,12 +14294,9 @@ func TestJetStreamPullConsumerCrossAccountsAndLeafNodes(t *testing.T) { }, } leaf { listen: "127.0.0.1:-1" } - `)) + `, t.TempDir()))) s, o := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() conf2 := createConfFile(t, []byte(fmt.Sprintf(` @@ -15984,9 +15924,11 @@ func TestJetStreamBackOffCheckPending(t *testing.T) { } func TestJetStreamCrossAccounts(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: enabled + jetstream { + store_dir = %q + } accounts: { A: { users: [ {user: a, password: a} ] @@ -16005,11 +15947,8 @@ func TestJetStreamCrossAccounts(t *testing.T) { {stream: {subject: 'accI.>', account: A}} ] } - }`)) + }`, t.TempDir()))) s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() watchNext := func(w nats.KeyWatcher) nats.KeyValueEntry { @@ -16525,9 +16464,9 @@ func TestJetStreamRecoverSealedAfterServerRestart(t *testing.T) { } func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 4GB, max_file_store: 1TB} + jetstream: {max_mem_store: 4GB, max_file_store: 1TB, store_dir: %q} accounts: { JS: { jetstream: enabled @@ -16551,13 +16490,10 @@ func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) { ] }, } - `)) + `, t.TempDir()))) test := func(t *testing.T, queue bool) { s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.UserInfo("js", "pwd")) @@ -18261,9 +18197,9 @@ func TestJetStreamMirrorFirstSeqNotSupported(t *testing.T) { } func TestJetStreamDirectGetBySubject(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 64GB, max_file_store: 10TB} + jetstream: {max_mem_store: 64GB, max_file_store: 10TB, store_dir: %q} ONLYME = { publish = { allow = "$JS.API.DIRECT.GET.KV.vid.22.>"} @@ -18278,12 +18214,9 @@ func TestJetStreamDirectGetBySubject(t *testing.T) { ] }, } - `)) + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.UserInfo("admin", "s3cr3t")) @@ -18707,9 +18640,9 @@ func TestJetStreamConsumerEOFBugNewFileStore(t *testing.T) { } func TestJetStreamSubjectBasedFilteredConsumers(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 64GB, max_file_store: 10TB} + jetstream: {max_mem_store: 64GB, max_file_store: 10TB, store_dir: %q} accounts: { A: { jetstream: enabled @@ -18731,12 +18664,9 @@ func TestJetStreamSubjectBasedFilteredConsumers(t *testing.T) { } ] }, } - `)) + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } defer s.Shutdown() nc, js := jsClientConnect(t, s, nats.UserInfo("u", "p"), nats.ErrorHandler(noOpErrHandler)) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 87ab3c92..b13183cf 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -407,8 +407,10 @@ func TestMQTTStandaloneRequiresJetStream(t *testing.T) { } func TestMQTTConfig(t *testing.T) { - conf := createConfFile(t, []byte(` - jetstream: enabled + conf := createConfFile(t, []byte(fmt.Sprintf(` + jetstream { + store_dir = %q + } server_name: mqtt mqtt { port: -1 @@ -417,7 +419,7 @@ func TestMQTTConfig(t *testing.T) { key_file: "./configs/certs/key.pem" } } - `)) + `, t.TempDir()))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) if o.MQTT.TLSConfig == nil { @@ -3466,11 +3468,11 @@ func TestMQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc(t *testing.T) { } func TestMQTTImportExport(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: "127.0.0.1:-1" server_name: "mqtt" jetstream { - store_dir=org_dir + store_dir = %q } accounts { org { @@ -3487,9 +3489,8 @@ func TestMQTTImportExport(t *testing.T) { listen: "127.0.0.1:-1" } no_auth_user: device - `)) + `, t.TempDir()))) defer os.Remove(conf) - defer os.RemoveAll("org_dir") s, o := RunServerWithConfig(conf) defer s.Shutdown() @@ -3912,7 +3913,9 @@ func TestMQTTWillRetain(t *testing.T) { func TestMQTTWillRetainPermViolation(t *testing.T) { template := ` port: -1 - jetstream: enabled + jetstream { + store_dir = %q + } server_name: mqtt authorization { mqtt_perms = { @@ -3927,7 +3930,8 @@ func TestMQTTWillRetainPermViolation(t *testing.T) { port: -1 } ` - conf := createConfFile(t, []byte(fmt.Sprintf(template, "foo"))) + tdir := t.TempDir() + conf := createConfFile(t, []byte(fmt.Sprintf(template, tdir, "foo"))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -4006,7 +4010,7 @@ func TestMQTTWillRetainPermViolation(t *testing.T) { // Now remove permission to publish on "foo" and check that a new subscription // on "foo" is now not getting the will message because the original user no // longer has permission to do so. - reloadUpdateConfig(t, s, conf, fmt.Sprintf(template, "baz")) + reloadUpdateConfig(t, s, conf, fmt.Sprintf(template, tdir, "baz")) mcs, rs = testMQTTConnect(t, ci, o.MQTT.Host, o.MQTT.Port) defer mcs.Close() @@ -5575,11 +5579,13 @@ func TestMQTTStreamInfoReturnsNonEmptySubject(t *testing.T) { } func TestMQTTWebsocketToMQTTPort(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: "127.0.0.1:-1" http: "127.0.0.1:-1" server_name: "mqtt" - jetstream: enabled + jetstream { + store_dir = %q + } mqtt { listen: "127.0.0.1:-1" } @@ -5587,7 +5593,7 @@ func TestMQTTWebsocketToMQTTPort(t *testing.T) { listen: "127.0.0.1:-1" no_tls: true } - `)) + `, t.TempDir()))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -5609,11 +5615,14 @@ func TestMQTTWebsocketToMQTTPort(t *testing.T) { } func TestMQTTWebsocket(t *testing.T) { + tdir := t.TempDir() template := ` listen: "127.0.0.1:-1" http: "127.0.0.1:-1" server_name: "mqtt" - jetstream: enabled + jetstream { + store_dir = %q + } accounts { MQTT { jetstream: enabled @@ -5630,7 +5639,7 @@ func TestMQTTWebsocket(t *testing.T) { no_tls: true } ` - s, o, conf := runReloadServerWithContent(t, []byte(fmt.Sprintf(template, jwt.ConnectionTypeMqtt, ""))) + s, o, conf := runReloadServerWithContent(t, []byte(fmt.Sprintf(template, tdir, jwt.ConnectionTypeMqtt, ""))) defer testMQTTShutdownServer(s) cisub := &mqttConnInfo{clientID: "sub", user: "mqtt", pass: "pwd", ws: true} @@ -5640,7 +5649,7 @@ func TestMQTTWebsocket(t *testing.T) { c.Close() ws := fmt.Sprintf(`, "%s"`, jwt.ConnectionTypeMqttWS) - reloadUpdateConfig(t, s, conf, fmt.Sprintf(template, jwt.ConnectionTypeMqtt, ws)) + reloadUpdateConfig(t, s, conf, fmt.Sprintf(template, tdir, jwt.ConnectionTypeMqtt, ws)) cisub = &mqttConnInfo{clientID: "sub", user: "mqtt", pass: "pwd", ws: true} c, r = testMQTTConnect(t, cisub, o.Websocket.Host, o.Websocket.Port) @@ -5680,11 +5689,13 @@ func (cwc *chunkWriteConn) Write(p []byte) (int, error) { } func TestMQTTPartial(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: "127.0.0.1:-1" http: "127.0.0.1:-1" server_name: "mqtt" - jetstream: enabled + jetstream { + store_dir = %q + } mqtt { listen: "127.0.0.1:-1" } @@ -5692,7 +5703,7 @@ func TestMQTTPartial(t *testing.T) { listen: "127.0.0.1:-1" no_tls: true } - `)) + `, t.TempDir()))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -5731,11 +5742,13 @@ func TestMQTTPartial(t *testing.T) { } func TestMQTTWebsocketTLS(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: "127.0.0.1:-1" http: "127.0.0.1:-1" server_name: "mqtt" - jetstream: enabled + jetstream { + store_dir = %q + } mqtt { listen: "127.0.0.1:-1" } @@ -5747,7 +5760,7 @@ func TestMQTTWebsocketTLS(t *testing.T) { ca_file: '../test/configs/certs/ca.pem' } } - `)) + `, t.TempDir()))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -5877,11 +5890,13 @@ func TestMQTTTransferSessionStreamsToMuxed(t *testing.T) { } func TestMQTTConnectAndDisconnectEvent(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: "127.0.0.1:-1" http: "127.0.0.1:-1" server_name: "mqtt" - jetstream: enabled + jetstream { + store_dir = %q + } accounts { MQTT { jetstream: enabled @@ -5895,7 +5910,7 @@ func TestMQTTConnectAndDisconnectEvent(t *testing.T) { listen: "127.0.0.1:-1" } system_account: "SYS" - `)) + `, t.TempDir()))) defer os.Remove(conf) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -6169,15 +6184,18 @@ func TestMQTTStreamReplicasOverride(t *testing.T) { } func TestMQTTStreamReplicasConfigReload(t *testing.T) { + tdir := t.TempDir() tmpl := ` - jetstream: enabled + jetstream { + store_dir = %q + } server_name: mqtt mqtt { port: -1 stream_replicas: %v } ` - conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, 3))) + conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, tdir, 3))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -6198,7 +6216,7 @@ func TestMQTTStreamReplicasConfigReload(t *testing.T) { t.Fatalf("Did not get the error regarding replicas count") } - reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, 1)) + reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, tdir, 1)) mc, r := testMQTTConnect(t, &mqttConnInfo{clientID: "mqtt", cleanSess: false}, o.MQTT.Host, o.MQTT.Port) defer mc.Close() @@ -6347,15 +6365,18 @@ func TestMQTTConsumerReplicasOverride(t *testing.T) { } func TestMQTTConsumerMemStorageReload(t *testing.T) { + tdir := t.TempDir() tmpl := ` - jetstream: enabled + jetstream { + store_dir = %q + } server_name: mqtt mqtt { port: -1 consumer_memory_storage: %s } ` - conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, "false"))) + conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, tdir, "false"))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -6366,7 +6387,7 @@ func TestMQTTConsumerMemStorageReload(t *testing.T) { defer c.Close() testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false) - reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, "true")) + reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, tdir, "true")) testMQTTSub(t, 1, c, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) @@ -6450,10 +6471,13 @@ func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) { // Test for auto-cleanup of consumers. func TestMQTTConsumerInactiveThreshold(t *testing.T) { + tdir := t.TempDir() tmpl := ` listen: 127.0.0.1:-1 server_name: mqtt - jetstream: enabled + jetstream { + store_dir = %q + } mqtt { listen: 127.0.0.1:-1 @@ -6463,7 +6487,7 @@ func TestMQTTConsumerInactiveThreshold(t *testing.T) { # For access to system account. accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } ` - conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, "0.2s"))) + conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, tdir, "0.2s"))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -6489,17 +6513,19 @@ func TestMQTTConsumerInactiveThreshold(t *testing.T) { // Check reload. // We will not redo existing consumers however. - reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, "22s")) + reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmpl, tdir, "22s")) if opts := s.getOpts(); opts.MQTT.ConsumerInactiveThreshold != 22*time.Second { t.Fatalf("Expected reloaded value of %v but got %v", 22*time.Second, opts.MQTT.ConsumerInactiveThreshold) } } func TestMQTTSubjectMapping(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 server_name: mqtt - jetstream: enabled + jetstream { + store_dir = %q + } mappings = { foo0: bar0 @@ -6509,7 +6535,7 @@ func TestMQTTSubjectMapping(t *testing.T) { mqtt { listen: 127.0.0.1:-1 } - `)) + `, t.TempDir()))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -6599,10 +6625,12 @@ func TestMQTTSubjectMapping(t *testing.T) { } func TestMQTTSubjectMappingWithImportExport(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 server_name: mqtt - jetstream: enabled + jetstream { + store_dir = %q + } accounts { A { @@ -6627,7 +6655,7 @@ func TestMQTTSubjectMappingWithImportExport(t *testing.T) { mqtt { listen: 127.0.0.1:-1 } - `)) + `, t.TempDir()))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -6743,14 +6771,16 @@ func TestMQTTSubjectMappingWithImportExport(t *testing.T) { // The MQTT Server MUST NOT match Topic Filters starting with a wildcard character (# or +), // with Topic Names beginning with a $ character [MQTT-4.7.2-1] func TestMQTTSubjectWildcardStart(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 server_name: mqtt - jetstream: enabled + jetstream { + store_dir = %q + } mqtt { listen: 127.0.0.1:-1 } - `)) + `, t.TempDir()))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) @@ -6905,14 +6935,16 @@ func TestMQTTTopicWithDot(t *testing.T) { // Issue https://github.com/nats-io/nats-server/issues/4291 func TestMQTTJetStreamRepublishAndQoS0Subscribers(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 server_name: mqtt - jetstream: enabled + jetstream { + store_dir = %q + } mqtt { listen: 127.0.0.1:-1 } - `)) + `, t.TempDir()))) s, o := RunServerWithConfig(conf) defer testMQTTShutdownServer(s) diff --git a/server/norace_test.go b/server/norace_test.go index 193b382a..6742528f 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -3138,10 +3138,12 @@ func TestNoRaceJetStreamFileStoreCompaction(t *testing.T) { } func TestNoRaceJetStreamEncryptionEnabledOnRestartWithExpire(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: enabled - `)) + jetstream { + store_dir = %q + } + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) defer s.Shutdown() @@ -4217,9 +4219,11 @@ func TestNoRaceJetStreamClusterHealthz(t *testing.T) { // Also test that we will fail at some point and the user can fall back to // an orderedconsumer like we do with watch for KV Keys() call. func TestNoRaceJetStreamStreamInfoSubjectDetailsLimits(t *testing.T) { - conf := createConfFile(t, []byte(` + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 - jetstream: enabled + jetstream { + store_dir = %q + } accounts: { default: { jetstream: true @@ -4227,7 +4231,7 @@ func TestNoRaceJetStreamStreamInfoSubjectDetailsLimits(t *testing.T) { limits { max_payload: 256 } } } - `)) + `, t.TempDir()))) s, _ := RunServerWithConfig(conf) if config := s.JetStreamConfig(); config != nil { diff --git a/server/routes_test.go b/server/routes_test.go index 1dac25f3..8385a6ad 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1592,7 +1592,7 @@ func testTLSRoutesCertificateImplicitAllow(t *testing.T, pass bool) { } func TestSubjectRenameViaJetStreamAck(t *testing.T) { - s := RunRandClientPortServer() + s := RunRandClientPortServer(t) defer s.Shutdown() errChan := make(chan error) defer close(errChan) diff --git a/server/test_test.go b/server/test_test.go index 00bc53e5..7ce6ae7a 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -45,9 +45,10 @@ func testDefaultClusterOptionsForLeafNodes() *Options { return &o } -func RunRandClientPortServer() *Server { +func RunRandClientPortServer(t *testing.T) *Server { opts := DefaultTestOptions opts.Port = -1 + opts.StoreDir = t.TempDir() return RunServer(&opts) }