From a9b8948abeb1bf319047c8a8a5c964f6756fd417 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 27 Jan 2021 13:34:00 -0800 Subject: [PATCH] Add in tracking for quorum in raft and do auto stepdown. Also added in API responses when no leader is present for meta, streams and consumers. Signed-off-by: Derek Collison --- go.mod | 2 +- go.sum | 4 + server/events.go | 3 + server/jetstream_api.go | 336 +++++++++++++++++++++-- server/jetstream_cluster.go | 28 +- server/raft.go | 46 +++- test/jetstream_cluster_test.go | 117 +++++++- vendor/github.com/nats-io/nats.go/js.go | 11 +- vendor/github.com/nats-io/nats.go/jsm.go | 53 +++- vendor/modules.txt | 2 +- 10 files changed, 540 insertions(+), 62 deletions(-) diff --git a/go.mod b/go.mod index ba5dcfad..b5c73c70 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/klauspost/compress v1.11.7 github.com/minio/highwayhash v1.0.0 github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc - github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6 + github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a github.com/nats-io/nkeys v0.2.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 diff --git a/go.sum b/go.sum index c9554443..5e4793ed 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,10 @@ github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI= github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6 h1:cpS+9uyfHXvRG/Q+WcDd3KXRgPa9fo9tDbIeDHCxYAg= github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI= +github.com/nats-io/nats.go v1.10.1-0.20210123004354-58bf69ad2df8 h1:yxExhj0DStfAEN5lGy6pyL4WJE+J8aKn50xoKt9hFdA= +github.com/nats-io/nats.go v1.10.1-0.20210123004354-58bf69ad2df8/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI= +github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a h1:EjwBk6T/arS7o0ZGdMgdzYrQHeUITT1GHf3cFQFtr3I= +github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM= diff --git a/server/events.go b/server/events.go index 96e08b0f..a8b637c5 100644 --- a/server/events.go +++ b/server/events.go @@ -417,6 +417,9 @@ func (s *Server) sendInternalMsg(sub, rply string, si *ServerInfo, msg interface // Locked version of checking if events system running. Also checks server. func (s *Server) eventsRunning() bool { + if s == nil { + return false + } s.mu.Lock() er := s.running && s.eventsEnabled() s.mu.Unlock() diff --git a/server/jetstream_api.go b/server/jetstream_api.go index ccee58a8..d00aa90f 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -476,6 +476,7 @@ var ( jsNoConsumerErr = &ApiError{Code: 404, Description: "consumer not found"} jsStreamMismatchErr = &ApiError{Code: 400, Description: "stream name in subject does not match request"} jsNoClusterSupportErr = &ApiError{Code: 503, Description: "not currently supported in clustered mode"} + jsClusterNotAvailErr = &ApiError{Code: 503, Description: "JetStream system temporarily unavailable"} ) // For easier handling of exports and imports. @@ -572,7 +573,7 @@ const badAPIRequestT = "Malformed JetStream API Request: %q" // Request for current usage and limits for this account. func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { - if c == nil || !s.jetStreamReadAllowed() { + if c == nil { return } ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) @@ -582,6 +583,24 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, subject, rep } var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}} + + // Determine if we should proceed here when we are in clustered mode. + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr } else { @@ -826,7 +845,7 @@ func jsNotFoundError(err error) *ApiError { // Request to create a stream. func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { - if c == nil || !s.JetStreamIsLeader() { + if c == nil { return } ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) @@ -836,6 +855,24 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, subject, re } var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} + + // Determine if we should proceed here when we are in clustered mode. + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -871,7 +908,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, subject, re // Request to update a stream. func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { - if c == nil || !s.JetStreamIsLeader() { + if c == nil { return } @@ -882,6 +919,24 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, re } var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}} + + // Determine if we should proceed here when we are in clustered mode. + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -918,7 +973,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, re // Request for the list of all stream names. func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { - if c == nil || !s.JetStreamIsLeader() { + if c == nil { return } ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) @@ -928,6 +983,24 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep } var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}} + + // Determine if we should proceed here when we are in clustered mode. + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -1021,10 +1094,9 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep // Request for the list of all detailed stream info. // TODO(dlc) - combine with above long term func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { - if c == nil || !s.JetStreamIsLeader() { + if c == nil { return } - ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) if err != nil { s.Warnf(badAPIRequestT, msg) @@ -1036,6 +1108,23 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl Streams: []*StreamInfo{}, } + // Determine if we should proceed here when we are in clustered mode. + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -1102,12 +1191,17 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl // If we are in clustered mode we need to be the stream leader to proceed. if s.JetStreamIsClustered() { - // Check to make sure the consumer is assigned. + // Check to make sure the stream is assigned. js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return } - jsEnabled := acc.JetStreamEnabled() + + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } js.mu.RLock() isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name) @@ -1115,7 +1209,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl if isLeader && sa == nil { // We can't find the stream, so mimic what would be the errors below. - if !jsEnabled { + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return @@ -1128,7 +1222,14 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl return } - // We have the stream assigned, only the stream leader should answer. + // Check to see if we are a member of the group and if the group has no leader. + if js.isGroupLeaderless(sa.Group) { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + // We have the stream assigned and a leader, so only the stream leader should answer. if !acc.JetStreamIsStreamLeader(name) { return } @@ -1183,7 +1284,7 @@ func isEmptyRequest(req []byte) bool { // Request to delete a stream. func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { - if c == nil || !s.JetStreamIsLeader() { + if c == nil { return } ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) @@ -1193,6 +1294,24 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re } var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}} + + // Determine if we should proceed here when we are in clustered mode. + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -1246,12 +1365,54 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply stream := tokenAt(subject, 6) + var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} + // If we are in clustered mode we need to be the stream leader to proceed. - if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(stream) { - return + if s.JetStreamIsClustered() { + // Check to make sure the stream is assigned. + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + js.mu.RLock() + isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream) + js.mu.RUnlock() + + if isLeader && sa == nil { + // We can't find the stream, so mimic what would be the errors below. + if !acc.JetStreamEnabled() { + resp.Error = jsNotEnabledErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // No stream present. + resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound) + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } else if sa == nil { + return + } + + // Check to see if we are a member of the group and if the group has no leader. + if js.isGroupLeaderless(sa.Group) { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + // We have the stream assigned and a leader, so only the stream leader should answer. + if !acc.JetStreamIsStreamLeader(stream) { + return + } } - var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -1358,12 +1519,54 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep stream := streamNameFromSubject(subject) + var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} + // If we are in clustered mode we need to be the stream leader to proceed. - if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(stream) { - return + if s.JetStreamIsClustered() { + // Check to make sure the stream is assigned. + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + js.mu.RLock() + isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream) + js.mu.RUnlock() + + if isLeader && sa == nil { + // We can't find the stream, so mimic what would be the errors below. + if !acc.JetStreamEnabled() { + resp.Error = jsNotEnabledErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // No stream present. + resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound) + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } else if sa == nil { + return + } + + // Check to see if we are a member of the group and if the group has no leader. + if js.isGroupLeaderless(sa.Group) { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + // We have the stream assigned and a leader, so only the stream leader should answer. + if !acc.JetStreamIsStreamLeader(stream) { + return + } } - var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -1810,7 +2013,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, subject, } func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply string, rmsg []byte, expectDurable bool) { - if c == nil || !s.JetStreamIsLeader() { + if c == nil { return } @@ -1820,6 +2023,25 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s return } + var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} + + // Determine if we should proceed here when we are in clustered mode. + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + var streamName string if expectDurable { streamName = tokenAt(subject, 6) @@ -1827,7 +2049,6 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s streamName = tokenAt(subject, 5) } - var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -1900,7 +2121,7 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s // Request for the list of all consumer names. func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { - if c == nil || !s.JetStreamIsLeader() { + if c == nil { return } ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) @@ -1913,6 +2134,24 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType}, Consumers: []string{}, } + + // Determine if we should proceed here when we are in clustered mode. + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -2000,7 +2239,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r // Request for the list of all detailed consumer information. func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { - if c == nil || !s.JetStreamIsLeader() { + if c == nil { return } @@ -2015,6 +2254,23 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re Consumers: []*ConsumerInfo{}, } + // Determine if we should proceed here when we are in clustered mode. + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -2095,7 +2351,12 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re if js == nil || cc == nil { return } - jsEnabled := acc.JetStreamEnabled() + + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } js.mu.RLock() isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, stream), js.consumerAssignment(acc.Name, stream, consumer) @@ -2103,7 +2364,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re if isLeader && ca == nil { // We can't find the consumer, so mimic what would be the errors below. - if !jsEnabled { + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return @@ -2121,7 +2382,14 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re return } - // We have the consumer assigned, only the consumer leader should answer. + // Check to see if we are a member of the group and if the group has no leader. + if js.isGroupLeaderless(ca.Group) { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + // We have the consumer assigned and a leader, so only the consumer leader should answer. if !acc.JetStreamIsConsumerLeader(stream, consumer) { return } @@ -2157,7 +2425,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re // Request to delete an Consumer. func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { - if c == nil || !s.JetStreamIsLeader() { + if c == nil { return } ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) @@ -2167,6 +2435,24 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject, } var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}} + + // Determine if we should proceed here when we are in clustered mode. + if s.JetStreamIsClustered() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 6444d710..e2f01c7f 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -388,11 +388,6 @@ func (s *Server) JetStreamIsConsumerLeader(account, stream, consumer string) boo return cc.isConsumerLeader(account, stream, consumer) } -func (s *Server) jetStreamReadAllowed() bool { - // FIXME(dlc) - Add in read allowed mode for readonly API. - return s.JetStreamIsLeader() -} - func (s *Server) enableJetStreamClustering() error { if !s.isRunning() { return nil @@ -483,6 +478,27 @@ func (js *jetStream) server() *Server { return s } +// Will respond iff we are a member and we know we have no leader. +func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool { + if rg == nil { + return false + } + js.mu.RLock() + defer js.mu.RUnlock() + + cc := js.cluster + + // If we are not a member we can not say.. + if !rg.isMember(cc.meta.ID()) { + return false + } + // Single peer groups always have a leader if we are here. + if rg.node == nil { + return false + } + return rg.node.GroupLeader() == _EMPTY_ +} + func (s *Server) JetStreamIsStreamAssigned(account, stream string) bool { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { @@ -559,7 +575,7 @@ func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool { ourID := cc.meta.ID() for _, peer := range rg.Peers { if peer == ourID { - if len(rg.Peers) == 1 || rg.node.Leader() { + if len(rg.Peers) == 1 || rg.node != nil && rg.node.Leader() { return true } } diff --git a/server/raft.go b/server/raft.go index 04c09dc1..76c41733 100644 --- a/server/raft.go +++ b/server/raft.go @@ -111,6 +111,7 @@ type raft struct { peers map[string]*lps acks map[uint64]map[string]struct{} elect *time.Timer + active time.Time term uint64 pterm uint64 pindex uint64 @@ -177,6 +178,7 @@ const ( minCampaignTimeout = 50 * time.Millisecond maxCampaignTimeout = 4 * minCampaignTimeout hbInterval = 200 * time.Millisecond + lostQuorumInterval = hbInterval * 3 ) type RaftConfig struct { @@ -1165,7 +1167,13 @@ func (n *raft) runAsLeader() { } n.sendAppendEntry(entries) case <-hb.C: - n.sendHeartbeat() + if n.notActive() { + n.sendHeartbeat() + } + if n.lostQuorum() { + n.switchToFollower(noLeader) + return + } case vresp := <-n.votes: if vresp.term > n.currentTerm() { n.switchToFollower(noLeader) @@ -1188,6 +1196,29 @@ func (n *raft) runAsLeader() { } } +func (n *raft) lostQuorum() bool { + n.RLock() + defer n.RUnlock() + now, nc := time.Now().UnixNano(), 1 + for _, peer := range n.peers { + if now-peer.ts < int64(lostQuorumInterval) { + nc++ + if nc >= n.qn { + return false + } + } + } + return true +} + +// Check for being not active in terms of sending entries. +// Used in determining if we need to send a heartbeat. +func (n *raft) notActive() bool { + n.RLock() + defer n.RUnlock() + return time.Since(n.active) > hbInterval +} + // Return our current term. func (n *raft) currentTerm() uint64 { n.RLock() @@ -1699,7 +1730,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if maybeLeader == n.id { n.campaign() } - // These will not have commits follow them. We also know this will be by itself. + // This will not have commits follow. We also know this will be by itself. n.commit = ae.pindex + 1 } case EntryAddPeer: @@ -1800,6 +1831,7 @@ func (n *raft) sendAppendEntry(entries []*Entry) { n.sindex = n.pindex } } + n.active = time.Now() } n.sendRPC(n.asubj, n.areply, ae.buf) } @@ -2140,8 +2172,10 @@ func (n *raft) switchState(state RaftState) { n.writeTermVote() } -const noLeader = _EMPTY_ -const noVote = _EMPTY_ +const ( + noLeader = _EMPTY_ + noVote = _EMPTY_ +) func (n *raft) switchToFollower(leader string) { n.notice("Switching to follower") @@ -2152,9 +2186,11 @@ func (n *raft) switchToFollower(leader string) { } func (n *raft) switchToCandidate() { - n.notice("Switching to candidate") n.Lock() defer n.Unlock() + if n.state != Candidate { + n.notice("Switching to candidate") + } // Increment the term. n.term++ // Clear current Leader. diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index 53c4d3e0..36b42e31 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -2224,6 +2224,115 @@ func TestJetStreamClusterStreamTemplates(t *testing.T) { } } +func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{Name: "NO-Q", Replicas: 2}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + payload := []byte("Hello JSC") + for i := 0; i < 10; i++ { + if _, err := js.Publish("NO-Q", payload); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + sub, err := js.SubscribeSync("NO-Q") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + ci, err := sub.ConsumerInfo() + if err != nil || ci == nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Shutdown the non-leader. + c.randomNonStreamLeader("$G", "NO-Q").Shutdown() + + // This should eventually have us stepdown as leader since we would have lost quorum. + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + if sl := c.streamLeader("$G", "NO-Q"); sl == nil { + return nil + } + return fmt.Errorf("Still have leader for stream") + }) + + notAvailableErr := func(err error) bool { + return err != nil && strings.Contains(err.Error(), "unavailable") + } + + // Expect to get errors here. + if _, err := js.StreamInfo("NO-Q"); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + if cl := c.consumerLeader("$G", "NO-Q", ci.Name); cl == nil { + return nil + } + return fmt.Errorf("Still have leader for consumer") + }) + + if _, err = js.ConsumerInfo("NO-Q", ci.Name); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + if _, err := sub.ConsumerInfo(); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + + // Now let's take out the other non meta-leader server. + // We should get same error for general API calls. + c.randomNonLeader().Shutdown() + c.expectNoLeader() + + // Now make sure the general JS API responds with system unavailable. + if _, err = js.AccountInfo(); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + if _, err := js.AddStream(&nats.StreamConfig{Name: "NO-Q33", Replicas: 2}); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + if _, err := js.UpdateStream(&nats.StreamConfig{Name: "NO-Q33", Replicas: 2}); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + if err := js.DeleteStream("NO-Q"); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + if _, err := js.StreamInfo("NO-Q"); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + if err := js.PurgeStream("NO-Q"); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + if err := js.DeleteMsg("NO-Q", 1); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + // Consumer + if _, err := js.AddConsumer("NO-Q", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + if err := js.DeleteConsumer("NO-Q", "dlc"); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + if _, err := js.ConsumerInfo("NO-Q", "dlc"); !notAvailableErr(err) { + t.Fatalf("Expected an 'unavailable' error, got %v", err) + } + // Listers + if sl := js.NewStreamLister(); sl.Next() || !notAvailableErr(sl.Err()) { + t.Fatalf("Expected an 'unavailable' error, got %v", sl.Err()) + } + if cl := js.NewConsumerLister("NO-Q"); cl.Next() || !notAvailableErr(cl.Err()) { + t.Fatalf("Expected an 'unavailable' error, got %v", cl.Err()) + } +} + func TestJetStreamClusterStreamPerf(t *testing.T) { // Comment out to run, holding place for now. skip(t) @@ -2516,7 +2625,7 @@ func (c *cluster) waitOnServerCurrent(s *server.Server) { func (c *cluster) randomNonLeader() *server.Server { // range should randomize.. but.. for _, s := range c.servers { - if !s.JetStreamIsLeader() { + if s.Running() && !s.JetStreamIsLeader() { return s } } @@ -2539,10 +2648,12 @@ func (c *cluster) expectNoLeader() { c.t.Helper() expires := time.Now().Add(maxElectionTimeout) for time.Now().Before(expires) { - if c.leader() != nil { - c.t.Fatalf("Expected no leader but have one") + if c.leader() == nil { + return } + time.Sleep(10 * time.Millisecond) } + c.t.Fatalf("Expected no leader but have one") } func (c *cluster) waitOnLeader() { diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index fbb2667e..2e35b189 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -131,20 +131,13 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { return js, nil } - resp, err := nc.Request(js.apiSubj(apiAccountInfo), nil, js.wait) - if err != nil { + if _, err := js.AccountInfo(); err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled } return nil, err } - var info accountInfoResponse - if err := json.Unmarshal(resp.Data, &info); err != nil { - return nil, err - } - if info.Error != nil && info.Error.Code == 503 { - return nil, ErrJetStreamNotEnabled - } + return js, nil } diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index 891ee34d..d6a0b136 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -55,6 +55,9 @@ type JetStreamManager interface { // NewConsumerLister is used to return pages of ConsumerInfo objects. NewConsumerLister(stream string) *ConsumerLister + + // AccountInfo retrieves info about the JetStream usage from an account. + AccountInfo() (*AccountInfo, error) } // StreamConfig will determine the properties for a stream. @@ -102,22 +105,48 @@ type apiPagedRequest struct { Offset int `json:"offset"` } -// accountStats returns current statistics about the account's JetStream usage. -type accountStats struct { - Memory uint64 `json:"memory"` - Store uint64 `json:"storage"` - Streams int `json:"streams"` - Limits struct { - MaxMemory int64 `json:"max_memory"` - MaxStore int64 `json:"max_storage"` - MaxStreams int `json:"max_streams"` - MaxConsumers int `json:"max_consumers"` - } `json:"limits"` +// AccountInfo contains info about the JetStream usage from the current account. +type AccountInfo struct { + Memory uint64 `json:"memory"` + Store uint64 `json:"storage"` + Streams int `json:"streams"` + Limits AccountLimits `json:"limits"` +} + +// AccountLimits includes the JetStream limits of the current account. +type AccountLimits struct { + MaxMemory int64 `json:"max_memory"` + MaxStore int64 `json:"max_storage"` + MaxStreams int `json:"max_streams"` + MaxConsumers int `json:"max_consumers"` } type accountInfoResponse struct { apiResponse - accountStats + AccountInfo +} + +// AccountInfo retrieves info about the JetStream usage from the current account. +func (js *js) AccountInfo() (*AccountInfo, error) { + resp, err := js.nc.Request(js.apiSubj(apiAccountInfo), nil, js.wait) + if err != nil { + return nil, err + } + var info accountInfoResponse + if err := json.Unmarshal(resp.Data, &info); err != nil { + return nil, err + } + if info.Error != nil { + var err error + if strings.Contains(info.Error.Description, "not enabled for") { + err = ErrJetStreamNotEnabled + } else { + err = errors.New(info.Error.Description) + } + return nil, err + } + + return &info.AccountInfo, nil } type createConsumerRequest struct { diff --git a/vendor/modules.txt b/vendor/modules.txt index c4e2429a..af89062a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -7,7 +7,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc ## explicit github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6 +# github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin