If we fail to load an account while processing a stream assignment, send error back to metaleader.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-04-07 10:15:13 -07:00
parent 3e4f6b8e4e
commit 1ea4a430da
3 changed files with 33 additions and 2 deletions

View File

@@ -600,6 +600,7 @@ var (
jsClusterNoPeersErr = &ApiError{Code: 400, Description: "no suitable peers for placement"}
jsServerNotMemberErr = &ApiError{Code: 400, Description: "server is not a member of the cluster"}
jsNoMessageFoundErr = &ApiError{Code: 404, Description: "no message found"}
jsNoAccountErr = &ApiError{Code: 503, Description: "account not found"}
)
// For easier handling of exports and imports.

View File

@@ -1807,18 +1807,28 @@ func (js *jetStream) streamAssignment(account, stream string) (sa *streamAssignm
func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
js.mu.RLock()
s, cc := js.srv, js.cluster
accName, stream := sa.Client.serviceAccount(), sa.Config.Name
js.mu.RUnlock()
if s == nil || cc == nil {
// TODO(dlc) - debug at least
return false
}
acc, err := s.LookupAccount(sa.Client.serviceAccount())
acc, err := s.LookupAccount(accName)
if err != nil {
// If we can not lookup our account send this result back to the metacontroller leader.
result := &streamAssignmentResult{
Account: accName,
Stream: stream,
Response: &JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}},
}
result.Response.Error = jsNoAccountErr
s.sendInternalMsgLocked(streamAssignmentSubj, _EMPTY_, nil, result)
// TODO(dlc) - log error
return false
}
stream := sa.Config.Name
js.mu.Lock()
if cc.meta == nil {

View File

@@ -5284,6 +5284,26 @@ func TestJetStreamClusterServerLimits(t *testing.T) {
}
}
func TestJetStreamClusterAccountLoadFailure(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, jsClusterLimitsTempl, "R3L", 3)
defer c.shutdown()
// Client based API
nc, js := jsClientConnect(t, c.leader())
defer nc.Close()
// Remove the "ONE" account from non-leader
s := c.randomNonLeader()
s.mu.Lock()
s.accounts.Delete("ONE")
s.mu.Unlock()
_, err := js.AddStream(&nats.StreamConfig{Name: "F", Replicas: 3})
if err == nil || !strings.Contains(err.Error(), "account not found") {
t.Fatalf("Expected an 'account not found' error but got %v", err)
}
}
// Support functions
// Used to setup superclusters for tests.