Merge pull request #2206 from nats-io/sd_fix

[FIXED] A store directory on disk without 'jetstream' could appear to lose assets on restart.
This commit is contained in:
Derek Collison
2021-05-12 08:56:21 -07:00
committed by GitHub
3 changed files with 115 additions and 1 deletions

View File

@@ -32,6 +32,7 @@ import (
"github.com/minio/highwayhash"
"github.com/nats-io/nats-server/v2/server/sysmem"
"github.com/nats-io/nkeys"
"github.com/nats-io/nuid"
)
@@ -164,9 +165,56 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error {
cfg.StoreDir = filepath.Join(os.TempDir(), JetStreamStoreDir)
}
// We will consistently place the 'jetstream' directory under the storedir that was handed to us. Prior to 2.2.3 though
// we could have a directory on disk without the 'jetstream' directory. This will check and fix if needed.
s.checkStoreDir(&cfg)
return s.enableJetStream(cfg)
}
// Check to make sure directory has the jetstream directory.
// We will have it properly configured here now regardless, so need to look inside.
func (s *Server) checkStoreDir(cfg *JetStreamConfig) {
fis, _ := ioutil.ReadDir(cfg.StoreDir)
// If we have nothing underneath us, could be just starting new, but if we see this we can check.
if len(fis) != 0 {
return
}
// Let's check the directory above. If it has us 'jetstream' but also other stuff that we can
// identify as accounts then we can fix.
fis, _ = ioutil.ReadDir(filepath.Dir(cfg.StoreDir))
// If just one that is us 'jetstream' and all is ok.
if len(fis) == 1 {
return
}
for _, fi := range fis {
// Skip the 'jetstream' directory.
if fi.Name() == JetStreamStoreDir {
continue
}
// Let's see if this is an account.
if accName := fi.Name(); accName != _EMPTY_ {
_, ok := s.accounts.Load(accName)
if !ok && s.AccountResolver() != nil && nkeys.IsValidPublicAccountKey(accName) {
if _, err := s.fetchAccount(accName); err != nil {
s.Errorf("Unable to resolve account %q: %v", accName, err)
} else {
ok = true
}
}
// If this seems to be an account go ahead and move the directory. This will include all assets
// like streams and consumers.
if ok {
old := filepath.Join(filepath.Dir(cfg.StoreDir), fi.Name())
new := filepath.Join(cfg.StoreDir, fi.Name())
s.Noticef("JetStream relocated account %q to %q", old, new)
os.Rename(old, new)
}
}
}
}
// enableJetStream will start up the JetStream subsystem.
func (s *Server) enableJetStream(cfg JetStreamConfig) error {
s.mu.Lock()

View File

@@ -9095,6 +9095,72 @@ func TestJetStreamServerResourcesConfig(t *testing.T) {
}
}
// From 2.2.2 to 2.2.3 we fixed a bug that would not consistently place a jetstream directory
// under the store directory configured. However there were some cases where the directory was
// created that way and therefore 2.2.3 would start and not recognize the existing accounts,
// streams and consumers.
func TestJetStreamStoreDirectoryFix(t *testing.T) {
sd := filepath.Join(os.TempDir(), "sd_test")
defer removeDir(t, sd)
conf := createConfFile(t, []byte(fmt.Sprintf("listen: 127.0.0.1:-1\njetstream: {store_dir: %q}\n", sd)))
defer removeFile(t, conf)
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()
nc, js := jsClientConnect(t, s)
defer nc.Close()
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.Publish("TEST", []byte("TSS")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
// Push based.
sub, err := js.SubscribeSync("TEST", nats.Durable("dlc"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
// Now shutdown the server.
nc.Close()
s.Shutdown()
// Now move stuff up from the jetstream directory etc.
jssd := filepath.Join(sd, JetStreamStoreDir)
fis, _ := ioutil.ReadDir(jssd)
// This will be accounts, move them up one directory.
for _, fi := range fis {
os.Rename(filepath.Join(jssd, fi.Name()), filepath.Join(sd, fi.Name()))
}
removeDir(t, jssd)
// Restart our server. Make sure our assets got moved.
s, _ = RunServerWithConfig(conf)
defer s.Shutdown()
nc, js = jsClientConnect(t, s)
defer nc.Close()
var names []string
for name := range js.StreamNames() {
names = append(names, name)
}
if len(names) != 1 {
t.Fatalf("Expected only 1 stream but got %d", len(names))
}
names = names[:0]
for name := range js.ConsumerNames("TEST") {
names = append(names, name)
}
if len(names) != 1 {
t.Fatalf("Expected only 1 consumer but got %d", len(names))
}
}
func TestJetStreamPushConsumersPullError(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

View File

@@ -1370,7 +1370,7 @@ func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error
func (s *Server) fetchRawAccountClaims(name string) (string, error) {
accResolver := s.AccountResolver()
if accResolver == nil {
return "", ErrNoAccountResolver
return _EMPTY_, ErrNoAccountResolver
}
// Need to do actual Fetch
start := time.Now()