From bad9fba8ec6cd9d26552b2d9a93fa3bc28642e1f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 10 May 2021 19:24:58 -0700 Subject: [PATCH] Fix for store directory on disk prior to 2.2.3 without the 'jetstream' directory. Signed-off-by: Derek Collison --- server/jetstream.go | 47 ++++++++++++++++++++++++++++ server/jetstream_test.go | 66 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) diff --git a/server/jetstream.go b/server/jetstream.go index ad6e5d51..4a80c19f 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -164,9 +164,56 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { cfg.StoreDir = filepath.Join(os.TempDir(), JetStreamStoreDir) } + // We will consistently place the 'jetstream' directory under the storedir that was handed to us. Prior to 2.2.3 though + // we could have a directory on disk without the 'jetstream' directory. This will check and fix if needed. + s.checkStoreDir(&cfg) + return s.enableJetStream(cfg) } +// Check to make sure directory has the jetstream directory. +// We will have ot properly configured here now regardless, so need to look inside. +func (s *Server) checkStoreDir(cfg *JetStreamConfig) { + fis, _ := ioutil.ReadDir(cfg.StoreDir) + // If we have nothing underneath us, could be just starting new, but if we see this we can check. + if len(fis) != 0 { + return + } + // Let's check the directory above. If it has us 'jetstream' but also other stuff that we can + // identify as accounts then we can fix. + fis, _ = ioutil.ReadDir(filepath.Dir(cfg.StoreDir)) + // If just one that is us 'jetstream' and all is ok. + if len(fis) == 1 { + return + } + + s.Warnf("JetStream store being migrated to a new directory structure") + + for _, fi := range fis { + // Skip the 'jetstream' directory. + if fi.Name() == JetStreamStoreDir { + continue + } + // Let's see if this is an account. + if accName := fi.Name(); accName != _EMPTY_ { + _, ok := s.accounts.Load(accName) + if !ok { + if acc, err := s.lookupAccount(accName); err != nil && acc != nil { + ok = true + } + } + // If this seems to be an account go ahead and move the directory. This will include all assets + // like streams and consumers. + if ok { + old := filepath.Join(filepath.Dir(cfg.StoreDir), fi.Name()) + new := filepath.Join(cfg.StoreDir, fi.Name()) + s.Debugf("JetStream relocated %q to %q", old, new) + os.Rename(old, new) + } + } + } +} + // enableJetStream will start up the JetStream subsystem. func (s *Server) enableJetStream(cfg JetStreamConfig) error { s.mu.Lock() diff --git a/server/jetstream_test.go b/server/jetstream_test.go index dae4bf03..2e070e40 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -9095,6 +9095,72 @@ func TestJetStreamServerResourcesConfig(t *testing.T) { } } +// From 2.2.2 to 2.2.3 we fixed a bug that would not consistently place a jetstream directory +// under the store directory configured. However there were some cases where the directory was +// created that way and therefore 2.2.3 would start and not recognize the existing accounts, +// streams and consumers. +func TestJetStreamStoreDirectoryFix(t *testing.T) { + sd := filepath.Join(os.TempDir(), "sd_test") + defer removeDir(t, sd) + + conf := createConfFile(t, []byte(fmt.Sprintf("listen: 127.0.0.1:-1\njetstream: {store_dir: %q}\n", sd))) + defer removeFile(t, conf) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if _, err := js.Publish("TEST", []byte("TSS")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + // Push based. + sub, err := js.SubscribeSync("TEST", nats.Durable("dlc")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + + // Now shutdown the server. + nc.Close() + s.Shutdown() + + // Now move stuff up from the jetstream directory etc. + jssd := filepath.Join(sd, JetStreamStoreDir) + fis, _ := ioutil.ReadDir(jssd) + // This will be accounts, move them up one directory. + for _, fi := range fis { + os.Rename(filepath.Join(jssd, fi.Name()), filepath.Join(sd, fi.Name())) + } + removeDir(t, jssd) + + // Restart our server. Make sure our assets got moved. + s, _ = RunServerWithConfig(conf) + defer s.Shutdown() + + nc, js = jsClientConnect(t, s) + defer nc.Close() + + var names []string + for name := range js.StreamNames() { + names = append(names, name) + } + if len(names) != 1 { + t.Fatalf("Expected only 1 stream but got %d", len(names)) + } + names = names[:0] + for name := range js.ConsumerNames("TEST") { + names = append(names, name) + } + if len(names) != 1 { + t.Fatalf("Expected only 1 consumer but got %d", len(names)) + } +} + func TestJetStreamPushConsumersPullError(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown()