From c87eed879291b42c944ac93ded5b0c5f72a958b6 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 3 May 2021 14:21:21 -0700 Subject: [PATCH] Create helper function to check on account jetstream status. Signed-off-by: Derek Collison --- server/jetstream_api.go | 154 +++++++++++++++------------------------- 1 file changed, 58 insertions(+), 96 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index e5c8d6e1..452d041d 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -855,6 +855,15 @@ func (a *Account) trackAPIErr() { const badAPIRequestT = "Malformed JetStream API Request: %q" +// Helper function to check on JetStream being enabled but also on status of leafnodes +// If the local account is not enabled but does have leafnode connectivity we will not +// want to error immediately and let the other side decide. +func (a *Account) checkJetStream() (enabled, shouldError bool) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.js != nil, a.nleafs == 0 +} + // Request for current usage and limits for this account. func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamEnabled() { @@ -885,10 +894,8 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, subject, rep } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() > 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if !doErr { return } resp.Error = jsNotEnabledErr @@ -1162,10 +1169,8 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, subject, re } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -1344,10 +1349,8 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, re } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -1428,10 +1431,8 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -1557,10 +1558,8 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -1645,10 +1644,8 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl if isLeader && sa == nil { // We can't find the stream, so mimic what would be the errors below. - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -1681,10 +1678,8 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -1778,10 +1773,8 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, sub return } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -1889,10 +1882,8 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, s return } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -1967,10 +1958,8 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, subject return } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -2219,15 +2208,14 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } return } + if !isEmptyRequest(msg) { resp.Error = jsNotEmptyRequestErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -2292,10 +2280,8 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply if isLeader && sa == nil { // We can't find the stream, so mimic what would be the errors below. - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -2322,10 +2308,8 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -2405,10 +2389,8 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st if isLeader && sa == nil { // We can't find the stream, so mimic what would be the errors below. - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -2435,10 +2417,8 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -2508,10 +2488,8 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep if isLeader && sa == nil { // We can't find the stream, so mimic what would be the errors below. - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -2546,10 +2524,8 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -2630,10 +2606,8 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r return } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -2869,10 +2843,8 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject, return } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp)) } @@ -3097,10 +3069,8 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -3198,10 +3168,8 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -3321,10 +3289,8 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -3416,10 +3382,8 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re if isLeader && ca == nil { // We can't find the consumer, so mimic what would be the errors below. - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) } @@ -3525,10 +3489,8 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject, } } - if !acc.JetStreamEnabled() { - // This local account is not enabled, but we need to check of we are using leafnodes to bridge - // this account and if so not answer locally. - if acc.NumLeafNodes() == 0 { + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) }