When a system account was configured and not the default when we did a reload we would lose the JetStream service exports.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-11-30 14:44:15 -08:00
parent 5168e3b1c3
commit 7e27042e6e
3 changed files with 95 additions and 2 deletions

View File

@@ -258,7 +258,15 @@ func (s *Server) configJetStream(acc *Account) error {
// configAllJetStreamAccounts walk all configured accounts and turn on jetstream if requested.
func (s *Server) configAllJetStreamAccounts() error {
var jsAccounts []*Account
// Check to see if system account has been enabled. We could arrive here via reload and
// a non-default system account.
if sacc := s.SystemAccount(); sacc != nil && !sacc.IsExportService(JSApiAccountInfo) {
for _, export := range allJsExports {
if err := sacc.AddServiceExport(export, nil); err != nil {
return fmt.Errorf("Error setting up jetstream service exports: %v", err)
}
}
}
// Snapshot into our own list. Might not be needed.
s.mu.Lock()
@@ -268,11 +276,15 @@ func (s *Server) configAllJetStreamAccounts() error {
s.mu.Unlock()
return nil
}
var jsAccounts []*Account
s.accounts.Range(func(k, v interface{}) bool {
jsAccounts = append(jsAccounts, v.(*Account))
return true
})
s.mu.Unlock()
// Process any jetstream enabled accounts here.
for _, acc := range jsAccounts {
if err := s.configJetStream(acc); err != nil {

View File

@@ -143,7 +143,7 @@ const (
jsSnapshotAckT = "$JS.SNAPSHOT.ACK.%s.%s"
jsRestoreDeliverT = "$JS.SNAPSHOT.RESTORE.%s.%s"
// JetStreamAckT is the template for the ack message stream coming back from a consumer
// jsAckT is the template for the ack message stream coming back from a consumer
// when they ACK/NAK, etc a message.
jsAckT = "$JS.ACK.%s.%s"
jsAckPre = "$JS.ACK."

View File

@@ -95,6 +95,7 @@ func RunJetStreamServerOnPort(port int, sd string) *server.Server {
}
func clientConnectToServer(t *testing.T, s *server.Server) *nats.Conn {
t.Helper()
nc, err := nats.Connect(s.ClientURL(),
nats.Name("JS-TEST"),
nats.ReconnectWait(5*time.Millisecond),
@@ -10145,3 +10146,83 @@ func TestJetStreamAccountImportAll(t *testing.T) {
t.Fatalf("Unexpected error: %+v", namesResponse.Error)
}
}
// https://github.com/nats-io/nats-server/issues/1736
func TestJetStreamServerReload(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
jetstream: {max_mem_store: 64GB, max_file_store: 10TB }
accounts: {
A: { users: [ {user: ua, password: pwd} ] },
B: {
jetstream: {max_mem: 1GB, max_store: 1TB, max_streams: 10, max_consumers: 1k}
users: [ {user: ub, password: pwd} ]
},
SYS: { users: [ {user: uc, password: pwd} ] },
}
no_auth_user: ub
system_account: SYS
`))
defer os.Remove(conf)
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
if !s.JetStreamEnabled() {
t.Fatalf("Expected JetStream to be enabled")
}
// Client for API requests.
nc := clientConnectToServer(t, s)
defer nc.Close()
checkJSAccount := func() {
t.Helper()
resp, err := nc.Request(server.JSApiAccountInfo, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var info server.JSApiAccountInfoResponse
if err := json.Unmarshal(resp.Data, &info); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
checkJSAccount()
acc, err := s.LookupAccount("B")
if err != nil {
t.Fatalf("Unexpected error looking up account: %v", err)
}
mset, err := acc.AddStream(&server.StreamConfig{Name: "22"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
toSend := 10
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc, "22", fmt.Sprintf("MSG: %d", i+1))
}
if msgs := mset.State().Msgs; msgs != uint64(toSend) {
t.Fatalf("Expected %d messages, got %d", toSend, msgs)
}
if err := s.Reload(); err != nil {
t.Fatalf("Error on server reload: %v", err)
}
// Wait to get reconnected.
checkFor(t, 5*time.Second, 10*time.Millisecond, func() error {
if !nc.IsConnected() {
return fmt.Errorf("Not connected")
}
return nil
})
checkJSAccount()
sendStreamMsg(t, nc, "22", "MSG: 22")
}