Create helper function to check on account jetstream status.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-05-03 14:21:21 -07:00
committed by Ivan Kozlovic
parent 0bd92e85da
commit c87eed8792

View File

@@ -855,6 +855,15 @@ func (a *Account) trackAPIErr() {
const badAPIRequestT = "Malformed JetStream API Request: %q"
// Helper function to check on JetStream being enabled but also on status of leafnodes
// If the local account is not enabled but does have leafnode connectivity we will not
// want to error immediately and let the other side decide.
func (a *Account) checkJetStream() (enabled, shouldError bool) {
a.mu.RLock()
defer a.mu.RUnlock()
return a.js != nil, a.nleafs == 0
}
// Request for current usage and limits for this account.
func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
@@ -885,10 +894,8 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, subject, rep
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() > 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if !doErr {
return
}
resp.Error = jsNotEnabledErr
@@ -1162,10 +1169,8 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, subject, re
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -1344,10 +1349,8 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, re
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -1428,10 +1431,8 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -1557,10 +1558,8 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -1645,10 +1644,8 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
if isLeader && sa == nil {
// We can't find the stream, so mimic what would be the errors below.
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -1681,10 +1678,8 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -1778,10 +1773,8 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, sub
return
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -1889,10 +1882,8 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, s
return
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -1967,10 +1958,8 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, subject
return
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -2219,15 +2208,14 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}
if !isEmptyRequest(msg) {
resp.Error = jsNotEmptyRequestErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
@@ -2292,10 +2280,8 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply
if isLeader && sa == nil {
// We can't find the stream, so mimic what would be the errors below.
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -2322,10 +2308,8 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -2405,10 +2389,8 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st
if isLeader && sa == nil {
// We can't find the stream, so mimic what would be the errors below.
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -2435,10 +2417,8 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -2508,10 +2488,8 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep
if isLeader && sa == nil {
// We can't find the stream, so mimic what would be the errors below.
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -2546,10 +2524,8 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -2630,10 +2606,8 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
return
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -2869,10 +2843,8 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
return
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
}
@@ -3097,10 +3069,8 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -3198,10 +3168,8 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -3321,10 +3289,8 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -3416,10 +3382,8 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re
if isLeader && ca == nil {
// We can't find the consumer, so mimic what would be the errors below.
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
@@ -3525,10 +3489,8 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject,
}
}
if !acc.JetStreamEnabled() {
// This local account is not enabled, but we need to check of we are using leafnodes to bridge
// this account and if so not answer locally.
if acc.NumLeafNodes() == 0 {
if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}