[added] ability to respond to stream/consumer names/info

If a follower was disconnected long enough from the meta group leader,
it will respond to stream/consumer names/info requests with a delay.

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2021-04-19 18:23:06 -04:00
parent b73be52862
commit 83d520667f
2 changed files with 205 additions and 67 deletions

View File

@@ -336,6 +336,7 @@ type JSApiStreamNamesResponse struct {
ApiResponse
ApiPaged
Streams []string `json:"streams"`
ByLeader bool `json:"by_meta_leader"`
}
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
@@ -346,6 +347,8 @@ type JSApiStreamListResponse struct {
ApiResponse
ApiPaged
Streams []*StreamInfo `json:"streams"`
ByLeader bool `json:"by_meta_leader"`
Complete bool `json:"complete"`
}
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
@@ -530,6 +533,7 @@ type JSApiConsumerNamesResponse struct {
ApiResponse
ApiPaged
Consumers []string `json:"consumers"`
ByLeader bool `json:"by_meta_leader"`
}
const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"
@@ -539,6 +543,8 @@ type JSApiConsumerListResponse struct {
ApiResponse
ApiPaged
Consumers []*ConsumerInfo `json:"consumers"`
ByLeader bool `json:"by_meta_leader"`
Complete bool `json:"complete"`
}
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
@@ -600,7 +606,6 @@ var (
jsClusterNotAvailErr = &ApiError{Code: 503, Description: "JetStream system temporarily unavailable"}
jsClusterRequiredErr = &ApiError{Code: 503, Description: "JetStream clustering support required"}
jsPeerNotMemberErr = &ApiError{Code: 400, Description: "peer not a member"}
jsClusterIncompleteErr = &ApiError{Code: 503, Description: "incomplete results"}
jsClusterTagsErr = &ApiError{Code: 400, Description: "tags placement not supported for operation"}
jsClusterNoPeersErr = &ApiError{Code: 400, Description: "no suitable peers for placement"}
jsServerNotMemberErr = &ApiError{Code: 400, Description: "server is not a member of the cluster"}
@@ -1363,6 +1368,14 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, re
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
func delayResponseBy(leaderExists bool) time.Duration {
if leaderExists {
return 500 * time.Millisecond
} else {
return 800 * time.Millisecond
}
}
// Request for the list of all stream names.
func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
@@ -1374,7 +1387,9 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep
return
}
var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
var responseDelay time.Duration
var clusteredNonLeader bool
var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}, ByLeader: true}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
@@ -1382,18 +1397,25 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep
if js == nil || cc == nil {
return
}
if js.isLeaderless() {
resp.Error = jsClusterNotAvailErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
// determine leadership status
leaderExists, selfIsLeader, leaderCurrent := s.JetStreamLeaderInfo()
resp.ByLeader = selfIsLeader
clusteredNonLeader = !selfIsLeader
// proceed if leader, skip if meta leader is reasonably current, delay response if no leader found/not current
if !selfIsLeader {
if leaderExists && leaderCurrent {
return
}
// Make sure we are meta leader.
if !s.JetStreamIsLeader() {
return
responseDelay = delayResponseBy(leaderExists)
}
}
if !acc.JetStreamEnabled() {
// Make sure we are meta leader.
if clusteredNonLeader {
return
}
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
@@ -1480,7 +1502,17 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep
resp.Total = numStreams
resp.Limit = JSApiNamesLimit
resp.Offset = offset
if responseDelay != 0 {
// Need to copy these off before sending..
msg = append(msg[:0:0], msg...)
s.startGoRoutine(func() {
defer s.grWG.Done()
time.Sleep(responseDelay)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
})
} else {
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
}
// Request for the list of all detailed stream info.
@@ -1495,9 +1527,13 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl
return
}
var responseDelay time.Duration
var clusteredNonLeader bool
var resp = JSApiStreamListResponse{
ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
Streams: []*StreamInfo{},
ByLeader: true,
Complete: true,
}
// Determine if we should proceed here when we are in clustered mode.
@@ -1506,18 +1542,24 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl
if js == nil || cc == nil {
return
}
if js.isLeaderless() {
resp.Error = jsClusterNotAvailErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
// determine leadership status
leaderExists, selfIsLeader, leaderCurrent := s.JetStreamLeaderInfo()
resp.ByLeader = selfIsLeader
clusteredNonLeader = !selfIsLeader
// proceed if leader, skip if meta leader is reasonably current, delay response if no leader found/not current
if !selfIsLeader {
if leaderExists && leaderCurrent {
return
}
// Make sure we are meta leader.
if !s.JetStreamIsLeader() {
return
responseDelay = delayResponseBy(leaderExists)
}
}
if !acc.JetStreamEnabled() {
if clusteredNonLeader {
return
}
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
@@ -1538,7 +1580,9 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl
if s.JetStreamIsClustered() {
// Need to copy these off before sending..
msg = append(msg[:0:0], msg...)
s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, offset, subject, reply, msg) })
s.startGoRoutine(func() {
s.jsClusteredStreamListRequest(acc, ci, offset, subject, reply, msg, responseDelay, resp.ByLeader)
})
return
}
@@ -1569,7 +1613,17 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl
resp.Total = scnt
resp.Limit = JSApiListLimit
resp.Offset = offset
if responseDelay != 0 {
// Need to copy these off before sending..
msg = append(msg[:0:0], msg...)
s.startGoRoutine(func() {
defer s.grWG.Done()
time.Sleep(responseDelay)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
})
} else {
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
}
// Request for information about a stream.
@@ -3057,9 +3111,12 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
return
}
var responseDelay time.Duration
var clusteredNonLeader bool
var resp = JSApiConsumerNamesResponse{
ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
Consumers: []string{},
ByLeader: true,
}
// Determine if we should proceed here when we are in clustered mode.
@@ -3068,18 +3125,23 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
if js == nil || cc == nil {
return
}
if js.isLeaderless() {
resp.Error = jsClusterNotAvailErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
leaderExists, selfIsLeader, leaderCurrent := s.JetStreamLeaderInfo()
resp.ByLeader = selfIsLeader
clusteredNonLeader = !selfIsLeader
// proceed if leader, skip if meta leader is reasonably current, delay response if no leader found/not current
if !selfIsLeader {
if leaderExists && leaderCurrent {
return
}
// Make sure we are meta leader.
if !s.JetStreamIsLeader() {
return
responseDelay = delayResponseBy(leaderExists)
}
}
if !acc.JetStreamEnabled() {
if clusteredNonLeader {
return
}
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
@@ -3161,7 +3223,17 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
resp.Total = numConsumers
resp.Limit = JSApiNamesLimit
resp.Offset = offset
if responseDelay != 0 {
// Need to copy these off before sending..
msg = append(msg[:0:0], msg...)
s.startGoRoutine(func() {
defer s.grWG.Done()
time.Sleep(responseDelay)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
})
} else {
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
}
// Request for the list of all detailed consumer information.
@@ -3176,9 +3248,13 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
return
}
var responseDelay time.Duration
var clusteredNonLeader bool
var resp = JSApiConsumerListResponse{
ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
Consumers: []*ConsumerInfo{},
ByLeader: true,
Complete: true,
}
// Determine if we should proceed here when we are in clustered mode.
@@ -3187,18 +3263,23 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
if js == nil || cc == nil {
return
}
if js.isLeaderless() {
resp.Error = jsClusterNotAvailErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
leaderExists, selfIsLeader, leaderCurrent := s.JetStreamLeaderInfo()
resp.ByLeader = selfIsLeader
clusteredNonLeader = !selfIsLeader
// proceed if leader, skip if meta leader is reasonably current, delay response if no leader found/not current
if !selfIsLeader {
if leaderExists && leaderCurrent {
return
}
// Make sure we are meta leader.
if !s.JetStreamIsLeader() {
return
responseDelay = delayResponseBy(leaderExists)
}
}
if !acc.JetStreamEnabled() {
if clusteredNonLeader {
return
}
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
@@ -3221,7 +3302,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
if s.JetStreamIsClustered() {
msg = append(msg[:0:0], msg...)
s.startGoRoutine(func() {
s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg)
s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg, responseDelay, resp.ByLeader)
})
return
}
@@ -3252,7 +3333,17 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
resp.Total = ocnt
resp.Limit = JSApiListLimit
resp.Offset = offset
if responseDelay != 0 {
// Need to copy these off before sending..
msg = append(msg[:0:0], msg...)
s.startGoRoutine(func() {
defer s.grWG.Done()
time.Sleep(responseDelay)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
})
} else {
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
}
// Request for information about an consumer.

View File

@@ -195,6 +195,32 @@ func (s *Server) JetStreamIsClustered() bool {
return isClustered
}
// Lock must not be held - returns hasLeader, isLeader, leaderCurrent
func (s *Server) JetStreamLeaderInfo() (bool, bool, bool) {
js := s.getJetStream()
if js == nil {
return false, false, false
}
selfLeader := s.JetStreamIsLeader()
if selfLeader {
return true, true, true
}
gl := js.getMetaGroup().GroupLeader()
for _, p := range js.getMetaGroup().Peers() {
if p == nil {
continue
}
if p.ID != gl {
continue
}
if time.Since(p.Last) <= lostQuorumInterval {
return true, false, true
}
return true, false, false
}
return false, false, false
}
func (s *Server) JetStreamIsLeader() bool {
js := s.getJetStream()
if js == nil {
@@ -3557,7 +3583,7 @@ func (s *Server) allPeersOffline(rg *raftGroup) bool {
// This will do a scatter and gather operation for all streams for this account. This is only called from metadata leader.
// This will be running in a separate Go routine.
func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte) {
func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte, respDelay time.Duration, byLdr bool) {
defer s.grWG.Done()
js, cc := s.getJetStreamCluster()
@@ -3592,6 +3618,8 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs
var resp = JSApiStreamListResponse{
ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
Streams: make([]*StreamInfo, 0, len(streams)),
Complete: true,
ByLeader: byLdr,
}
if len(streams) == 0 {
@@ -3645,7 +3673,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs
// Don't hold lock.
js.mu.Unlock()
const timeout = 5 * time.Second
const timeout = 4 * time.Second
notActive := time.NewTimer(timeout)
defer notActive.Stop()
@@ -3655,8 +3683,8 @@ LOOP:
case <-s.quitCh:
return
case <-notActive.C:
s.Warnf("Did not receive all stream info results for %q", acc)
resp.Error = jsClusterIncompleteErr
s.Warnf("Did not receive all stream info results for %q (retrieved %d out of %d)", acc, len(resp.Streams), len(streams))
resp.Complete = false
break LOOP
case si := <-rc:
resp.Streams = append(resp.Streams, si)
@@ -3677,12 +3705,20 @@ LOOP:
resp.Total = len(resp.Streams)
resp.Limit = JSApiListLimit
resp.Offset = offset
if respDelay != 0 {
s.startGoRoutine(func() {
defer s.grWG.Done()
time.Sleep(respDelay)
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
})
} else {
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
}
}
// This will do a scatter and gather operation for all consumers for this stream and account.
// This will be running in a separate Go routine.
func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, offset int, stream, subject, reply string, rmsg []byte) {
func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, offset int, stream, subject, reply string, rmsg []byte, respDelay time.Duration, byLdr bool) {
defer s.grWG.Done()
js, cc := s.getJetStreamCluster()
@@ -3723,6 +3759,8 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
var resp = JSApiConsumerListResponse{
ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
Consumers: []*ConsumerInfo{},
Complete: true,
ByLeader: byLdr,
}
if len(consumers) == 0 {
@@ -3774,7 +3812,7 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
}
js.mu.Unlock()
const timeout = 2 * time.Second
const timeout = 4 * time.Second
notActive := time.NewTimer(timeout)
defer notActive.Stop()
@@ -3784,7 +3822,8 @@ LOOP:
case <-s.quitCh:
return
case <-notActive.C:
s.Warnf("Did not receive all stream info results for %q", acc)
s.Warnf("Did not receive all consumer info results for %q (retrieved %d out of %d)", acc, len(resp.Consumers), len(consumers))
resp.Complete = false
break LOOP
case ci := <-rc:
resp.Consumers = append(resp.Consumers, ci)
@@ -3805,7 +3844,15 @@ LOOP:
resp.Total = len(resp.Consumers)
resp.Limit = JSApiListLimit
resp.Offset = offset
if respDelay != 0 {
s.startGoRoutine(func() {
defer s.grWG.Done()
time.Sleep(respDelay)
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
})
} else {
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
}
}
func encodeStreamPurge(sp *streamPurge) []byte {