Fix to reenable JetStream for account when reenabled

Signed-off-by: Waldemar Quevedo <wally@nats.io>
This commit is contained in:
Waldemar Quevedo
2022-12-06 08:08:26 -08:00
parent d410fce426
commit 6df92f40dc
2 changed files with 200 additions and 8 deletions

View File

@@ -3817,9 +3817,13 @@ func removeCb(s *Server, pubKey string) {
a.mu.Unlock()
// set the account to be expired and disconnect clients
a.expiredTimeout()
// For JS, we need also to disable JS
// For JS, we need also to disable it.
if js := s.getJetStream(); js != nil && jsa != nil {
js.disableJetStream(jsa)
// Remove JetStream state in memory, this will be reset
// on the changed callback from the account in case it is
// enabled again.
a.js = nil
}
// We also need to remove all ServerImport subscriptions
a.removeAllServiceImportSubs()
@@ -3838,11 +3842,23 @@ func (dr *DirAccResolver) Start(s *Server) error {
dr.Server = s
dr.operator = opKeys
dr.DirJWTStore.changed = func(pubKey string) {
if v, ok := s.accounts.Load(pubKey); !ok {
} else if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("update got error on load: %v", err)
} else if err := s.updateAccountWithClaimJWT(v.(*Account), theJwt); err != nil {
s.Errorf("update resulted in error %v", err)
if v, ok := s.accounts.Load(pubKey); ok {
if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("update got error on load: %v", err)
} else {
acc := v.(*Account)
if err = s.updateAccountWithClaimJWT(acc, theJwt); err != nil {
s.Errorf("update resulted in error %v", err)
} else {
if _, jsa, err := acc.checkForJetStream(); err != nil {
s.Warnf("error checking for JetStream enabled error %v", err)
} else if jsa == nil {
if err = s.configJetStream(acc); err != nil {
s.Errorf("updated resulted in error when configuring JetStream %v", err)
}
}
}
}
}
}
dr.DirJWTStore.deleted = func(pubKey string) {

View File

@@ -18,6 +18,7 @@ package server
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
@@ -1040,7 +1041,7 @@ func TestJetStreamDeletedAccountDoesNotLeakSubscriptions(t *testing.T) {
conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
operator: %s
jetstream: {max_mem_store: 10Mb, max_file_store: 10Mb}
jetstream: {max_mem_store: 10Mb, max_file_store: 10Mb, store_dir: %v}
system_account: %s
resolver: {
type: full
@@ -1048,7 +1049,7 @@ func TestJetStreamDeletedAccountDoesNotLeakSubscriptions(t *testing.T) {
dir: '%s'
timeout: "500ms"
}
`, opJwt, syspub, dirSrv)))
`, opJwt, dirSrv, syspub, dirSrv)))
defer removeFile(t, conf)
s, _ := RunServerWithConfig(conf)
@@ -1113,3 +1114,178 @@ func TestJetStreamDeletedAccountDoesNotLeakSubscriptions(t *testing.T) {
// and that will not go away, so discount it.
checkNumSubs(beforeCreate + 1)
}
func TestJetStreamDeletedAccountIsReEnabled(t *testing.T) {
op, _ := nkeys.CreateOperator()
opPk, _ := op.PublicKey()
sk, _ := nkeys.CreateOperator()
skPk, _ := sk.PublicKey()
opClaim := jwt.NewOperatorClaims(opPk)
opClaim.SigningKeys.Add(skPk)
opJwt, err := opClaim.Encode(op)
require_NoError(t, err)
createAccountAndUser := func(pubKey, jwt1, creds1 *string) {
t.Helper()
kp, _ := nkeys.CreateAccount()
*pubKey, _ = kp.PublicKey()
claim := jwt.NewAccountClaims(*pubKey)
claim.Limits.JetStreamLimits = jwt.JetStreamLimits{MemoryStorage: 7 * 1024 * 1024, DiskStorage: 7 * 1024 * 1024, Streams: 10}
var err error
*jwt1, err = claim.Encode(sk)
require_NoError(t, err)
ukp, _ := nkeys.CreateUser()
seed, _ := ukp.Seed()
upub, _ := ukp.PublicKey()
uclaim := newJWTTestUserClaims()
uclaim.Subject = upub
ujwt1, err := uclaim.Encode(kp)
require_NoError(t, err)
*creds1 = genCredsFile(t, ujwt1, seed)
}
generateRequest := func(accs []string, kp nkeys.KeyPair) []byte {
t.Helper()
opk, _ := kp.PublicKey()
c := jwt.NewGenericClaims(opk)
c.Data["accounts"] = accs
cJwt, err := c.Encode(kp)
if err != nil {
t.Fatalf("Expected no error %v", err)
}
return []byte(cJwt)
}
// admin user
var syspub, sysjwt, sysCreds string
createAccountAndUser(&syspub, &sysjwt, &sysCreds)
defer removeFile(t, sysCreds)
dirSrv := createDir(t, "srv")
defer removeDir(t, dirSrv)
conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
operator: %s
jetstream: {max_mem_store: 10Mb, max_file_store: 10Mb, store_dir: %v}
system_account: %s
resolver: {
type: full
allow_delete: true
dir: '%s'
timeout: "500ms"
}
`, opJwt, dirSrv, syspub, dirSrv)))
defer removeFile(t, conf)
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()
// update system account jwt
updateJwt(t, s.ClientURL(), sysCreds, sysjwt, 1)
// create account
var apub, ajwt1, aCreds1 string
kp, _ := nkeys.CreateAccount()
apub, _ = kp.PublicKey()
claim := jwt.NewAccountClaims(apub)
claim.Limits.JetStreamLimits = jwt.JetStreamLimits{
MemoryStorage: 7 * 1024 * 1024,
DiskStorage: 7 * 1024 * 1024,
Streams: 10,
}
ajwt1, err = claim.Encode(sk)
require_NoError(t, err)
// user
ukp, _ := nkeys.CreateUser()
seed, _ := ukp.Seed()
upub, _ := ukp.PublicKey()
uclaim := newJWTTestUserClaims()
uclaim.Subject = upub
ujwt1, err := uclaim.Encode(kp)
require_NoError(t, err)
aCreds1 = genCredsFile(t, ujwt1, seed)
defer removeFile(t, aCreds1)
// push user account
updateJwt(t, s.ClientURL(), sysCreds, ajwt1, 1)
ncA, jsA := jsClientConnect(t, s, nats.UserCredentials(aCreds1))
defer ncA.Close()
jsA.AddStream(&nats.StreamConfig{Name: "foo"})
jsA.Publish("foo", []byte("Hello World"))
jsA.Publish("foo", []byte("Hello Again"))
// JS should be working
ai, err := jsA.AccountInfo()
require_NoError(t, err)
require_True(t, ai.Limits.MaxMemory == 7*1024*1024)
require_True(t, ai.Limits.MaxStore == 7*1024*1024)
require_True(t, ai.Tier.Streams == 1)
// connect with a different connection and delete the account.
nc := natsConnect(t, s.ClientURL(), nats.UserCredentials(sysCreds))
defer nc.Close()
// delete account
resp, err := nc.Request(accDeleteReqSubj, generateRequest([]string{apub}, sk), time.Second)
require_NoError(t, err)
require_True(t, strings.Contains(string(resp.Data), `"message":"deleted 1 accounts"`))
// account was disabled and now disconnected, this should get a connection is closed error.
_, err = jsA.AccountInfo()
if err == nil || !errors.Is(err, nats.ErrConnectionClosed) {
t.Errorf("Expected connection closed error, got: %v", err)
}
ncA.Close()
// re-enable, same claims would be detected
updateJwt(t, s.ClientURL(), sysCreds, ajwt1, 1)
// expected to get authorization timeout at this time
_, err = nats.Connect(s.ClientURL(), nats.UserCredentials(aCreds1))
if !errors.Is(err, nats.ErrAuthorization) {
t.Errorf("Expected authorization issue on connect, got: %v", err)
}
// edit the account and push again with updated claims to same account
claim = jwt.NewAccountClaims(apub)
claim.Limits.JetStreamLimits = jwt.JetStreamLimits{
MemoryStorage: -1,
DiskStorage: 10 * 1024 * 1024,
Streams: 10,
}
ajwt1, err = claim.Encode(sk)
require_NoError(t, err)
updateJwt(t, s.ClientURL(), sysCreds, ajwt1, 1)
// reconnect with the updated account
ncA, jsA = jsClientConnect(t, s, nats.UserCredentials(aCreds1))
defer ncA.Close()
ai, err = jsA.AccountInfo()
if err != nil {
t.Fatal(err)
}
require_True(t, ai.Limits.MaxMemory == -1)
require_True(t, ai.Limits.MaxStore == 10*1024*1024)
require_True(t, ai.Tier.Streams == 1)
// should be possible to get stream info again
si, err := jsA.StreamInfo("foo")
if err != nil {
t.Fatal(err)
}
if si.State.Msgs != 2 {
t.Fatal("Unexpected number of messages from recovered stream")
}
msg, err := jsA.GetMsg("foo", 1)
if err != nil {
t.Fatal(err)
}
if string(msg.Data) != "Hello World" {
t.Error("Unexpected message")
}
ncA.Close()
}