Fix for store directory on disk prior to 2.2.3 without the 'jetstream' directory.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-05-10 19:24:58 -07:00
parent 6c6c4b9c04
commit bad9fba8ec
2 changed files with 113 additions and 0 deletions

View File

@@ -164,9 +164,56 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error {
cfg.StoreDir = filepath.Join(os.TempDir(), JetStreamStoreDir)
}
// We will consistently place the 'jetstream' directory under the storedir that was handed to us. Prior to 2.2.3 though
// we could have a directory on disk without the 'jetstream' directory. This will check and fix if needed.
s.checkStoreDir(&cfg)
return s.enableJetStream(cfg)
}
// Check to make sure directory has the jetstream directory.
// We will have ot properly configured here now regardless, so need to look inside.
func (s *Server) checkStoreDir(cfg *JetStreamConfig) {
fis, _ := ioutil.ReadDir(cfg.StoreDir)
// If we have nothing underneath us, could be just starting new, but if we see this we can check.
if len(fis) != 0 {
return
}
// Let's check the directory above. If it has us 'jetstream' but also other stuff that we can
// identify as accounts then we can fix.
fis, _ = ioutil.ReadDir(filepath.Dir(cfg.StoreDir))
// If just one that is us 'jetstream' and all is ok.
if len(fis) == 1 {
return
}
s.Warnf("JetStream store being migrated to a new directory structure")
for _, fi := range fis {
// Skip the 'jetstream' directory.
if fi.Name() == JetStreamStoreDir {
continue
}
// Let's see if this is an account.
if accName := fi.Name(); accName != _EMPTY_ {
_, ok := s.accounts.Load(accName)
if !ok {
if acc, err := s.lookupAccount(accName); err != nil && acc != nil {
ok = true
}
}
// If this seems to be an account go ahead and move the directory. This will include all assets
// like streams and consumers.
if ok {
old := filepath.Join(filepath.Dir(cfg.StoreDir), fi.Name())
new := filepath.Join(cfg.StoreDir, fi.Name())
s.Debugf("JetStream relocated %q to %q", old, new)
os.Rename(old, new)
}
}
}
}
// enableJetStream will start up the JetStream subsystem.
func (s *Server) enableJetStream(cfg JetStreamConfig) error {
s.mu.Lock()

View File

@@ -9095,6 +9095,72 @@ func TestJetStreamServerResourcesConfig(t *testing.T) {
}
}
// From 2.2.2 to 2.2.3 we fixed a bug that would not consistently place a jetstream directory
// under the store directory configured. However there were some cases where the directory was
// created that way and therefore 2.2.3 would start and not recognize the existing accounts,
// streams and consumers.
func TestJetStreamStoreDirectoryFix(t *testing.T) {
sd := filepath.Join(os.TempDir(), "sd_test")
defer removeDir(t, sd)
conf := createConfFile(t, []byte(fmt.Sprintf("listen: 127.0.0.1:-1\njetstream: {store_dir: %q}\n", sd)))
defer removeFile(t, conf)
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()
nc, js := jsClientConnect(t, s)
defer nc.Close()
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.Publish("TEST", []byte("TSS")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
// Push based.
sub, err := js.SubscribeSync("TEST", nats.Durable("dlc"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
// Now shutdown the server.
nc.Close()
s.Shutdown()
// Now move stuff up from the jetstream directory etc.
jssd := filepath.Join(sd, JetStreamStoreDir)
fis, _ := ioutil.ReadDir(jssd)
// This will be accounts, move them up one directory.
for _, fi := range fis {
os.Rename(filepath.Join(jssd, fi.Name()), filepath.Join(sd, fi.Name()))
}
removeDir(t, jssd)
// Restart our server. Make sure our assets got moved.
s, _ = RunServerWithConfig(conf)
defer s.Shutdown()
nc, js = jsClientConnect(t, s)
defer nc.Close()
var names []string
for name := range js.StreamNames() {
names = append(names, name)
}
if len(names) != 1 {
t.Fatalf("Expected only 1 stream but got %d", len(names))
}
names = names[:0]
for name := range js.ConsumerNames("TEST") {
names = append(names, name)
}
if len(names) != 1 {
t.Fatalf("Expected only 1 consumer but got %d", len(names))
}
}
func TestJetStreamPushConsumersPullError(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()