From 83d520667f09f48ce40a8032cb3609f2f8e29e9c Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 19 Apr 2021 18:23:06 -0400 Subject: [PATCH] [added] ability to respond to stream/consumer names/info If a follower was disconnected long enough from the meta group leader, it will respond to stream/consumer names/info requests with a delay. Signed-off-by: Matthias Hanel --- server/jetstream_api.go | 207 ++++++++++++++++++++++++++---------- server/jetstream_cluster.go | 65 +++++++++-- 2 files changed, 205 insertions(+), 67 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index c110203d..603fca7f 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -335,7 +335,8 @@ type JSApiStreamNamesRequest struct { type JSApiStreamNamesResponse struct { ApiResponse ApiPaged - Streams []string `json:"streams"` + Streams []string `json:"streams"` + ByLeader bool `json:"by_meta_leader"` } const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response" @@ -345,7 +346,9 @@ const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_resp type JSApiStreamListResponse struct { ApiResponse ApiPaged - Streams []*StreamInfo `json:"streams"` + Streams []*StreamInfo `json:"streams"` + ByLeader bool `json:"by_meta_leader"` + Complete bool `json:"complete"` } const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response" @@ -530,6 +533,7 @@ type JSApiConsumerNamesResponse struct { ApiResponse ApiPaged Consumers []string `json:"consumers"` + ByLeader bool `json:"by_meta_leader"` } const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response" @@ -539,6 +543,8 @@ type JSApiConsumerListResponse struct { ApiResponse ApiPaged Consumers []*ConsumerInfo `json:"consumers"` + ByLeader bool `json:"by_meta_leader"` + Complete bool `json:"complete"` } const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response" @@ -589,23 +595,22 @@ type JSApiStreamTemplateNamesResponse struct { const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response" var ( - jsNotEnabledErr = &ApiError{Code: 503, Description: "JetStream not enabled for account"} - jsBadRequestErr = &ApiError{Code: 400, Description: "bad request"} - jsNotEmptyRequestErr = &ApiError{Code: 400, Description: "expected an empty request payload"} - jsInvalidJSONErr = &ApiError{Code: 400, Description: "invalid JSON"} - jsInsufficientErr = &ApiError{Code: 503, Description: "insufficient resources"} - jsNoConsumerErr = &ApiError{Code: 404, Description: "consumer not found"} - jsStreamMismatchErr = &ApiError{Code: 400, Description: "stream name in subject does not match request"} - jsNoClusterSupportErr = &ApiError{Code: 503, Description: "not currently supported in clustered mode"} - jsClusterNotAvailErr = &ApiError{Code: 503, Description: "JetStream system temporarily unavailable"} - jsClusterRequiredErr = &ApiError{Code: 503, Description: "JetStream clustering support required"} - jsPeerNotMemberErr = &ApiError{Code: 400, Description: "peer not a member"} - jsClusterIncompleteErr = &ApiError{Code: 503, Description: "incomplete results"} - jsClusterTagsErr = &ApiError{Code: 400, Description: "tags placement not supported for operation"} - jsClusterNoPeersErr = &ApiError{Code: 400, Description: "no suitable peers for placement"} - jsServerNotMemberErr = &ApiError{Code: 400, Description: "server is not a member of the cluster"} - jsNoMessageFoundErr = &ApiError{Code: 404, Description: "no message found"} - jsNoAccountErr = &ApiError{Code: 503, Description: "account not found"} + jsNotEnabledErr = &ApiError{Code: 503, Description: "JetStream not enabled for account"} + jsBadRequestErr = &ApiError{Code: 400, Description: "bad request"} + jsNotEmptyRequestErr = &ApiError{Code: 400, Description: "expected an empty request payload"} + jsInvalidJSONErr = &ApiError{Code: 400, Description: "invalid JSON"} + jsInsufficientErr = &ApiError{Code: 503, Description: "insufficient resources"} + jsNoConsumerErr = &ApiError{Code: 404, Description: "consumer not found"} + jsStreamMismatchErr = &ApiError{Code: 400, Description: "stream name in subject does not match request"} + jsNoClusterSupportErr = &ApiError{Code: 503, Description: "not currently supported in clustered mode"} + jsClusterNotAvailErr = &ApiError{Code: 503, Description: "JetStream system temporarily unavailable"} + jsClusterRequiredErr = &ApiError{Code: 503, Description: "JetStream clustering support required"} + jsPeerNotMemberErr = &ApiError{Code: 400, Description: "peer not a member"} + jsClusterTagsErr = &ApiError{Code: 400, Description: "tags placement not supported for operation"} + jsClusterNoPeersErr = &ApiError{Code: 400, Description: "no suitable peers for placement"} + jsServerNotMemberErr = &ApiError{Code: 400, Description: "server is not a member of the cluster"} + jsNoMessageFoundErr = &ApiError{Code: 404, Description: "no message found"} + jsNoAccountErr = &ApiError{Code: 503, Description: "account not found"} ) // For easier handling of exports and imports. @@ -1363,6 +1368,14 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, re s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } +func delayResponseBy(leaderExists bool) time.Duration { + if leaderExists { + return 500 * time.Millisecond + } else { + return 800 * time.Millisecond + } +} + // Request for the list of all stream names. func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamEnabled() { @@ -1374,7 +1387,9 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep return } - var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}} + var responseDelay time.Duration + var clusteredNonLeader bool + var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}, ByLeader: true} // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -1382,18 +1397,25 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep if js == nil || cc == nil { return } - if js.isLeaderless() { - resp.Error = jsClusterNotAvailErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // Make sure we are meta leader. - if !s.JetStreamIsLeader() { - return + // determine leadership status + leaderExists, selfIsLeader, leaderCurrent := s.JetStreamLeaderInfo() + resp.ByLeader = selfIsLeader + clusteredNonLeader = !selfIsLeader + + // proceed if leader, skip if meta leader is reasonably current, delay response if no leader found/not current + if !selfIsLeader { + if leaderExists && leaderCurrent { + return + } + responseDelay = delayResponseBy(leaderExists) } } if !acc.JetStreamEnabled() { + // Make sure we are meta leader. + if clusteredNonLeader { + return + } resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return @@ -1480,7 +1502,17 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep resp.Total = numStreams resp.Limit = JSApiNamesLimit resp.Offset = offset - s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + if responseDelay != 0 { + // Need to copy these off before sending.. + msg = append(msg[:0:0], msg...) + s.startGoRoutine(func() { + defer s.grWG.Done() + time.Sleep(responseDelay) + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + }) + } else { + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + } } // Request for the list of all detailed stream info. @@ -1495,9 +1527,13 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl return } + var responseDelay time.Duration + var clusteredNonLeader bool var resp = JSApiStreamListResponse{ ApiResponse: ApiResponse{Type: JSApiStreamListResponseType}, Streams: []*StreamInfo{}, + ByLeader: true, + Complete: true, } // Determine if we should proceed here when we are in clustered mode. @@ -1506,18 +1542,24 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl if js == nil || cc == nil { return } - if js.isLeaderless() { - resp.Error = jsClusterNotAvailErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // Make sure we are meta leader. - if !s.JetStreamIsLeader() { - return + // determine leadership status + leaderExists, selfIsLeader, leaderCurrent := s.JetStreamLeaderInfo() + resp.ByLeader = selfIsLeader + clusteredNonLeader = !selfIsLeader + + // proceed if leader, skip if meta leader is reasonably current, delay response if no leader found/not current + if !selfIsLeader { + if leaderExists && leaderCurrent { + return + } + responseDelay = delayResponseBy(leaderExists) } } if !acc.JetStreamEnabled() { + if clusteredNonLeader { + return + } resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return @@ -1538,7 +1580,9 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl if s.JetStreamIsClustered() { // Need to copy these off before sending.. msg = append(msg[:0:0], msg...) - s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, offset, subject, reply, msg) }) + s.startGoRoutine(func() { + s.jsClusteredStreamListRequest(acc, ci, offset, subject, reply, msg, responseDelay, resp.ByLeader) + }) return } @@ -1569,7 +1613,17 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl resp.Total = scnt resp.Limit = JSApiListLimit resp.Offset = offset - s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + if responseDelay != 0 { + // Need to copy these off before sending.. + msg = append(msg[:0:0], msg...) + s.startGoRoutine(func() { + defer s.grWG.Done() + time.Sleep(responseDelay) + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + }) + } else { + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + } } // Request for information about a stream. @@ -3057,9 +3111,12 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r return } + var responseDelay time.Duration + var clusteredNonLeader bool var resp = JSApiConsumerNamesResponse{ ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType}, Consumers: []string{}, + ByLeader: true, } // Determine if we should proceed here when we are in clustered mode. @@ -3068,18 +3125,23 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r if js == nil || cc == nil { return } - if js.isLeaderless() { - resp.Error = jsClusterNotAvailErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // Make sure we are meta leader. - if !s.JetStreamIsLeader() { - return + leaderExists, selfIsLeader, leaderCurrent := s.JetStreamLeaderInfo() + resp.ByLeader = selfIsLeader + clusteredNonLeader = !selfIsLeader + + // proceed if leader, skip if meta leader is reasonably current, delay response if no leader found/not current + if !selfIsLeader { + if leaderExists && leaderCurrent { + return + } + responseDelay = delayResponseBy(leaderExists) } } if !acc.JetStreamEnabled() { + if clusteredNonLeader { + return + } resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return @@ -3161,7 +3223,17 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r resp.Total = numConsumers resp.Limit = JSApiNamesLimit resp.Offset = offset - s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + if responseDelay != 0 { + // Need to copy these off before sending.. + msg = append(msg[:0:0], msg...) + s.startGoRoutine(func() { + defer s.grWG.Done() + time.Sleep(responseDelay) + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + }) + } else { + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + } } // Request for the list of all detailed consumer information. @@ -3176,9 +3248,13 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re return } + var responseDelay time.Duration + var clusteredNonLeader bool var resp = JSApiConsumerListResponse{ ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType}, Consumers: []*ConsumerInfo{}, + ByLeader: true, + Complete: true, } // Determine if we should proceed here when we are in clustered mode. @@ -3187,18 +3263,23 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re if js == nil || cc == nil { return } - if js.isLeaderless() { - resp.Error = jsClusterNotAvailErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // Make sure we are meta leader. - if !s.JetStreamIsLeader() { - return + leaderExists, selfIsLeader, leaderCurrent := s.JetStreamLeaderInfo() + resp.ByLeader = selfIsLeader + clusteredNonLeader = !selfIsLeader + + // proceed if leader, skip if meta leader is reasonably current, delay response if no leader found/not current + if !selfIsLeader { + if leaderExists && leaderCurrent { + return + } + responseDelay = delayResponseBy(leaderExists) } } if !acc.JetStreamEnabled() { + if clusteredNonLeader { + return + } resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return @@ -3221,7 +3302,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re if s.JetStreamIsClustered() { msg = append(msg[:0:0], msg...) s.startGoRoutine(func() { - s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg) + s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg, responseDelay, resp.ByLeader) }) return } @@ -3252,7 +3333,17 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re resp.Total = ocnt resp.Limit = JSApiListLimit resp.Offset = offset - s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + if responseDelay != 0 { + // Need to copy these off before sending.. + msg = append(msg[:0:0], msg...) + s.startGoRoutine(func() { + defer s.grWG.Done() + time.Sleep(responseDelay) + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + }) + } else { + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + } } // Request for information about an consumer. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a5418ba0..b99a9863 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -195,6 +195,32 @@ func (s *Server) JetStreamIsClustered() bool { return isClustered } +// Lock must not be held - returns hasLeader, isLeader, leaderCurrent +func (s *Server) JetStreamLeaderInfo() (bool, bool, bool) { + js := s.getJetStream() + if js == nil { + return false, false, false + } + selfLeader := s.JetStreamIsLeader() + if selfLeader { + return true, true, true + } + gl := js.getMetaGroup().GroupLeader() + for _, p := range js.getMetaGroup().Peers() { + if p == nil { + continue + } + if p.ID != gl { + continue + } + if time.Since(p.Last) <= lostQuorumInterval { + return true, false, true + } + return true, false, false + } + return false, false, false +} + func (s *Server) JetStreamIsLeader() bool { js := s.getJetStream() if js == nil { @@ -3557,7 +3583,7 @@ func (s *Server) allPeersOffline(rg *raftGroup) bool { // This will do a scatter and gather operation for all streams for this account. This is only called from metadata leader. // This will be running in a separate Go routine. -func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte) { +func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte, respDelay time.Duration, byLdr bool) { defer s.grWG.Done() js, cc := s.getJetStreamCluster() @@ -3592,6 +3618,8 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs var resp = JSApiStreamListResponse{ ApiResponse: ApiResponse{Type: JSApiStreamListResponseType}, Streams: make([]*StreamInfo, 0, len(streams)), + Complete: true, + ByLeader: byLdr, } if len(streams) == 0 { @@ -3645,7 +3673,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs // Don't hold lock. js.mu.Unlock() - const timeout = 5 * time.Second + const timeout = 4 * time.Second notActive := time.NewTimer(timeout) defer notActive.Stop() @@ -3655,8 +3683,8 @@ LOOP: case <-s.quitCh: return case <-notActive.C: - s.Warnf("Did not receive all stream info results for %q", acc) - resp.Error = jsClusterIncompleteErr + s.Warnf("Did not receive all stream info results for %q (retrieved %d out of %d)", acc, len(resp.Streams), len(streams)) + resp.Complete = false break LOOP case si := <-rc: resp.Streams = append(resp.Streams, si) @@ -3677,12 +3705,20 @@ LOOP: resp.Total = len(resp.Streams) resp.Limit = JSApiListLimit resp.Offset = offset - s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) + if respDelay != 0 { + s.startGoRoutine(func() { + defer s.grWG.Done() + time.Sleep(respDelay) + s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) + }) + } else { + s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) + } } // This will do a scatter and gather operation for all consumers for this stream and account. // This will be running in a separate Go routine. -func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, offset int, stream, subject, reply string, rmsg []byte) { +func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, offset int, stream, subject, reply string, rmsg []byte, respDelay time.Duration, byLdr bool) { defer s.grWG.Done() js, cc := s.getJetStreamCluster() @@ -3723,6 +3759,8 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of var resp = JSApiConsumerListResponse{ ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType}, Consumers: []*ConsumerInfo{}, + Complete: true, + ByLeader: byLdr, } if len(consumers) == 0 { @@ -3774,7 +3812,7 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of } js.mu.Unlock() - const timeout = 2 * time.Second + const timeout = 4 * time.Second notActive := time.NewTimer(timeout) defer notActive.Stop() @@ -3784,7 +3822,8 @@ LOOP: case <-s.quitCh: return case <-notActive.C: - s.Warnf("Did not receive all stream info results for %q", acc) + s.Warnf("Did not receive all consumer info results for %q (retrieved %d out of %d)", acc, len(resp.Consumers), len(consumers)) + resp.Complete = false break LOOP case ci := <-rc: resp.Consumers = append(resp.Consumers, ci) @@ -3805,7 +3844,15 @@ LOOP: resp.Total = len(resp.Consumers) resp.Limit = JSApiListLimit resp.Offset = offset - s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) + if respDelay != 0 { + s.startGoRoutine(func() { + defer s.grWG.Done() + time.Sleep(respDelay) + s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) + }) + } else { + s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) + } } func encodeStreamPurge(sp *streamPurge) []byte {