diff --git a/server/accounts.go b/server/accounts.go index 1ccb6295..f409f6b3 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3817,9 +3817,13 @@ func removeCb(s *Server, pubKey string) { a.mu.Unlock() // set the account to be expired and disconnect clients a.expiredTimeout() - // For JS, we need also to disable JS + // For JS, we need also to disable it. if js := s.getJetStream(); js != nil && jsa != nil { js.disableJetStream(jsa) + // Remove JetStream state in memory, this will be reset + // on the changed callback from the account in case it is + // enabled again. + a.js = nil } // We also need to remove all ServerImport subscriptions a.removeAllServiceImportSubs() @@ -3838,11 +3842,23 @@ func (dr *DirAccResolver) Start(s *Server) error { dr.Server = s dr.operator = opKeys dr.DirJWTStore.changed = func(pubKey string) { - if v, ok := s.accounts.Load(pubKey); !ok { - } else if theJwt, err := dr.LoadAcc(pubKey); err != nil { - s.Errorf("update got error on load: %v", err) - } else if err := s.updateAccountWithClaimJWT(v.(*Account), theJwt); err != nil { - s.Errorf("update resulted in error %v", err) + if v, ok := s.accounts.Load(pubKey); ok { + if theJwt, err := dr.LoadAcc(pubKey); err != nil { + s.Errorf("update got error on load: %v", err) + } else { + acc := v.(*Account) + if err = s.updateAccountWithClaimJWT(acc, theJwt); err != nil { + s.Errorf("update resulted in error %v", err) + } else { + if _, jsa, err := acc.checkForJetStream(); err != nil { + s.Warnf("error checking for JetStream enabled error %v", err) + } else if jsa == nil { + if err = s.configJetStream(acc); err != nil { + s.Errorf("updated resulted in error when configuring JetStream %v", err) + } + } + } + } } } dr.DirJWTStore.deleted = func(pubKey string) { diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go index 0317c882..faaef767 100644 --- a/server/jetstream_jwt_test.go +++ b/server/jetstream_jwt_test.go @@ -18,6 +18,7 @@ package server import ( "encoding/json" + "errors" "fmt" "net/http" "net/http/httptest" @@ -1040,7 +1041,7 @@ func TestJetStreamDeletedAccountDoesNotLeakSubscriptions(t *testing.T) { conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 operator: %s - jetstream: {max_mem_store: 10Mb, max_file_store: 10Mb} + jetstream: {max_mem_store: 10Mb, max_file_store: 10Mb, store_dir: %v} system_account: %s resolver: { type: full @@ -1048,7 +1049,7 @@ func TestJetStreamDeletedAccountDoesNotLeakSubscriptions(t *testing.T) { dir: '%s' timeout: "500ms" } - `, opJwt, syspub, dirSrv))) + `, opJwt, dirSrv, syspub, dirSrv))) defer removeFile(t, conf) s, _ := RunServerWithConfig(conf) @@ -1113,3 +1114,178 @@ func TestJetStreamDeletedAccountDoesNotLeakSubscriptions(t *testing.T) { // and that will not go away, so discount it. checkNumSubs(beforeCreate + 1) } + +func TestJetStreamDeletedAccountIsReEnabled(t *testing.T) { + op, _ := nkeys.CreateOperator() + opPk, _ := op.PublicKey() + sk, _ := nkeys.CreateOperator() + skPk, _ := sk.PublicKey() + opClaim := jwt.NewOperatorClaims(opPk) + opClaim.SigningKeys.Add(skPk) + opJwt, err := opClaim.Encode(op) + require_NoError(t, err) + createAccountAndUser := func(pubKey, jwt1, creds1 *string) { + t.Helper() + kp, _ := nkeys.CreateAccount() + *pubKey, _ = kp.PublicKey() + claim := jwt.NewAccountClaims(*pubKey) + claim.Limits.JetStreamLimits = jwt.JetStreamLimits{MemoryStorage: 7 * 1024 * 1024, DiskStorage: 7 * 1024 * 1024, Streams: 10} + var err error + *jwt1, err = claim.Encode(sk) + require_NoError(t, err) + + ukp, _ := nkeys.CreateUser() + seed, _ := ukp.Seed() + upub, _ := ukp.PublicKey() + uclaim := newJWTTestUserClaims() + uclaim.Subject = upub + + ujwt1, err := uclaim.Encode(kp) + require_NoError(t, err) + *creds1 = genCredsFile(t, ujwt1, seed) + } + generateRequest := func(accs []string, kp nkeys.KeyPair) []byte { + t.Helper() + opk, _ := kp.PublicKey() + c := jwt.NewGenericClaims(opk) + c.Data["accounts"] = accs + cJwt, err := c.Encode(kp) + if err != nil { + t.Fatalf("Expected no error %v", err) + } + return []byte(cJwt) + } + + // admin user + var syspub, sysjwt, sysCreds string + createAccountAndUser(&syspub, &sysjwt, &sysCreds) + defer removeFile(t, sysCreds) + + dirSrv := createDir(t, "srv") + defer removeDir(t, dirSrv) + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + operator: %s + jetstream: {max_mem_store: 10Mb, max_file_store: 10Mb, store_dir: %v} + system_account: %s + resolver: { + type: full + allow_delete: true + dir: '%s' + timeout: "500ms" + } + `, opJwt, dirSrv, syspub, dirSrv))) + defer removeFile(t, conf) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + // update system account jwt + updateJwt(t, s.ClientURL(), sysCreds, sysjwt, 1) + + // create account + var apub, ajwt1, aCreds1 string + kp, _ := nkeys.CreateAccount() + apub, _ = kp.PublicKey() + claim := jwt.NewAccountClaims(apub) + claim.Limits.JetStreamLimits = jwt.JetStreamLimits{ + MemoryStorage: 7 * 1024 * 1024, + DiskStorage: 7 * 1024 * 1024, + Streams: 10, + } + ajwt1, err = claim.Encode(sk) + require_NoError(t, err) + + // user + ukp, _ := nkeys.CreateUser() + seed, _ := ukp.Seed() + upub, _ := ukp.PublicKey() + uclaim := newJWTTestUserClaims() + uclaim.Subject = upub + + ujwt1, err := uclaim.Encode(kp) + require_NoError(t, err) + aCreds1 = genCredsFile(t, ujwt1, seed) + defer removeFile(t, aCreds1) + + // push user account + updateJwt(t, s.ClientURL(), sysCreds, ajwt1, 1) + + ncA, jsA := jsClientConnect(t, s, nats.UserCredentials(aCreds1)) + defer ncA.Close() + + jsA.AddStream(&nats.StreamConfig{Name: "foo"}) + jsA.Publish("foo", []byte("Hello World")) + jsA.Publish("foo", []byte("Hello Again")) + + // JS should be working + ai, err := jsA.AccountInfo() + require_NoError(t, err) + require_True(t, ai.Limits.MaxMemory == 7*1024*1024) + require_True(t, ai.Limits.MaxStore == 7*1024*1024) + require_True(t, ai.Tier.Streams == 1) + + // connect with a different connection and delete the account. + nc := natsConnect(t, s.ClientURL(), nats.UserCredentials(sysCreds)) + defer nc.Close() + + // delete account + resp, err := nc.Request(accDeleteReqSubj, generateRequest([]string{apub}, sk), time.Second) + require_NoError(t, err) + require_True(t, strings.Contains(string(resp.Data), `"message":"deleted 1 accounts"`)) + + // account was disabled and now disconnected, this should get a connection is closed error. + _, err = jsA.AccountInfo() + if err == nil || !errors.Is(err, nats.ErrConnectionClosed) { + t.Errorf("Expected connection closed error, got: %v", err) + } + ncA.Close() + + // re-enable, same claims would be detected + updateJwt(t, s.ClientURL(), sysCreds, ajwt1, 1) + + // expected to get authorization timeout at this time + _, err = nats.Connect(s.ClientURL(), nats.UserCredentials(aCreds1)) + if !errors.Is(err, nats.ErrAuthorization) { + t.Errorf("Expected authorization issue on connect, got: %v", err) + } + + // edit the account and push again with updated claims to same account + claim = jwt.NewAccountClaims(apub) + claim.Limits.JetStreamLimits = jwt.JetStreamLimits{ + MemoryStorage: -1, + DiskStorage: 10 * 1024 * 1024, + Streams: 10, + } + ajwt1, err = claim.Encode(sk) + require_NoError(t, err) + updateJwt(t, s.ClientURL(), sysCreds, ajwt1, 1) + + // reconnect with the updated account + ncA, jsA = jsClientConnect(t, s, nats.UserCredentials(aCreds1)) + defer ncA.Close() + ai, err = jsA.AccountInfo() + if err != nil { + t.Fatal(err) + } + require_True(t, ai.Limits.MaxMemory == -1) + require_True(t, ai.Limits.MaxStore == 10*1024*1024) + require_True(t, ai.Tier.Streams == 1) + + // should be possible to get stream info again + si, err := jsA.StreamInfo("foo") + if err != nil { + t.Fatal(err) + } + if si.State.Msgs != 2 { + t.Fatal("Unexpected number of messages from recovered stream") + } + msg, err := jsA.GetMsg("foo", 1) + if err != nil { + t.Fatal(err) + } + if string(msg.Data) != "Hello World" { + t.Error("Unexpected message") + } + ncA.Close() +}