Merge pull request #1875 from nats-io/jsc-remove

Removing peers from streams and consumers and stepdown functionality
This commit is contained in:
Derek Collison
2021-02-02 09:28:03 -07:00
committed by GitHub
11 changed files with 1451 additions and 204 deletions

View File

@@ -3097,20 +3097,20 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
a.srv = s
}
if jsEnabled {
if ac.Limits.JetStreamLimits.DiskStorage != 0 || ac.Limits.JetStreamLimits.MemoryStorage != 0 {
// JetStreamAccountLimits and jwt.JetStreamLimits use same value for unlimited
a.jsLimits = &JetStreamAccountLimits{
MaxMemory: ac.Limits.JetStreamLimits.MemoryStorage,
MaxStore: ac.Limits.JetStreamLimits.DiskStorage,
MaxStreams: int(ac.Limits.JetStreamLimits.Streams),
MaxConsumers: int(ac.Limits.JetStreamLimits.Consumer),
}
} else if a.jsLimits != nil {
// covers failed update followed by disable
a.jsLimits = nil
// Setup js limits regardless of whether this server has jsEnabled.
if ac.Limits.JetStreamLimits.DiskStorage != 0 || ac.Limits.JetStreamLimits.MemoryStorage != 0 {
// JetStreamAccountLimits and jwt.JetStreamLimits use same value for unlimited
a.jsLimits = &JetStreamAccountLimits{
MaxMemory: ac.Limits.JetStreamLimits.MemoryStorage,
MaxStore: ac.Limits.JetStreamLimits.DiskStorage,
MaxStreams: int(ac.Limits.JetStreamLimits.Streams),
MaxConsumers: int(ac.Limits.JetStreamLimits.Consumer),
}
} else if a.jsLimits != nil {
// covers failed update followed by disable
a.jsLimits = nil
}
a.updated = time.Now()
a.mu.Unlock()
@@ -3132,6 +3132,11 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
a.incomplete = true
a.mu.Unlock()
}
} else if a.jsLimits != nil {
// We do not have JS enabled for this server, but the account has it enabled so setup
// our imports properly. This allows this server to proxy JS traffic correctly.
s.checkJetStreamExports()
a.enableAllJetStreamServiceImports()
}
for i, c := range clients {

View File

@@ -603,6 +603,12 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, ca *consum
return o, nil
}
// consumerAssignment returns this consumer's current cluster assignment,
// read under the consumer's read lock.
func (o *Consumer) consumerAssignment() *consumerAssignment {
	o.mu.RLock()
	ca := o.ca
	o.mu.RUnlock()
	return ca
}
func (o *Consumer) setConsumerAssignment(ca *consumerAssignment) {
o.mu.Lock()
defer o.mu.Unlock()
@@ -1294,9 +1300,26 @@ func (o *Consumer) loopAndDeliverMsgs(qch chan struct{}) {
// Info returns our current consumer state.
func (o *Consumer) Info() *ConsumerInfo {
ci := o.srv.clusterInfo(o.node)
o.mu.RLock()
mset := o.mset
if mset == nil || mset.srv == nil {
o.mu.RUnlock()
return nil
}
o.mu.RUnlock()
s := mset.srv
s.mu.Lock()
js := s.js
s.mu.Unlock()
if js == nil {
return nil
}
ci := js.clusterInfo(o.raftGroup())
o.mu.RLock()
defer o.mu.RUnlock()
o.mu.Lock()
info := &ConsumerInfo{
Stream: o.stream,
Name: o.name,
@@ -1319,7 +1342,6 @@ func (o *Consumer) Info() *ConsumerInfo {
if o.isPullMode() {
info.NumWaiting = o.waiting.len()
}
o.mu.Unlock()
return info
}
@@ -1602,6 +1624,8 @@ func (wq *waitQueue) pop() *waitingRequest {
// a single message. If the payload is a formal request or a number parseable with Atoi(), then we will send a
// batch of messages without requiring another request to this endpoint, or an ACK.
func (o *Consumer) processNextMsgReq(_ *subscription, c *client, _, reply string, msg []byte) {
_, msg = c.msgParts(msg)
o.mu.Lock()
mset := o.mset
if mset == nil || o.isPushMode() || o.sendq == nil {

View File

@@ -27,6 +27,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/minio/highwayhash"
"github.com/nats-io/nats-server/v2/server/sysmem"
@@ -211,6 +212,73 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error {
return nil
}
// checkJetStreamExports will check if we have the JS exports setup
// on the system account, and if not go ahead and set them up.
func (s *Server) checkJetStreamExports() {
	sacc := s.SystemAccount()
	if sacc == nil {
		// No system account configured, nothing to export against.
		return
	}
	// Probing for the first export is enough: setupJetStreamExports
	// registers all of them together.
	if sacc.getServiceExport(allJsExports[0]) == nil {
		s.setupJetStreamExports()
	}
}
// setupJetStreamExports registers every JetStream API subject as a
// service export on the system account so that accounts can import
// and use the JetStream API.
func (s *Server) setupJetStreamExports() {
	// Setup our internal system exports.
	sacc := s.SystemAccount()
	if sacc == nil {
		// No system account configured, nothing to set up.
		return
	}
	for _, export := range allJsExports {
		if err := sacc.AddServiceExport(export, nil); err != nil {
			s.Warnf("Error setting up jetstream service exports: %v", err)
		}
	}
}
// Turns off JetStream and signals in clustered mode
// to have the metacontroller remove us from the peer list.
func (s *Server) RemoveJetStream() error {
	if s.JetStreamIsClustered() {
		s.Noticef("JetStream cluster shutting down")
		wasLeader := s.JetStreamIsLeader()
		js, cc := s.getJetStreamCluster()
		js.mu.RLock()
		meta := cc.meta
		js.mu.RUnlock()
		// Hand off leadership of any raft groups we currently lead before
		// proposing our own removal.
		s.transferRaftLeaders()
		if wasLeader {
			// Wait until the new metacontroller leader is established,
			// bounded by a short timeout so shutdown cannot hang.
			const timeout = 2 * time.Second
			maxWait := time.NewTimer(timeout)
			defer maxWait.Stop()
			t := time.NewTicker(50 * time.Millisecond)
			defer t.Stop()
		LOOP:
			for {
				select {
				case <-s.quitCh:
					// Server is shutting down anyway; nothing more to do.
					return nil
				case <-maxWait.C:
					// Give up waiting and proceed with removal.
					break LOOP
				case <-t.C:
					// Poll until the cluster reports a current state again.
					if cc.isCurrent() {
						break LOOP
					}
				}
			}
		}
		// Once here we can forward our proposal to remove ourselves.
		if meta != nil {
			meta.ProposeRemovePeer(meta.ID())
			meta.Delete()
		}
	} else {
		s.Noticef("JetStream shutting down")
	}
	s.shutdownJetStream()
	return nil
}
func (s *Server) enableJetStreamAccounts() error {
// If we have no configured accounts setup then setup imports on global account.
if s.globalAccountOnly() {
@@ -347,18 +415,20 @@ func (s *Server) shutdownJetStream() {
return
}
var _jsa [512]*jsAccount
jsas := _jsa[:0]
var _a [512]*Account
accounts := _a[:0]
js.mu.RLock()
// Collect accounts.
for _, jsa := range js.accounts {
jsas = append(jsas, jsa)
if a := jsa.acc(); a != nil {
accounts = append(accounts, a)
}
}
js.mu.RUnlock()
for _, jsa := range jsas {
js.disableJetStream(jsa)
for _, a := range accounts {
a.removeJetStream()
}
s.mu.Lock()
@@ -850,6 +920,26 @@ func (a *Account) DisableJetStream() error {
return js.disableJetStream(js.lookupAccount(a))
}
// removeJetStream is called when JetStream has been disabled for this
// server. It drops the account's JetStream state and then disables the
// account in the server's JetStream subsystem.
func (a *Account) removeJetStream() error {
	a.mu.Lock()
	srv := a.srv
	a.js = nil
	a.mu.Unlock()

	if srv == nil {
		return fmt.Errorf("jetstream account not registered")
	}
	if js := srv.getJetStream(); js != nil {
		return js.disableJetStream(js.lookupAccount(a))
	}
	return ErrJetStreamNotEnabled
}
// Disable JetStream for the account.
func (js *jetStream) disableJetStream(jsa *jsAccount) error {
if jsa == nil {

View File

@@ -142,6 +142,21 @@ const (
jsSnapshotAckT = "$JS.SNAPSHOT.ACK.%s.%s"
jsRestoreDeliverT = "$JS.SNAPSHOT.RESTORE.%s.%s"
// JSApiStreamRemovePeer is the endpoint to remove a peer from a clustered stream and its consumers.
// Will return JSON response.
JSApiStreamRemovePeer = "$JS.API.STREAM.PEER.REMOVE.*"
JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"
// JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
// Will return JSON response.
JSApiStreamLeaderStepDown = "$JS.API.STREAM.LEADER.STEPDOWN.*"
JSApiStreamLeaderStepDownT = "$JS.API.STREAM.LEADER.STEPDOWN.%s"
// JSApiConsumerLeaderStepDown is the endpoint to have consumer leader stepdown.
// Will return JSON response.
JSApiConsumerLeaderStepDown = "$JS.API.CONSUMER.LEADER.STEPDOWN.*.*"
JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
// jsAckT is the template for the ack message stream coming back from a consumer
// when they ACK/NAK, etc a message.
jsAckT = "$JS.ACK.%s.%s"
@@ -371,6 +386,36 @@ type JSApiStreamRestoreResponse struct {
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
// JSApiStreamRemovePeerRequest is the required remove peer request.
type JSApiStreamRemovePeerRequest struct {
// Server name of the peer to be removed.
Peer string `json:"peer"`
}
// JSApiStreamRemovePeerResponse is the response to a remove peer request.
type JSApiStreamRemovePeerResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"
// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
type JSApiStreamLeaderStepDownResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiStreamLeaderStepDownResponseType = "io.nats.jetstream.api.v1.stream_leader_stepdown_response"
// JSApiConsumerLeaderStepDownResponse is the response to a consumer leader stepdown request.
type JSApiConsumerLeaderStepDownResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiConsumerLeaderStepDownResponseType = "io.nats.jetstream.api.v1.consumer_leader_stepdown_response"
// JSApiMsgGetRequest get a message request.
type JSApiMsgGetRequest struct {
Seq uint64 `json:"seq"`
@@ -480,16 +525,18 @@ type JSApiStreamTemplateNamesResponse struct {
const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response"
var (
jsNotEnabledErr = &ApiError{Code: 503, Description: "JetStream not enabled for account"}
jsBadRequestErr = &ApiError{Code: 400, Description: "bad request"}
jsNotEmptyRequestErr = &ApiError{Code: 400, Description: "expected an empty request payload"}
jsInvalidJSONErr = &ApiError{Code: 400, Description: "invalid JSON request"}
jsInsufficientErr = &ApiError{Code: 503, Description: "insufficient Resources"}
jsNoAccountErr = &ApiError{Code: 404, Description: "account not found"}
jsNoConsumerErr = &ApiError{Code: 404, Description: "consumer not found"}
jsStreamMismatchErr = &ApiError{Code: 400, Description: "stream name in subject does not match request"}
jsNoClusterSupportErr = &ApiError{Code: 503, Description: "not currently supported in clustered mode"}
jsClusterNotAvailErr = &ApiError{Code: 503, Description: "JetStream system temporarily unavailable"}
jsNotEnabledErr = &ApiError{Code: 503, Description: "JetStream not enabled for account"}
jsBadRequestErr = &ApiError{Code: 400, Description: "bad request"}
jsNotEmptyRequestErr = &ApiError{Code: 400, Description: "expected an empty request payload"}
jsInvalidJSONErr = &ApiError{Code: 400, Description: "invalid JSON request"}
jsInsufficientErr = &ApiError{Code: 503, Description: "insufficient resources"}
jsNoConsumerErr = &ApiError{Code: 404, Description: "consumer not found"}
jsStreamMismatchErr = &ApiError{Code: 400, Description: "stream name in subject does not match request"}
jsNoClusterSupportErr = &ApiError{Code: 503, Description: "not currently supported in clustered mode"}
jsClusterNotAvailErr = &ApiError{Code: 503, Description: "JetStream system temporarily unavailable"}
jsClusterRequiredErr = &ApiError{Code: 503, Description: "JetStream clustering support required"}
jsPeerNotMemberErr = &ApiError{Code: 400, Description: "peer not a member"}
jsClusterIncompleteErr = &ApiError{Code: 503, Description: "incomplete results"}
)
// For easier handling of exports and imports.
@@ -508,6 +555,9 @@ var allJsExports = []string{
JSApiStreamPurge,
JSApiStreamSnapshot,
JSApiStreamRestore,
JSApiStreamRemovePeer,
JSApiStreamLeaderStepDown,
JSApiConsumerLeaderStepDown,
JSApiMsgDelete,
JSApiMsgGet,
JSApiConsumerCreate,
@@ -537,6 +587,9 @@ func (s *Server) setJetStreamExportSubs() error {
{JSApiStreamPurge, s.jsStreamPurgeRequest},
{JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
{JSApiStreamRestore, s.jsStreamRestoreRequest},
{JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
{JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
{JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
{JSApiMsgDelete, s.jsMsgDeleteRequest},
{JSApiMsgGet, s.jsMsgGetRequest},
{JSApiConsumerCreate, s.jsConsumerCreateRequest},
@@ -621,7 +674,7 @@ const badAPIRequestT = "Malformed JetStream API Request: %q"
// Request for current usage and limits for this account.
func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -893,7 +946,7 @@ func jsNotFoundError(err error) *ApiError {
// Request to create a stream.
func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -956,7 +1009,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, subject, re
// Request to update a stream.
func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
@@ -1015,13 +1068,15 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, re
return
}
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: s.clusterInfo(mset.raftNode())}
js, _ := s.getJetStreamCluster()
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: js.clusterInfo(mset.raftGroup())}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
// Request for the list of all stream names.
func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -1142,7 +1197,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep
// Request for the list of all detailed stream info.
// TODO(dlc) - combine with above long term
func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -1224,7 +1279,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl
// Request for information about a stream.
func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -1307,7 +1362,297 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
if config.allowNoSubject && len(config.Subjects) == 0 {
config.Subjects = []string{">"}
}
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: config, Cluster: s.clusterInfo(mset.raftNode())}
js, _ := s.getJetStreamCluster()
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: config, Cluster: js.clusterInfo(mset.raftGroup())}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
// Request to have a stream leader stepdown.
// Responds with JSApiStreamLeaderStepDownResponse; servers that should not
// answer (not the stream leader, stream unknown to a non-meta-leader) stay
// silent so the client receives exactly one response.
func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
	// Ignore if no client or JetStream is not enabled on this server.
	if c == nil || !s.JetStreamEnabled() {
		return
	}
	ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
	if err != nil {
		s.Warnf(badAPIRequestT, msg)
		return
	}
	// Have extra token for this one.
	name := tokenAt(subject, 6)

	var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}

	// If we are not in clustered mode this is a failed request.
	if !s.JetStreamIsClustered() {
		resp.Error = jsClusterRequiredErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// If we are here we are clustered. See if we are the stream leader in order to proceed.
	js, cc := s.getJetStreamCluster()
	if js == nil || cc == nil {
		return
	}
	// No meta leader yet means the cluster cannot serve this request.
	if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
		resp.Error = jsClusterNotAvailErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	js.mu.RLock()
	isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
	js.mu.RUnlock()

	// Only the meta leader reports "stream not found"; everyone else stays
	// silent when they have no assignment for it.
	if isLeader && sa == nil {
		resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	} else if sa == nil {
		return
	}
	if !acc.JetStreamEnabled() {
		resp.Error = jsNotEnabledErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// Stepdown takes no request payload.
	if !isEmptyRequest(msg) {
		resp.Error = jsBadRequestErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	// Check to see if we are a member of the group and if the group has no leader.
	if js.isGroupLeaderless(sa.Group) {
		resp.Error = jsClusterNotAvailErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	// We have the stream assigned and a leader, so only the stream leader should answer.
	if !acc.JetStreamIsStreamLeader(name) {
		return
	}
	mset, err := acc.LookupStream(name)
	if err != nil {
		resp.Error = jsNotFoundError(err)
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// Call actual stepdown.
	mset.raftNode().StepDown()

	resp.Success = true
	s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
// Request to have a consumer leader stepdown.
// Responds with JSApiConsumerLeaderStepDownResponse; only the consumer
// leader (or the meta leader, for not-found errors) answers so that the
// client receives exactly one response.
func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
	// Ignore if no client or JetStream is not enabled on this server.
	if c == nil || !s.JetStreamEnabled() {
		return
	}
	ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
	if err != nil {
		s.Warnf(badAPIRequestT, msg)
		return
	}

	var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}

	// If we are not in clustered mode this is a failed request.
	if !s.JetStreamIsClustered() {
		resp.Error = jsClusterRequiredErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// If we are here we are clustered. See if we are the stream leader in order to proceed.
	js, cc := s.getJetStreamCluster()
	if js == nil || cc == nil {
		return
	}
	// No meta leader yet means the cluster cannot serve this request.
	if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
		resp.Error = jsClusterNotAvailErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	// Have extra token for this one. Subject carries both stream and consumer.
	stream := tokenAt(subject, 6)
	consumer := tokenAt(subject, 7)

	js.mu.RLock()
	isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
	js.mu.RUnlock()

	// Only the meta leader reports "stream not found"; everyone else stays
	// silent when they have no assignment for it.
	if isLeader && sa == nil {
		resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	} else if sa == nil {
		return
	}
	// Look up the consumer assignment under this stream.
	var ca *consumerAssignment
	if sa.consumers != nil {
		ca = sa.consumers[consumer]
	}
	if ca == nil {
		resp.Error = jsNoConsumerErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// Check to see if we are a member of the group and if the group has no leader.
	if js.isGroupLeaderless(ca.Group) {
		resp.Error = jsClusterNotAvailErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// Only the consumer leader should proceed and answer from here on.
	if !acc.JetStreamIsConsumerLeader(stream, consumer) {
		return
	}
	if !acc.JetStreamEnabled() {
		resp.Error = jsNotEnabledErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// Stepdown takes no request payload.
	if !isEmptyRequest(msg) {
		resp.Error = jsBadRequestErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	mset, err := acc.LookupStream(stream)
	if err != nil {
		resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	o := mset.LookupConsumer(consumer)
	if o == nil {
		resp.Error = jsNoConsumerErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// Call actual stepdown.
	o.raftNode().StepDown()

	resp.Success = true
	s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
// Request to remove a peer from a clustered stream.
// The request body is a JSApiStreamRemovePeerRequest naming the peer by
// server name; only the meta leader processes the request, responding with
// JSApiStreamRemovePeerResponse.
func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
	// Ignore if no client or JetStream is not enabled on this server.
	if c == nil || !s.JetStreamEnabled() {
		return
	}
	ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
	if err != nil {
		s.Warnf(badAPIRequestT, msg)
		return
	}
	// Have extra token for this one.
	name := tokenAt(subject, 6)

	var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}

	// If we are not in clustered mode this is a failed request.
	if !s.JetStreamIsClustered() {
		resp.Error = jsClusterRequiredErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// If we are here we are clustered. See if we are the stream leader in order to proceed.
	js, cc := s.getJetStreamCluster()
	if js == nil || cc == nil {
		return
	}
	// No meta leader yet means the cluster cannot serve this request.
	if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
		resp.Error = jsClusterNotAvailErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	js.mu.RLock()
	isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
	js.mu.RUnlock()

	// Make sure we are meta leader. Non-leaders stay silent so the client
	// gets exactly one response.
	if !isLeader {
		return
	}

	if !acc.JetStreamEnabled() {
		resp.Error = jsNotEnabledErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// Unlike stepdown, this request requires a payload naming the peer.
	if isEmptyRequest(msg) {
		resp.Error = jsBadRequestErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	var req JSApiStreamRemovePeerRequest
	if err := json.Unmarshal(msg, &req); err != nil {
		resp.Error = jsInvalidJSONErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	if req.Peer == _EMPTY_ {
		resp.Error = jsBadRequestErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	if sa == nil {
		// No stream present.
		resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	// Check to see if we are a member of the group and if the group has no leader.
	if js.isGroupLeaderless(sa.Group) {
		resp.Error = jsClusterNotAvailErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	// Peers here is a server name, convert to node name.
	nodeName := string(getHash(req.Peer))

	js.mu.RLock()
	rg := sa.Group
	isMember := rg.isMember(nodeName)
	js.mu.RUnlock()

	// Make sure we are a member.
	if !isMember {
		resp.Error = jsPeerNotMemberErr
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	// If we are here we have a valid peer member set for removal.
	js.mu.Lock()
	js.removePeerFromStream(sa, nodeName)
	js.mu.Unlock()

	resp.Success = true
	s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
@@ -1332,7 +1677,7 @@ func isEmptyRequest(req []byte) bool {
// Request to delete a stream.
func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -1374,7 +1719,7 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re
// Clustered.
if s.JetStreamIsClustered() {
s.jsClusteredStreamDeleteRequest(ci, stream, reply, msg)
s.jsClusteredStreamDeleteRequest(ci, stream, subject, reply, msg)
return
}
@@ -1402,7 +1747,7 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re
// Request to delete a message.
// This expects a stream sequence number as the msg body.
func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -1503,7 +1848,7 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply
// Request to get a raw stream message.
func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -1556,7 +1901,7 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st
// Request to purge a stream.
func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -1858,7 +2203,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, stream, subj
// Process a snapshot request.
func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -2061,7 +2406,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, subject,
}
func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply string, rmsg []byte, expectDurable bool) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
@@ -2169,7 +2514,7 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
// Request for the list of all consumer names.
func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -2287,7 +2632,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
// Request for the list of all detailed consumer information.
func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
@@ -2378,7 +2723,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
// Request for information about an consumer.
func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
@@ -2473,7 +2818,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re
// Request to delete an Consumer.
func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject, reply string, rmsg []byte) {
if c == nil {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)

View File

@@ -84,6 +84,7 @@ type streamAssignment struct {
Config *StreamConfig `json:"stream"`
Group *raftGroup `json:"group"`
Sync string `json:"sync"`
Subject string `json:"subject"`
Reply string `json:"reply"`
Restore *StreamState `json:"restore_state,omitempty"`
// Internal
@@ -100,6 +101,7 @@ type consumerAssignment struct {
Stream string `json:"stream"`
Config *ConsumerConfig `json:"consumer"`
Group *raftGroup `json:"group"`
Subject string `json:"subject"`
Reply string `json:"reply"`
State *ConsumerState `json:"state,omitempty"`
// Internal
@@ -109,17 +111,19 @@ type consumerAssignment struct {
// streamPurge is what the stream leader will replicate when purging a stream.
type streamPurge struct {
Client *ClientInfo `json:"client,omitempty"`
Stream string `json:"stream"`
Reply string `json:"reply"`
Client *ClientInfo `json:"client,omitempty"`
Stream string `json:"stream"`
Subject string `json:"subject"`
Reply string `json:"reply"`
}
// streamMsgDelete is what the stream leader will replicate when deleting a message.
type streamMsgDelete struct {
Client *ClientInfo `json:"client,omitempty"`
Stream string `json:"stream"`
Seq uint64 `json:"seq"`
Reply string `json:"reply"`
Client *ClientInfo `json:"client,omitempty"`
Stream string `json:"stream"`
Seq uint64 `json:"seq"`
Subject string `json:"subject"`
Reply string `json:"reply"`
}
const (
@@ -667,8 +671,7 @@ func (js *jetStream) monitorCluster() {
n.PausePropose()
defer n.ResumePropose()
if snap := js.metaSnapshot(); !bytes.Equal(lastSnap, snap) {
if err := n.Snapshot(snap); err != nil {
} else {
if err := n.Snapshot(snap); err == nil {
lastSnap = snap
snapout = true
}
@@ -695,6 +698,7 @@ func (js *jetStream) monitorCluster() {
n.Applied(ce.Index)
if hadSnapshot {
snapout = false
n.Compact(ce.Index)
}
}
if isLeader && !snapout {
@@ -871,12 +875,64 @@ func (js *jetStream) setConsumerAssignmentResponded(ca *consumerAssignment) {
ca.responded = true
}
// copyGroup returns a copy of the stream assignment with its own raft
// group and its own peer slice, so the copy can be mutated and encoded
// without touching the original assignment. Lock should be held.
func (sa *streamAssignment) copyGroup() *streamAssignment {
	csa := *sa
	cg := *sa.Group
	// Detach the peer slice from the original's backing array.
	cg.Peers = append(cg.Peers[:0:0], cg.Peers...)
	csa.Group = &cg
	return &csa
}
// processRemovePeer scans all stream assignments for the given peer and,
// when we are the meta leader, remaps every stream (and its consumers)
// that listed the peer as a member.
func (js *jetStream) processRemovePeer(peer string) {
	js.mu.Lock()
	defer js.mu.Unlock()

	// Only leader should process and re-assign mappings.
	if cc := js.cluster; cc.isLeader() {
		// Walk each account's stream assignments looking for this peer.
		// FIXME(dlc) - Make sure these are live.
		for _, byStream := range cc.streams {
			for _, assignment := range byStream {
				if assignment.Group.isMember(peer) {
					js.removePeerFromStream(assignment, peer)
				}
			}
		}
	}
}
// removePeerFromStream remaps the stream assignment away from the given
// peer and proposes the updated stream and consumer assignments through
// the metacontroller.
// Assumes all checks have already been done.
// Lock should be held.
func (js *jetStream) removePeerFromStream(sa *streamAssignment, peer string) {
	s, cc := js.srv, js.cluster

	csa := sa.copyGroup()
	if !cc.remapStreamAssignment(csa, peer) {
		s.Warnf("JetStream cluster could not remap stream '%s > %s'", sa.Client.Account, sa.Config.Name)
	}
	// Send our proposal for this csa. Also use same group definition for all the consumers as well.
	cc.meta.Propose(encodeAddStreamAssignment(csa))
	rg := csa.Group
	for _, ca := range sa.consumers {
		cca := *ca
		// Copy the group before mutating: the shallow copy above still
		// shares ca.Group, so writing Peers through it would alter the
		// live consumer assignment before the proposal is applied.
		cg := *ca.Group
		cg.Peers = rg.Peers
		cca.Group = &cg
		cc.meta.Propose(encodeAddConsumerAssignment(&cca))
	}
}
func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool, error) {
var didSnap bool
for _, e := range entries {
if e.Type == EntrySnapshot {
js.applyMetaSnapshot(e.Data, isRecovering)
didSnap = true
} else if e.Type == EntryRemovePeer {
js.processRemovePeer(string(e.Data))
} else {
buf := e.Data
switch entryOp(buf[0]) {
@@ -979,6 +1035,7 @@ func (js *jetStream) createRaftGroup(rg *raftGroup) error {
// We already have this assigned.
if node := s.lookupRaftNode(rg.Name); node != nil {
s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name)
rg.node = node
return nil
}
@@ -1020,6 +1077,18 @@ func (js *jetStream) createRaftGroup(rg *raftGroup) error {
return nil
}
// raftGroup returns the raft group from this stream's current cluster
// assignment, or nil when the stream is nil or has no assignment.
func (mset *Stream) raftGroup() *raftGroup {
	if mset == nil {
		return nil
	}
	mset.mu.RLock()
	sa := mset.sa
	mset.mu.RUnlock()
	if sa == nil {
		return nil
	}
	return sa.Group
}
func (mset *Stream) raftNode() RaftNode {
if mset == nil {
return nil
@@ -1141,7 +1210,7 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) {
if !isLeader {
panic("Finished restore but not leader")
}
js.processStreamLeaderChange(mset, sa, isLeader)
js.processStreamLeaderChange(mset, isLeader)
attemptSnapshot()
// Check to see if we have restored consumers here.
@@ -1224,7 +1293,7 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) {
if !isLeader && n.GroupLeader() != noLeader {
js.setStreamAssignmentResponded(sa)
}
js.processStreamLeaderChange(mset, sa, isLeader)
js.processStreamLeaderChange(mset, isLeader)
}
case <-t.C:
if isLeader {
@@ -1240,6 +1309,14 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry) (bool,
if e.Type == EntrySnapshot {
mset.processSnapshot(e.Data)
didSnap = true
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
ourID := js.cluster.meta.ID()
js.mu.RUnlock()
if peer := string(e.Data); peer == ourID {
mset.stop(true, false)
}
return false, nil
} else {
buf := e.Data
switch entryOp(buf[0]) {
@@ -1274,13 +1351,13 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry) (bool,
var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
if err != nil {
resp.Error = jsError(err)
s.sendAPIErrResponse(md.Client, mset.account(), _EMPTY_, md.Reply, _EMPTY_, s.jsonResponse(resp))
s.sendAPIErrResponse(md.Client, mset.account(), md.Subject, md.Reply, _EMPTY_, s.jsonResponse(resp))
} else if !removed {
resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("sequence [%d] not found", md.Seq)}
s.sendAPIErrResponse(md.Client, mset.account(), _EMPTY_, md.Reply, _EMPTY_, s.jsonResponse(resp))
s.sendAPIErrResponse(md.Client, mset.account(), md.Subject, md.Reply, _EMPTY_, s.jsonResponse(resp))
} else {
resp.Success = true
s.sendAPIResponse(md.Client, mset.account(), _EMPTY_, md.Reply, _EMPTY_, s.jsonResponse(resp))
s.sendAPIResponse(md.Client, mset.account(), md.Subject, md.Reply, _EMPTY_, s.jsonResponse(resp))
}
}
case purgeStreamOp:
@@ -1300,11 +1377,11 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry) (bool,
var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
if err != nil {
resp.Error = jsError(err)
s.sendAPIErrResponse(sp.Client, mset.account(), _EMPTY_, sp.Reply, _EMPTY_, s.jsonResponse(resp))
s.sendAPIErrResponse(sp.Client, mset.account(), sp.Subject, sp.Reply, _EMPTY_, s.jsonResponse(resp))
} else {
resp.Purged = purged
resp.Success = true
s.sendAPIResponse(sp.Client, mset.account(), _EMPTY_, sp.Reply, _EMPTY_, s.jsonResponse(resp))
s.sendAPIResponse(sp.Client, mset.account(), sp.Subject, sp.Reply, _EMPTY_, s.jsonResponse(resp))
}
}
default:
@@ -1327,10 +1404,30 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo {
return replicas
}
func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignment, isLeader bool) {
// checkPeers compares the raft node's live peer set against the assigned
// group membership and proposes removal of any peer that is no longer a
// member of the group.
func (js *jetStream) checkPeers(rg *raftGroup) {
	js.mu.Lock()
	defer js.mu.Unlock()

	// FIXME(dlc) - Single replicas?
	if rg == nil || rg.node == nil {
		return
	}
	for _, p := range rg.node.Peers() {
		if rg.isMember(p.ID) {
			continue
		}
		rg.node.ProposeRemovePeer(p.ID)
	}
}
func (js *jetStream) processStreamLeaderChange(mset *Stream, isLeader bool) {
sa := mset.streamAssignment()
if sa == nil {
return
}
js.mu.Lock()
s, account, err := js.srv, sa.Client.Account, sa.err
client, reply := sa.Client, sa.Reply
client, subject, reply := sa.Client, sa.Subject, sa.Reply
hasResponded := sa.responded
sa.responded = true
js.mu.Unlock()
@@ -1340,10 +1437,12 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen
if isLeader {
s.Noticef("JetStream cluster new stream leader for '%s > %s'", sa.Client.Account, stream)
s.sendStreamLeaderElectAdvisory(mset)
// Check for peer removal and process here if needed.
js.checkPeers(sa.Group)
} else {
// We are stepping down.
// Make sure if we are doing so because we have lost quorum that we send the appropriate advisories.
if node := mset.raftNode(); node != nil && !node.Quorum() {
if node := mset.raftNode(); node != nil && !node.Quorum() && time.Since(node.Created()) > time.Second {
s.sendStreamLostQuorumAdvisory(mset)
}
}
@@ -1364,10 +1463,10 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
if err != nil {
resp.Error = jsError(err)
s.sendAPIErrResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: s.clusterInfo(mset.raftNode())}
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: js.clusterInfo(mset.raftGroup())}
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
if node := mset.raftNode(); node != nil {
mset.sendCreateAdvisory()
}
@@ -1483,30 +1582,40 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
stream := sa.Config.Name
js.mu.Lock()
// Check if we already have this assigned.
accStreams := cc.streams[acc.Name]
if accStreams != nil && accStreams[stream] != nil {
// TODO(dlc) - Debug?
// We already have this assignment, should we check they are the same?
js.mu.Unlock()
return
ourID := cc.meta.ID()
var isMember bool
if sa.Group != nil && cc.meta != nil {
isMember = sa.Group.isMember(ourID)
}
accStreams := cc.streams[acc.Name]
if accStreams == nil {
accStreams = make(map[string]*streamAssignment)
} else if osa := accStreams[stream]; osa != nil {
// Copy over private existing state from former SA.
sa.Group.node = osa.Group.node
sa.consumers = osa.consumers
sa.responded = osa.responded
sa.err = osa.err
}
// Update our state.
accStreams[stream] = sa
cc.streams[acc.Name] = accStreams
var isMember bool
if sa.Group != nil && cc.meta != nil {
isMember = sa.Group.isMember(cc.meta.ID())
}
js.mu.Unlock()
// Check if this is for us..
if isMember {
js.processClusterCreateStream(acc, sa)
} else if mset, _ := acc.LookupStream(sa.Config.Name); mset != nil {
// We have one here even though we are not a member. This can happen on re-assignment.
s.Debugf("JetStream removing stream '%s > %s' from this server, re-assigned", sa.Client.Account, sa.Config.Name)
if node := mset.raftNode(); node != nil {
node.ProposeRemovePeer(ourID)
}
mset.stop(true, false)
}
}
@@ -1519,6 +1628,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
js.mu.RLock()
s, rg := js.srv, sa.Group
alreadyRunning := rg.node != nil
js.mu.RUnlock()
// Process the raft group and make sure it's running if needed.
@@ -1533,6 +1643,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
}
}
// Our stream.
var mset *Stream
// Process here if not restoring or not the leader.
@@ -1578,7 +1689,9 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
// Start our monitoring routine.
if rg.node != nil {
s.startGoRoutine(func() { js.monitorStream(mset, sa) })
if !alreadyRunning {
s.startGoRoutine(func() { js.monitorStream(mset, sa) })
}
} else {
// Single replica stream, process manually here.
// If we are restoring, process that first.
@@ -1615,7 +1728,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
s.sendInternalMsgLocked(streamAssignmentSubj, _EMPTY_, nil, b)
return
}
js.processStreamLeaderChange(mset, sa, true)
js.processStreamLeaderChange(mset, true)
// Check to see if we have restored consumers here.
// These are not currently assigned so we will need to do so here.
@@ -1664,7 +1777,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
}
})
} else {
js.processStreamLeaderChange(mset, sa, true)
js.processStreamLeaderChange(mset, true)
}
}
}
@@ -1738,10 +1851,10 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
if resp.Error == nil {
resp.Error = jsError(err)
}
s.sendAPIErrResponse(sa.Client, acc, _EMPTY_, sa.Reply, _EMPTY_, s.jsonResponse(resp))
s.sendAPIErrResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, s.jsonResponse(resp))
} else {
resp.Success = true
s.sendAPIResponse(sa.Client, acc, _EMPTY_, sa.Reply, _EMPTY_, s.jsonResponse(resp))
s.sendAPIResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, s.jsonResponse(resp))
}
}
@@ -1755,6 +1868,12 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
return
}
acc, err := s.LookupAccount(ca.Client.Account)
if err != nil {
// TODO(dlc) - log error
return
}
sa := js.streamAssignment(ca.Client.Account, ca.Stream)
if sa == nil {
s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.Account, ca.Stream)
@@ -1775,18 +1894,37 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
if sa.consumers == nil {
sa.consumers = make(map[string]*consumerAssignment)
} else if oca := sa.consumers[ca.Name]; oca != nil {
// Copy over private existing state from former CA.
ca.Group.node = oca.Group.node
ca.responded = oca.responded
ca.err = oca.err
}
// Place into our internal map under the stream assignment.
// Ok to replace an existing one, we check on process call below.
sa.consumers[ca.Name] = ca
// See if we are a member
isMember := ca.Group.isMember(cc.meta.ID())
ourID := cc.meta.ID()
isMember := ca.Group.isMember(ourID)
js.mu.Unlock()
// Check if this is for us..
if isMember {
js.processClusterCreateConsumer(ca)
} else {
// We are not a member, if we have this consumer on this
// server remove it.
if mset, _ := acc.LookupStream(ca.Stream); mset != nil {
if o := mset.LookupConsumer(ca.Name); o != nil {
s.Debugf("JetStream removing consumer '%s > %s > %s' from this server, re-assigned",
ca.Client.Account, ca.Stream, ca.Name)
if node := o.raftNode(); node != nil {
node.ProposeRemovePeer(ourID)
}
o.stop(true, false, false)
}
}
}
}
@@ -1837,6 +1975,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
return
}
rg := ca.Group
alreadyRunning := rg.node != nil
js.mu.RUnlock()
// Go ahead and create or update the consumer.
@@ -1906,10 +2045,12 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
o.setCreated(ca.Created)
// Start our monitoring routine.
if rg.node != nil {
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
if !alreadyRunning {
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
}
} else {
// Single replica consumer, process manually here.
js.processConsumerLeaderChange(o, ca, true)
js.processConsumerLeaderChange(o, true)
}
}
}
@@ -1956,10 +2097,10 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
if resp.Error == nil {
resp.Error = jsError(err)
}
s.sendAPIErrResponse(ca.Client, acc, _EMPTY_, ca.Reply, _EMPTY_, s.jsonResponse(resp))
s.sendAPIErrResponse(ca.Client, acc, ca.Subject, ca.Reply, _EMPTY_, s.jsonResponse(resp))
} else {
resp.Success = true
s.sendAPIResponse(ca.Client, acc, _EMPTY_, ca.Reply, _EMPTY_, s.jsonResponse(resp))
s.sendAPIResponse(ca.Client, acc, ca.Subject, ca.Reply, _EMPTY_, s.jsonResponse(resp))
}
}
@@ -2014,6 +2155,18 @@ func (cc *jetStreamCluster) isConsumerAssigned(a *Account, stream, consumer stri
return false
}
// raftGroup returns the raft group from this consumer's current assignment.
// It is nil-receiver safe and returns nil when no assignment is present.
func (o *Consumer) raftGroup() *raftGroup {
	if o == nil {
		return nil
	}
	o.mu.RLock()
	defer o.mu.RUnlock()
	if ca := o.ca; ca != nil {
		return ca.Group
	}
	return nil
}
func (o *Consumer) raftNode() RaftNode {
if o == nil {
return nil
@@ -2070,7 +2223,7 @@ func (js *jetStream) monitorConsumer(o *Consumer, ca *consumerAssignment) {
if !isLeader && n.GroupLeader() != noLeader {
js.setConsumerAssignmentResponded(ca)
}
js.processConsumerLeaderChange(o, ca, isLeader)
js.processConsumerLeaderChange(o, isLeader)
case <-t.C:
// TODO(dlc) - We should have this delayed a bit to not race the invariants.
if last != 0 {
@@ -2091,6 +2244,14 @@ func (js *jetStream) applyConsumerEntries(o *Consumer, ce *CommittedEntry) (bool
}
o.store.Update(state)
didSnap = true
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
ourID := js.cluster.meta.ID()
js.mu.RUnlock()
if peer := string(e.Data); peer == ourID {
o.stop(true, false, false)
}
return false, nil
} else {
buf := e.Data
switch entryOp(buf[0]) {
@@ -2109,7 +2270,7 @@ func (js *jetStream) applyConsumerEntries(o *Consumer, ce *CommittedEntry) (bool
}
o.store.UpdateAcks(dseq, sseq)
default:
panic("JetStream Cluster Unknown group entry op type!")
panic(fmt.Sprintf("JetStream Cluster Unknown group entry op type! %v", entryOp(buf[0])))
}
}
}
@@ -2151,10 +2312,14 @@ func decodeDeliveredUpdate(buf []byte) (dseq, sseq, dc uint64, ts int64, err err
return dseq, sseq, dc, ts, nil
}
func (js *jetStream) processConsumerLeaderChange(o *Consumer, ca *consumerAssignment, isLeader bool) {
func (js *jetStream) processConsumerLeaderChange(o *Consumer, isLeader bool) {
ca := o.consumerAssignment()
if ca == nil {
return
}
js.mu.Lock()
s, account, err := js.srv, ca.Client.Account, ca.err
client, reply := ca.Client, ca.Reply
client, subject, reply := ca.Client, ca.Subject, ca.Reply
hasResponded := ca.responded
ca.responded = true
js.mu.Unlock()
@@ -2169,10 +2334,12 @@ func (js *jetStream) processConsumerLeaderChange(o *Consumer, ca *consumerAssign
if isLeader {
s.Noticef("JetStream cluster new consumer leader for '%s > %s > %s'", ca.Client.Account, stream, consumer)
s.sendConsumerLeaderElectAdvisory(o)
// Check for peer removal and process here if needed.
js.checkPeers(ca.Group)
} else {
// We are stepping down.
// Make sure if we are doing so because we have lost quorum that we send the appropriate advisories.
if node := o.raftNode(); node != nil && !node.Quorum() {
if node := o.raftNode(); node != nil && !node.Quorum() && time.Since(node.Created()) > time.Second {
s.sendConsumerLostQuorumAdvisory(o)
}
}
@@ -2187,10 +2354,10 @@ func (js *jetStream) processConsumerLeaderChange(o *Consumer, ca *consumerAssign
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
if err != nil {
resp.Error = jsError(err)
s.sendAPIErrResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
resp.ConsumerInfo = o.Info()
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
if node := o.raftNode(); node != nil {
o.sendCreateAdvisory()
}
@@ -2308,7 +2475,7 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client
} else if result.Restore != nil {
resp = s.jsonResponse(result.Restore)
}
js.srv.sendAPIErrResponse(sa.Client, acc, _EMPTY_, sa.Reply, _EMPTY_, resp)
js.srv.sendAPIErrResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, resp)
sa.responded = true
// TODO(dlc) - Could have mixed results, should track per peer.
// Set sa.err while we are deleting so we will not respond to list/names requests.
@@ -2336,7 +2503,7 @@ func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *clie
if sa := js.streamAssignment(result.Account, result.Stream); sa != nil && sa.consumers != nil {
if ca := sa.consumers[result.Consumer]; ca != nil && !ca.responded {
js.srv.sendAPIErrResponse(ca.Client, acc, _EMPTY_, ca.Reply, _EMPTY_, s.jsonResponse(result.Response))
js.srv.sendAPIErrResponse(ca.Client, acc, ca.Subject, ca.Reply, _EMPTY_, s.jsonResponse(result.Response))
ca.responded = true
// Check if this failed.
// TODO(dlc) - Could have mixed results, should track per peer.
@@ -2394,6 +2561,25 @@ func (js *jetStream) processLeaderChange(isLeader bool) {
}
}
// Lock should be held.
// remapStreamAssignment tries to replace removePeer in the stream's peer
// group with a cluster peer that is not already a member. The preferred
// leader is cleared when a swap happens so the replacement does not
// inherit a stale preference. Returns true when a replacement peer was
// found, false when no candidate was available (remap failed).
func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePeer string) bool {
	// Need to select a replacement peer.
	for _, p := range cc.meta.Peers() {
		if sa.Group.isMember(p.ID) {
			continue
		}
		// Swap the peer being removed for the replacement.
		for i, peer := range sa.Group.Peers {
			if peer == removePeer {
				sa.Group.Peers[i] = p.ID
				break
			}
		}
		// Don't influence preferred leader.
		sa.Group.Preferred = _EMPTY_
		return true
	}
	// No replacement peer available; report failure so callers do not
	// propose an unchanged assignment believing the remap succeeded.
	return false
}
// selectPeerGroup will select a group of peers to start a raft group.
// TODO(dlc) - For now randomly select. Can be way smarter.
func (cc *jetStreamCluster) selectPeerGroup(r int) []string {
@@ -2403,6 +2589,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int) []string {
s := cc.s
ourID := cc.meta.ID()
for _, p := range peers {
// FIXME(dlc) - cluster scoped.
if p.ID == ourID || s.getRouteByHash([]byte(p.ID)) != nil {
nodes = append(nodes, p.ID)
}
@@ -2504,11 +2691,11 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, subject, reply string,
// Pick a preferred leader.
rg.setPreferred()
// Sync subject for post snapshot sync.
sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Reply: reply, Client: ci, Created: time.Now()}
sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now()}
cc.meta.Propose(encodeAddStreamAssignment(sa))
}
func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, stream, reply string, rmsg []byte) {
func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, stream, subject, reply string, rmsg []byte) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
@@ -2523,7 +2710,7 @@ func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, stream, reply st
if err == nil {
var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIErrResponse(ci, acc, _EMPTY_, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
}
return
}
@@ -2533,7 +2720,7 @@ func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, stream, reply st
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
}
sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Reply: reply, Client: ci}
sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Reply: reply, Client: ci}
cc.meta.Propose(encodeDeleteStreamAssignment(sa))
}
@@ -2560,7 +2747,7 @@ func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, stream, subject,
}
n := sa.Group.node
sp := &streamPurge{Stream: stream, Reply: reply, Client: ci}
sp := &streamPurge{Stream: stream, Subject: subject, Reply: reply, Client: ci}
n.Propose(encodeStreamPurge(sp))
}
@@ -2591,13 +2778,13 @@ func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, r
}
// Pick a preferred leader.
rg.setPreferred()
sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Reply: reply, Client: ci, Created: time.Now()}
sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now()}
// Now add in our restore state and pre-select a peer to handle the actual receipt of the snapshot.
sa.Restore = &req.State
cc.meta.Propose(encodeAddStreamAssignment(sa))
}
// This will do a scatter and gather operation for all streams for this account.
// This will do a scatter and gather operation for all streams for this account. This is only called from metadata leader.
// This will be running in a separate Go routine.
func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte) {
defer s.grWG.Done()
@@ -2608,7 +2795,6 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs
}
js.mu.Lock()
defer js.mu.Unlock()
var streams []*streamAssignment
for _, sa := range cc.streams[acc.Name] {
@@ -2632,10 +2818,23 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs
streams = streams[:JSApiListLimit]
}
rc := make(chan *StreamInfo, len(streams))
var resp = JSApiStreamListResponse{
ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
Streams: make([]*StreamInfo, 0, len(streams)),
}
if len(streams) == 0 {
js.mu.Unlock()
resp.Limit = JSApiListLimit
resp.Offset = offset
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
return
}
// Create an inbox for our responses and send out requests.
inbox := infoReplySubject()
rc := make(chan *StreamInfo, len(streams))
rsub, _ := s.systemSubscribe(inbox, _EMPTY_, false, cc.c, func(_ *subscription, _ *client, _, reply string, msg []byte) {
var si StreamInfo
if err := json.Unmarshal(msg, &si); err != nil {
@@ -2655,16 +2854,13 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs
isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.Account, sa.Config.Name)
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
}
// Don't hold lock.
js.mu.Unlock()
const timeout = 2 * time.Second
notActive := time.NewTimer(timeout)
defer notActive.Stop()
var resp = JSApiStreamListResponse{
ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
Streams: make([]*StreamInfo, 0, len(streams)),
}
LOOP:
for {
select {
@@ -2672,6 +2868,7 @@ LOOP:
return
case <-notActive.C:
s.Warnf("Did not receive all stream info results for %q", acc)
resp.Error = jsClusterIncompleteErr
break LOOP
case si := <-rc:
resp.Streams = append(resp.Streams, si)
@@ -2706,7 +2903,6 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
}
js.mu.Lock()
defer js.mu.Unlock()
var consumers []*consumerAssignment
if sas := cc.streams[acc.Name]; sas != nil {
@@ -2735,10 +2931,23 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
consumers = consumers[:JSApiListLimit]
}
rc := make(chan *ConsumerInfo, len(consumers))
// Send out our requests here.
var resp = JSApiConsumerListResponse{
ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
Consumers: []*ConsumerInfo{},
}
if len(consumers) == 0 {
js.mu.Unlock()
resp.Limit = JSApiListLimit
resp.Offset = offset
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
return
}
// Create an inbox for our responses and send out requests.
inbox := infoReplySubject()
rc := make(chan *ConsumerInfo, len(consumers))
rsub, _ := s.systemSubscribe(inbox, _EMPTY_, false, cc.c, func(_ *subscription, _ *client, _, reply string, msg []byte) {
var ci ConsumerInfo
if err := json.Unmarshal(msg, &ci); err != nil {
@@ -2753,23 +2962,11 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
})
defer s.sysUnsubscribe(rsub)
// Send out our requests here.
var resp = JSApiConsumerListResponse{
ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
Consumers: []*ConsumerInfo{},
}
if len(consumers) == 0 {
resp.Limit = JSApiListLimit
resp.Offset = offset
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp))
return
}
for _, ca := range consumers {
isubj := fmt.Sprintf(clusterConsumerInfoT, ca.Client.Account, stream, ca.Name)
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
}
js.mu.Unlock()
const timeout = 2 * time.Second
notActive := time.NewTimer(timeout)
@@ -2824,25 +3021,36 @@ func (s *Server) jsClusteredConsumerDeleteRequest(ci *ClientInfo, stream, consum
return
}
acc, err := s.LookupAccount(ci.Account)
if err != nil {
return
}
js.mu.Lock()
defer js.mu.Unlock()
var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
sa := js.streamAssignment(ci.Account, stream)
if sa == nil || sa.consumers == nil {
// TODO(dlc) - Should respond? Log?
if sa == nil {
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
if sa.consumers == nil {
resp.Error = jsNoConsumerErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
oca := sa.consumers[consumer]
if oca == nil {
acc, err := s.LookupAccount(ci.Account)
if err == nil {
var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
resp.Error = jsNoAccountErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
}
resp.Error = jsNoConsumerErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
ca := &consumerAssignment{Group: oca.Group, Stream: stream, Name: consumer, Config: oca.Config, Reply: reply, Client: ci}
ca := &consumerAssignment{Group: oca.Group, Stream: stream, Name: consumer, Config: oca.Config, Subject: subject, Reply: reply, Client: ci}
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
}
@@ -2874,7 +3082,7 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, stream, subject, re
return
}
n := sa.Group.node
md := &streamMsgDelete{Seq: seq, Stream: stream, Reply: reply, Client: ci}
md := &streamMsgDelete{Seq: seq, Stream: stream, Subject: subject, Reply: reply, Client: ci}
n.Propose(encodeMsgDelete(md))
}
@@ -2962,7 +3170,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, subject, reply strin
}
}
ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Reply: reply, Client: ci, Created: time.Now()}
ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now()}
cc.meta.Propose(encodeAddConsumerAssignment(ca))
}
@@ -3355,8 +3563,7 @@ RETRY:
case <-qch:
return
case isLeader := <-lch:
sa := js.streamAssignment(mset.account().Name, mset.Name())
js.processStreamLeaderChange(mset, sa, isLeader)
js.processStreamLeaderChange(mset, isLeader)
}
}
}
@@ -3399,13 +3606,21 @@ func (mset *Stream) handleClusterSyncRequest(sub *subscription, c *client, subje
}
// clusterInfo will report on the status of the raft group.
func (s *Server) clusterInfo(n RaftNode) *ClusterInfo {
if n == nil {
func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
if js == nil {
return nil
}
js.mu.RLock()
defer js.mu.RUnlock()
s := js.srv
if rg == nil || rg.node == nil {
return &ClusterInfo{
Name: s.ClusterName(),
Leader: s.Name(),
}
}
n := rg.node
ci := &ClusterInfo{
Name: s.ClusterName(),
@@ -3416,7 +3631,7 @@ func (s *Server) clusterInfo(n RaftNode) *ClusterInfo {
id, peers := n.ID(), n.Peers()
for _, rp := range peers {
if rp.ID != id {
if rp.ID != id && rg.isMember(rp.ID) {
lastSeen := now.Sub(rp.Last)
current := rp.Current
if current && lastSeen > lostQuorumInterval {
@@ -3435,10 +3650,10 @@ func (mset *Stream) handleClusterStreamInfoRequest(sub *subscription, c *client,
mset.mu.RUnlock()
return
}
s, config, node := mset.srv, mset.config, mset.node
s, js, config := mset.srv, mset.srv.js, mset.config
mset.mu.RUnlock()
si := &StreamInfo{Created: mset.Created(), State: mset.State(), Config: config, Cluster: s.clusterInfo(node)}
si := &StreamInfo{Created: mset.Created(), State: mset.State(), Config: config, Cluster: js.clusterInfo(mset.raftGroup())}
b, _ := json.Marshal(si)
s.sendInternalMsgLocked(reply, _EMPTY_, nil, b)
}

View File

@@ -52,6 +52,7 @@ type RaftNode interface {
ResumeApply()
LeadChangeC() <-chan bool
QuitC() <-chan struct{}
Created() time.Time
Stop()
Delete()
}
@@ -107,6 +108,7 @@ func (state RaftState) String() string {
type raft struct {
sync.RWMutex
created time.Time
group string
sd string
id string
@@ -133,11 +135,14 @@ type raft struct {
// Subjects for votes, updates, replays.
psubj string
rpsubj string
vsubj string
vreply string
asubj string
areply string
aesub *subscription
// For when we need to catch up as a follower.
catchup *catchupState
@@ -206,6 +211,7 @@ var (
errPeersNotCurrent = errors.New("raft: all peers are not current")
errFailedToApply = errors.New("raft: could not place apply entry")
errEntryLoadFailed = errors.New("raft: could not load entry from WAL")
errNodeClosed = errors.New("raft: node is closed")
)
// This will bootstrap a raftNode by writing its config into the store directory.
@@ -256,6 +262,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
return nil, errors.New("raft: cluster too small")
}
n := &raft{
created: time.Now(),
id: hash[:idLen],
group: cfg.Name,
sd: cfg.Store,
@@ -270,13 +277,13 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
c: s.createInternalSystemClient(),
sendq: sendq,
quit: make(chan struct{}),
reqs: make(chan *voteRequest, 4),
votes: make(chan *voteResponse, 8),
reqs: make(chan *voteRequest, 8),
votes: make(chan *voteResponse, 32),
resp: make(chan *appendEntryResponse, 256),
propc: make(chan *Entry, 256),
applyc: make(chan *CommittedEntry, 512),
leadc: make(chan bool, 4),
stepdown: make(chan string, 4),
leadc: make(chan bool, 8),
stepdown: make(chan string, 8),
}
n.c.registerWithAccount(sacc)
@@ -380,11 +387,25 @@ func (s *Server) lookupRaftNode(group string) RaftNode {
return n
}
// reloadDebugRaftNodes syncs every registered raft node's debug flag with
// the server's current debug logging setting. Safe to call on a nil server.
func (s *Server) reloadDebugRaftNodes() {
	if s == nil {
		return
	}
	debug := atomic.LoadInt32(&s.logging.debug) > 0

	s.rnMu.RLock()
	defer s.rnMu.RUnlock()
	for _, ni := range s.raftNodes {
		node := ni.(*raft)
		node.Lock()
		node.dflag = debug
		node.Unlock()
	}
}
func (s *Server) transferRaftLeaders() bool {
if s == nil {
return false
}
var nodes []RaftNode
s.rnMu.RLock()
if len(s.raftNodes) > 0 {
@@ -516,7 +537,27 @@ func (n *raft) ProposeAddPeer(peer string) error {
// ProposeRemovePeer is called to remove a peer from the group.
func (n *raft) ProposeRemovePeer(peer string) error {
return errors.New("no impl")
n.RLock()
propc, subj := n.propc, n.rpsubj
isUs, isLeader := peer == n.id, n.state == Leader
n.RUnlock()
if isLeader {
if isUs {
n.StepDown()
} else {
select {
case propc <- &Entry{EntryRemovePeer, []byte(peer)}:
default:
return errProposalFailed
}
return nil
}
}
// Need to forward.
n.sendRPC(subj, _EMPTY_, []byte(peer))
return nil
}
// PauseApply will allow us to pause processing of append entries onto our
@@ -624,6 +665,7 @@ func (n *raft) Leader() bool {
func (n *raft) isCurrent() bool {
// First check if we match commit and applied.
if n.commit != n.applied {
n.debug("Not current, commit %d != applied %d", n.commit, n.applied)
return false
}
// Make sure we are the leader or we know we have heard from the leader recently.
@@ -643,6 +685,10 @@ func (n *raft) isCurrent() bool {
if ps := n.peers[n.leader]; ps != nil && ps.ts > 0 && (ts-ps.ts) <= okInterval {
return true
}
n.debug("Not current, no recent leader contact")
}
if cs := n.catchup; cs != nil {
n.debug("Not current, still catching up pindex=%d, cindex=%d", n.pindex, cs.cindex)
}
return false
}
@@ -748,6 +794,9 @@ func (n *raft) Size() (uint64, uint64) {
}
func (n *raft) ID() string {
if n == nil {
return _EMPTY_
}
n.RLock()
defer n.RUnlock()
return n.id
@@ -771,6 +820,12 @@ func (n *raft) Peers() []*Peer {
return peers
}
// Created returns the time at which this raft node was created.
func (n *raft) Created() time.Time {
	n.RLock()
	created := n.created
	n.RUnlock()
	return created
}
// Stop shuts down the raft node without deleting its persisted state
// (shutdown is called with delete=false; contrast with Delete).
func (n *raft) Stop() {
	n.shutdown(false)
}
@@ -828,10 +883,11 @@ func (n *raft) newInbox(cn string) string {
}
const (
raftVoteSubj = "$NRG.V.%s.%s"
raftAppendSubj = "$NRG.E.%s.%s"
raftPropSubj = "$NRG.P.%s"
raftReplySubj = "$NRG.R.%s"
raftVoteSubj = "$NRG.V.%s.%s"
raftAppendSubj = "$NRG.E.%s.%s"
raftPropSubj = "$NRG.P.%s"
raftRemovePeerSubj = "$NRG.RP.%s"
raftReplySubj = "$NRG.R.%s"
)
// Our internal subscribe.
@@ -845,6 +901,7 @@ func (n *raft) createInternalSubs() error {
n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, cn, n.group), n.newInbox(cn)
n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, cn, n.group), n.newInbox(cn)
n.psubj = fmt.Sprintf(raftPropSubj, n.group)
n.rpsubj = fmt.Sprintf(raftRemovePeerSubj, n.group)
// Votes
if _, err := n.subscribe(n.vreply, n.handleVoteResponse); err != nil {
@@ -857,11 +914,12 @@ func (n *raft) createInternalSubs() error {
if _, err := n.subscribe(n.areply, n.handleAppendEntryResponse); err != nil {
return err
}
if _, err := n.subscribe(n.asubj, n.handleAppendEntry); err != nil {
if sub, err := n.subscribe(n.asubj, n.handleAppendEntry); err != nil {
return err
} else {
n.aesub = sub
}
// TODO(dlc) change events.
return nil
}
@@ -1112,6 +1170,32 @@ func (n *raft) decodeAppendEntryResponse(msg []byte) *appendEntryResponse {
return ar
}
// handleForwardedRemovePeerProposal is called when a remove-peer proposal
// has been forwarded to us from another group member. Only the leader acts
// on it: after validating the peer id length, the proposal is placed onto
// the propose channel for normal entry processing. If the channel is full
// the proposal is dropped with a warning.
func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _, reply string, msg []byte) {
	n.debug("Received forwarded remove peer proposal: %q", msg)

	if !n.Leader() {
		n.debug("Ignoring forwarded peer removal proposal, not leader")
		return
	}
	if len(msg) != idLen {
		n.warn("Received invalid peer name for remove proposal: %q", msg)
		return
	}
	// A string([]byte) conversion always copies, so peer does not alias
	// the underlying client/route buffer.
	peer := string(msg)

	n.RLock()
	propc := n.propc
	n.RUnlock()

	select {
	case propc <- &Entry{EntryRemovePeer, []byte(peer)}:
	default:
		n.warn("Failed to place peer removal proposal onto propose chan")
	}
}
// Called when a peer has forwarded a proposal.
func (n *raft) handleForwardedProposal(sub *subscription, c *client, _, reply string, msg []byte) {
if !n.Leader() {
@@ -1126,11 +1210,16 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _, reply st
}
func (n *raft) runAsLeader() {
n.Lock()
// For forwarded proposals.
fsub, err := n.subscribe(n.psubj, n.handleForwardedProposal)
n.Unlock()
n.RLock()
psubj, rpsubj := n.psubj, n.rpsubj
n.RUnlock()
// For forwarded proposals, both normal and remove peer proposals.
fsub, err := n.subscribe(psubj, n.handleForwardedProposal)
if err != nil {
panic(fmt.Sprintf("Error subscribing to forwarded proposals: %v", err))
}
rpsub, err := n.subscribe(rpsubj, n.handleForwardedRemovePeerProposal)
if err != nil {
panic(fmt.Sprintf("Error subscribing to forwarded proposals: %v", err))
}
@@ -1141,6 +1230,9 @@ func (n *raft) runAsLeader() {
if fsub != nil {
n.s.sysUnsubscribe(fsub)
}
if rpsub != nil {
n.s.sysUnsubscribe(rpsub)
}
n.Unlock()
}()
@@ -1354,10 +1446,10 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
if n.progress == nil {
n.progress = make(map[string]chan uint64)
}
if _, ok := n.progress[ar.peer]; ok {
n.debug("Existing entry for catching up %q", ar.peer)
n.Unlock()
return
if ch, ok := n.progress[ar.peer]; ok {
n.debug("Will cancel existing entry for catching up %q", ar.peer)
delete(n.progress, ar.peer)
ch <- n.pindex
}
ae, err := n.loadEntry(ar.index + 1)
if err != nil {
@@ -1391,6 +1483,9 @@ func (n *raft) loadEntry(index uint64) (*appendEntry, error) {
// applyCommit will update our commit index and apply the entry to the apply chan.
// lock should be held.
func (n *raft) applyCommit(index uint64) error {
if n.state == Closed {
return errNodeClosed
}
if index <= n.commit {
n.debug("Ignoring apply commit for %d, already processed", index)
return nil
@@ -1405,7 +1500,9 @@ func (n *raft) applyCommit(index uint64) error {
// FIXME(dlc) - Can keep this in memory if this too slow.
ae, err := n.loadEntry(index)
if err != nil {
n.debug("Got an error loading %d index: %v", index, err)
if err != ErrStoreClosed {
n.debug("Got an error loading %d index: %v", index, err)
}
n.commit = original
return errEntryLoadFailed
}
@@ -1433,6 +1530,21 @@ func (n *raft) applyCommit(index uint64) error {
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0}
}
writePeerState(n.sd, &peerState{n.peerNames(), n.csz})
case EntryRemovePeer:
oldPeer := string(e.Data)
n.debug("Removing peer %q", oldPeer)
// FIXME(dlc) - Check if this is us??
if _, ok := n.peers[oldPeer]; ok {
// We should decrease our cluster size since we are tracking this peer.
n.debug("Decreasing our clustersize: %d -> %d", n.csz, n.csz-1)
n.csz--
n.qn = n.csz/2 + 1
delete(n.peers, oldPeer)
}
writePeerState(n.sd, &peerState{n.peerNames(), n.csz})
// We pass these up as well.
committed = append(committed, e)
}
}
// Pass to the upper layers if we have normal entries.
@@ -1510,7 +1622,6 @@ func (n *raft) trackPeer(peer string) error {
// this is an error.
if len(n.peers) >= n.csz {
n.Unlock()
n.debug("Leader detected a new peer! %q", peer)
return errUnknownPeer
}
needPeerUpdate = true
@@ -1585,7 +1696,9 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, subject, reply st
// Lock should be held.
func (n *raft) cancelCatchup() {
n.debug("Canceling catchup subscription since we are now up to date")
n.s.sysUnsubscribe(n.catchup.sub)
if n.catchup != nil && n.catchup.sub != nil {
n.s.sysUnsubscribe(n.catchup.sub)
}
n.catchup = nil
}
@@ -1652,13 +1765,14 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.attemptStepDown(ae.leader)
}
n.resetElectionTimeout()
// Catching up state.
catchingUp := n.catchup != nil
// Is this a new entry or a replay on startup?
isNew := sub != nil && (!catchingUp || sub != n.catchup.sub)
// Is this a new entry?
isNew := sub == n.aesub
if isNew {
n.resetElectionTimeout()
// Track leader directly
if ae.leader != noLeader {
if ps := n.peers[ae.leader]; ps != nil {
@@ -1676,6 +1790,12 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
return
}
// If we are catching up ignore old catchup subs.
// This can happen when we stall a catchup.
if catchingUp && !isNew && sub != n.catchup.sub {
return
}
// Check state if we are catching up.
if catchingUp && isNew {
if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex {
@@ -1691,11 +1811,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
inbox = n.createCatchup(ae)
ar = &appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_}
}
// Ignore new while catching up or replaying.
n.Unlock()
if ar != nil {
n.sendRPC(ae.reply, inbox, ar.encode())
}
// Ignore new while catching up or replaying.
return
}
}
@@ -1711,7 +1831,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
}
if n.leader != ae.leader && n.state == Follower {
if isNew && n.leader != ae.leader && n.state == Follower {
n.debug("AppendEntry updating leader to %q", ae.leader)
n.leader = ae.leader
n.vote = noVote
@@ -1749,11 +1869,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Only store if an original which will have sub != nil
if sub != nil {
if err := n.storeToWAL(ae); err != nil {
n.debug("Error storing to WAL: %v", err)
if err == ErrStoreClosed {
n.Unlock()
return
}
n.debug("Error storing to WAL: %v", err)
}
} else {
// This is a replay on startup so just take the appendEntry version.
@@ -1866,7 +1986,10 @@ func (n *raft) sendAppendEntry(entries []*Entry) {
// If we have entries store this in our wal.
if len(entries) > 0 {
if err := n.storeToWAL(ae); err != nil {
panic("Error storing!")
if err == ErrStoreClosed {
return
}
panic(fmt.Sprintf("Error storing to WAL: %v", err))
}
// We count ourselves.
n.acks[n.pindex] = map[string]struct{}{n.id: struct{}{}}
@@ -2225,9 +2348,12 @@ const (
)
func (n *raft) switchToFollower(leader string) {
n.notice("Switching to follower")
n.Lock()
defer n.Unlock()
if n.state == Closed {
return
}
n.notice("Switching to follower")
n.leader = leader
n.switchState(Follower)
}
@@ -2235,6 +2361,9 @@ func (n *raft) switchToFollower(leader string) {
func (n *raft) switchToCandidate() {
n.Lock()
defer n.Unlock()
if n.state == Closed {
return
}
if n.state != Candidate {
n.notice("Switching to candidate")
} else if n.lostQuorumLocked() {
@@ -2249,9 +2378,12 @@ func (n *raft) switchToCandidate() {
}
func (n *raft) switchToLeader() {
n.notice("Switching to leader")
n.Lock()
defer n.Unlock()
if n.state == Closed {
return
}
n.notice("Switching to leader")
n.leader = n.id
n.switchState(Leader)
}

View File

@@ -130,9 +130,11 @@ type debugOption struct {
newValue bool
}
// Apply is a no-op because logging will be reloaded after options are applied.
// Apply is mostly a no-op because logging will be reloaded after options are applied.
// However we will kick the raft nodes if they exist to reload.
func (d *debugOption) Apply(server *Server) {
server.Noticef("Reloaded: debug = %v", d.newValue)
server.reloadDebugRaftNodes()
}
// logtimeOption implements the option interface for the `logtime` setting.
@@ -552,6 +554,9 @@ type jetStreamOption struct {
// Apply handles a reloaded `jetstream` config value. If the new config has
// turned JetStream off, tear it down on this server via RemoveJetStream.
func (a *jetStreamOption) Apply(s *Server) {
	s.Noticef("Reloaded: jetstream")
	// Only the disable transition is handled here.
	if !a.newValue {
		s.RemoveJetStream()
	}
}
func (jso jetStreamOption) IsJetStreamChange() bool {
@@ -985,18 +990,33 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
field.Name, oldValue, newValue)
}
case "storedir":
return nil, fmt.Errorf("config reload not supported for jetstream storage directory")
case "jetstream":
new := newValue.(bool)
old := oldValue.(bool)
if new != old {
diffOpts = append(diffOpts, &jetStreamOption{newValue: new})
}
case "storedir":
new := newValue.(string)
old := oldValue.(string)
// If they are the same, or, we have turned it off (-1) that is ok.
if new != _EMPTY_ && new != old {
return nil, fmt.Errorf("config reload not supported for jetstream storage directory")
}
case "jetstreammaxmemory":
return nil, fmt.Errorf("config reload not supported for jetstream max memory")
old := oldValue.(int64)
new := newValue.(int64)
// If they are the same, or, we have turned it off (-1) that is ok.
if new != -1 && new != old {
return nil, fmt.Errorf("config reload not supported for jetstream max memory")
}
case "jetstreammaxstore":
return nil, fmt.Errorf("config reload not supported for jetstream max storage")
old := oldValue.(int64)
new := newValue.(int64)
// If they are the same, or, we have turned it off (-1) that is ok.
if new != -1 && new != old {
return nil, fmt.Errorf("config reload not supported for jetstream max storage")
}
case "websocket":
// Similar to gateways
tmpOld := oldValue.(WebsocketOpts)

View File

@@ -1527,16 +1527,15 @@ func (s *Server) Start() {
return
}
} else {
// Check to see if any configured accounts have JetStream enabled
// and warn if they do.
// Check to see if any configured accounts have JetStream enabled.
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
hasJs := acc.jsLimits != nil
name := acc.Name
acc.mu.RUnlock()
if hasJs {
s.Warnf("Account [%q] has JetStream configuration but JetStream not enabled", name)
s.checkJetStreamExports()
acc.enableAllJetStreamServiceImports()
}
return true
})

View File

@@ -245,7 +245,7 @@ func (a *Account) addStream(config *StreamConfig, fsConfig *FileStoreConfig, sa
}
fsCfg.StoreDir = storeDir
if err := mset.setupStore(fsCfg); err != nil {
mset.Delete()
mset.stop(true, false)
return nil, err
}
@@ -270,7 +270,7 @@ func (a *Account) addStream(config *StreamConfig, fsConfig *FileStoreConfig, sa
// This can be called though before we actually setup clustering, so check both.
if !s.JetStreamIsClustered() && s.standAloneMode() {
if err := mset.setLeader(true); err != nil {
mset.Delete()
mset.stop(true, false)
return nil, err
}
}
@@ -296,6 +296,12 @@ func (a *Account) addStream(config *StreamConfig, fsConfig *FileStoreConfig, sa
return mset, nil
}
// streamAssignment returns the stream's current cluster assignment,
// read under the stream's read lock.
func (mset *Stream) streamAssignment() *streamAssignment {
	mset.mu.RLock()
	sa := mset.sa
	mset.mu.RUnlock()
	return sa
}
func (mset *Stream) setStreamAssignment(sa *streamAssignment) {
mset.mu.Lock()
defer mset.mu.Unlock()
@@ -325,8 +331,6 @@ func (mset *Stream) setLeader(isLeader bool) error {
// Setup subscriptions
if err := mset.subscribeToStream(); err != nil {
mset.mu.Unlock()
// FIXME(dlc)
mset.Delete()
return err
}
} else {
@@ -1788,21 +1792,21 @@ func (a *Account) RestoreStream(stream string, r io.Reader) (*Stream, error) {
metafile := path.Join(odir, ofi.Name(), JetStreamMetaFile)
metasum := path.Join(odir, ofi.Name(), JetStreamMetaFileSum)
if _, err := os.Stat(metafile); os.IsNotExist(err) {
mset.Delete()
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
buf, err := ioutil.ReadFile(metafile)
if err != nil {
mset.Delete()
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
if _, err := os.Stat(metasum); os.IsNotExist(err) {
mset.Delete()
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
var cfg FileConsumerInfo
if err := json.Unmarshal(buf, &cfg); err != nil {
mset.Delete()
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig)
@@ -1813,7 +1817,7 @@ func (a *Account) RestoreStream(stream string, r io.Reader) (*Stream, error) {
}
obs, err := mset.AddConsumer(&cfg.ConsumerConfig)
if err != nil {
mset.Delete()
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
if isEphemeral {
@@ -1823,7 +1827,7 @@ func (a *Account) RestoreStream(stream string, r io.Reader) (*Stream, error) {
obs.setCreated(cfg.Created)
}
if err := obs.readStoredState(); err != nil {
mset.Delete()
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
}

View File

@@ -691,7 +691,6 @@ func TestJetStreamClusterMetaSnapshotsMultiChange(t *testing.T) {
// Shut it down.
rs.Shutdown()
time.Sleep(250 * time.Millisecond)
// We want to make changes here that test each delta scenario for the meta snapshots.
// Add new stream and consumer.
@@ -726,8 +725,9 @@ func TestJetStreamClusterMetaSnapshotsMultiChange(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
c.leader().JetStreamSnapshotMeta()
time.Sleep(250 * time.Millisecond)
cl := c.leader()
cl.JetStreamSnapshotMeta()
c.waitOnServerCurrent(cl)
rs = c.restartServer(rs)
c.checkClusterFormed()
@@ -2465,7 +2465,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
}
// Make sure we received our lost quorum advisories.
adv, _ := ssub.NextMsg(0)
adv, _ := ssub.NextMsg(2 * time.Second)
if adv == nil {
t.Fatalf("Expected to receive a stream quorum lost advisory")
}
@@ -2540,6 +2540,46 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
}
}
// TestJetStreamClusterCreateResponseAdvisoriesHaveSubject verifies that the
// JS API audit advisories published for stream/consumer lifecycle calls all
// carry the API subject that triggered them.
func TestJetStreamClusterCreateResponseAdvisoriesHaveSubject(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Client based API
	s := c.randomServer()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	// Capture every JS API audit advisory emitted during the calls below.
	sub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.API")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer sub.Unsubscribe()

	// Exercise stream create, consumer create, purge and delete.
	if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 2}); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if _, err := js.SubscribeSync("TEST", nats.Durable("DLC")); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if err := js.PurgeStream("TEST"); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if err := js.DeleteStream("TEST"); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// Expect the audit advisories generated by the API calls above.
	checkSubsPending(t, sub, 6)
	// Drain them and require a non-empty Subject on every audit record.
	for m, err := sub.NextMsg(0); err == nil; m, err = sub.NextMsg(0) {
		var audit server.JSAPIAudit
		if err := json.Unmarshal(m.Data, &audit); err != nil {
			t.Fatalf("Unexpected error: %v", err)
		}
		if audit.Subject == "" {
			t.Fatalf("Expected subject, got nothing")
		}
	}
}
func TestJetStreamClusterRestartAndRemoveAdvisories(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
@@ -2757,6 +2797,306 @@ func TestJetStreamClusterNoDupePeerSelection(t *testing.T) {
}
}
// TestJetStreamClusterRemovePeer exercises the stream remove-peer API:
// an unknown peer must be rejected, and removing a real peer must
// eventually drop it from both the stream's and the consumer's cluster info.
func TestJetStreamClusterRemovePeer(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "RNS", 5)
	defer c.shutdown()

	// Client based API
	s := c.randomServer()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Send in 10 messages.
	msg, toSend := []byte("Hello JS Clustering"), 10
	for i := 0; i < toSend; i++ {
		if _, err = js.Publish("TEST", msg); err != nil {
			t.Fatalf("Unexpected publish error: %v", err)
		}
	}

	sub, err := js.SubscribeSync("TEST", nats.Durable("cat"))
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	checkSubsPending(t, sub, toSend)

	// Grab stream info.
	si, err := js.StreamInfo("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// Collect all members (leader + replicas) and pick one at random to remove.
	peers := []string{si.Cluster.Leader}
	for _, p := range si.Cluster.Replicas {
		peers = append(peers, p.Name)
	}
	rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
	toRemove := peers[0]

	// First test bad peer.
	req := &server.JSApiStreamRemovePeerRequest{Peer: "NOT VALID"}
	jsreq, err := json.Marshal(req)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// Need to call this by hand for now.
	resp, err := nc.Request(fmt.Sprintf(server.JSApiStreamRemovePeerT, "TEST"), jsreq, time.Second)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	var rpResp server.JSApiStreamRemovePeerResponse
	if err := json.Unmarshal(resp.Data, &rpResp); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// An unknown peer must be rejected with a descriptive error.
	if rpResp.Error == nil || !strings.Contains(rpResp.Error.Description, "peer not a member") {
		t.Fatalf("Expected error for bad peer, got %+v", rpResp.Error)
	}
	// Clear the error since rpResp is reused for the valid request below.
	rpResp.Error = nil

	// Now remove a real member of the stream's group.
	req = &server.JSApiStreamRemovePeerRequest{Peer: toRemove}
	jsreq, err = json.Marshal(req)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	resp, err = nc.Request(fmt.Sprintf(server.JSApiStreamRemovePeerT, "TEST"), jsreq, time.Second)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if err := json.Unmarshal(resp.Data, &rpResp); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if rpResp.Error != nil {
		t.Fatalf("Unexpected error: %+v", rpResp.Error)
	}

	// Grab shorter timeout jetstream context.
	js, err = nc.JetStream(nats.MaxWait(100 * time.Millisecond))
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// The removed peer must disappear from the stream's cluster info and the
	// remaining replicas must be current.
	checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
		si, err := js.StreamInfo("TEST")
		if err != nil {
			return fmt.Errorf("Could not fetch stream info: %v", err)
		}
		if len(si.Cluster.Replicas) != 2 {
			return fmt.Errorf("Expected 2 replicas, got %d", len(si.Cluster.Replicas))
		}
		for _, peer := range si.Cluster.Replicas {
			if !peer.Current {
				return fmt.Errorf("Expected replica to be current: %+v", peer)
			}
		}
		if si.Cluster.Leader == toRemove {
			return fmt.Errorf("Peer not removed yet: %+v", toRemove)
		}
		for _, p := range si.Cluster.Replicas {
			if p.Name == toRemove {
				return fmt.Errorf("Peer not removed yet: %+v", toRemove)
			}
		}
		return nil
	})

	// Now check consumer info as well.
	checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
		ci, err := sub.ConsumerInfo()
		if err != nil {
			return fmt.Errorf("Could not fetch consumer info: %v", err)
		}
		if len(ci.Cluster.Replicas) != 2 {
			return fmt.Errorf("Expected 2 replicas, got %d", len(ci.Cluster.Replicas))
		}
		for _, peer := range ci.Cluster.Replicas {
			if !peer.Current {
				return fmt.Errorf("Expected replica to be current: %+v", peer)
			}
		}
		if ci.Cluster.Leader == toRemove {
			return fmt.Errorf("Peer not removed yet: %+v", toRemove)
		}
		for _, p := range ci.Cluster.Replicas {
			if p.Name == toRemove {
				return fmt.Errorf("Peer not removed yet: %+v", toRemove)
			}
		}
		return nil
	})
}
// TestJetStreamClusterStepDown exercises the stream and consumer leader
// stepdown APIs: after each stepdown request a different leader must be
// elected and the cluster info must reflect it with current replicas.
func TestJetStreamClusterStepDown(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "RNS", 3)
	defer c.shutdown()

	// Client based API
	s := c.randomServer()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Send in 10 messages.
	msg, toSend := []byte("Hello JS Clustering"), 10
	for i := 0; i < toSend; i++ {
		if _, err = js.Publish("TEST", msg); err != nil {
			t.Fatalf("Unexpected publish error: %v", err)
		}
	}

	sub, err := js.SubscribeSync("TEST", nats.Durable("cat"))
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer sub.Unsubscribe()

	// Remember the current stream leader so we can verify it changes.
	oldLeader := c.streamLeader("$G", "TEST").Name()

	// Need to call this by hand for now.
	resp, err := nc.Request(fmt.Sprintf(server.JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	var sdResp server.JSApiStreamLeaderStepDownResponse
	if err := json.Unmarshal(resp.Data, &sdResp); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if sdResp.Error != nil {
		t.Fatalf("Unexpected error: %+v", sdResp.Error)
	}

	// Grab shorter timeout jetstream context.
	js, err = nc.JetStream(nats.MaxWait(100 * time.Millisecond))
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// A new stream leader should be elected and the replicas become current.
	checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
		si, err := js.StreamInfo("TEST")
		if err != nil {
			return fmt.Errorf("Could not fetch stream info: %v", err)
		}
		if si.Cluster.Leader == oldLeader {
			return fmt.Errorf("Still have old leader")
		}
		if len(si.Cluster.Replicas) != 2 {
			return fmt.Errorf("Expected 2 replicas, got %d", len(si.Cluster.Replicas))
		}
		for _, peer := range si.Cluster.Replicas {
			if !peer.Current {
				return fmt.Errorf("Expected replica to be current: %+v", peer)
			}
		}
		return nil
	})

	// Now do consumer.
	oldLeader = c.consumerLeader("$G", "TEST", "cat").Name()

	// Need to call this by hand for now.
	resp, err = nc.Request(fmt.Sprintf(server.JSApiConsumerLeaderStepDownT, "TEST", "cat"), nil, time.Second)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	var cdResp server.JSApiConsumerLeaderStepDownResponse
	if err := json.Unmarshal(resp.Data, &cdResp); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// Report the consumer stepdown error here, not the earlier stream one.
	// (Fixes a copy-paste bug that logged sdResp.Error instead.)
	if cdResp.Error != nil {
		t.Fatalf("Unexpected error: %+v", cdResp.Error)
	}

	// A new consumer leader should be elected as well.
	checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
		ci, err := sub.ConsumerInfo()
		if err != nil {
			return fmt.Errorf("Could not fetch consumer info: %v", err)
		}
		if ci.Cluster.Leader == oldLeader {
			return fmt.Errorf("Still have old leader")
		}
		if len(ci.Cluster.Replicas) != 2 {
			return fmt.Errorf("Expected 2 replicas, got %d", len(ci.Cluster.Replicas))
		}
		for _, peer := range ci.Cluster.Replicas {
			if !peer.Current {
				return fmt.Errorf("Expected replica to be current: %+v", peer)
			}
		}
		return nil
	})
}
// TestJetStreamClusterRemoveServer removes JetStream from the server that
// leads a replicated stream (via config rewrite + reload) and verifies the
// stream and consumer re-elect leaders and report current replicas.
func TestJetStreamClusterRemoveServer(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "RNS", 5)
	defer c.shutdown()

	// Client based API
	s := c.randomServer()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Send in 10 messages.
	msg, toSend := []byte("Hello JS Clustering"), 10
	for i := 0; i < toSend; i++ {
		if _, err = js.Publish("TEST", msg); err != nil {
			t.Fatalf("Unexpected publish error: %v", err)
		}
	}
	sub, err := js.SubscribeSync("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	checkSubsPending(t, sub, toSend)

	// Disable JetStream on the stream leader's server and wait for a new one.
	sl := c.streamLeader("$G", "TEST")
	c.removeJetStream(sl)
	c.waitOnStreamLeader("$G", "TEST")

	// Check the stream info is eventually correct.
	checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
		si, err := js.StreamInfo("TEST")
		if err != nil {
			return fmt.Errorf("Could not fetch stream info: %v", err)
		}
		if len(si.Cluster.Replicas) != 2 {
			return fmt.Errorf("Expected 2 replicas, got %d", len(si.Cluster.Replicas))
		}
		for _, peer := range si.Cluster.Replicas {
			if !peer.Current {
				return fmt.Errorf("Expected replica to be current: %+v", peer)
			}
		}
		return nil
	})

	// Now do consumer.
	checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
		ci, err := sub.ConsumerInfo()
		if err != nil {
			return fmt.Errorf("Could not fetch consumer info: %v", err)
		}
		if len(ci.Cluster.Replicas) != 2 {
			return fmt.Errorf("Expected 2 replicas, got %d", len(ci.Cluster.Replicas))
		}
		for _, peer := range ci.Cluster.Replicas {
			if !peer.Current {
				return fmt.Errorf("Expected replica to be current: %+v", peer)
			}
		}
		return nil
	})
}
func TestJetStreamClusterStreamPerf(t *testing.T) {
// Comment out to run, holding place for now.
skip(t)
@@ -3123,6 +3463,33 @@ func (c *cluster) waitOnClusterReady() {
c.t.Fatalf("Expected a cluster leader and fully formed cluster")
}
// Helper function to remove JetStream from a running server by rewriting its
// config file without any jetstream lines and reloading the server.
func (c *cluster) removeJetStream(s *server.Server) {
	c.t.Helper()
	// Locate the server so we can find its config file.
	index := -1
	for i, cs := range c.servers {
		if cs == s {
			index = i
			break
		}
	}
	if index < 0 {
		c.t.Fatalf("Could not find server %v in cluster", s)
	}
	cf := c.opts[index].ConfigFile
	cb, err := ioutil.ReadFile(cf)
	if err != nil {
		c.t.Fatalf("Error reading config file: %v", err)
	}
	// Strip every line that starts the jetstream block/setting.
	var sb strings.Builder
	for _, l := range strings.Split(string(cb), "\n") {
		if !strings.HasPrefix(strings.TrimSpace(l), "jetstream") {
			sb.WriteString(l + "\n")
		}
	}
	if err := ioutil.WriteFile(cf, []byte(sb.String()), 0644); err != nil {
		c.t.Fatalf("Error writing updated config file: %v", err)
	}
	if err := s.Reload(); err != nil {
		c.t.Fatalf("Error on server reload: %v", err)
	}
	// Give the reload a moment to propagate.
	time.Sleep(100 * time.Millisecond)
}
// Helper function to stop all the servers in the cluster.
func (c *cluster) stopAll() {
c.t.Helper()
@@ -3142,5 +3509,5 @@ func (c *cluster) restartAll() {
}
}
c.waitOnClusterReady()
c.waitOnClusterReady()
c.waitOnLeader()
}

View File

@@ -718,6 +718,52 @@ func TestJetStreamClusterLargeStreamInlineCatchup(t *testing.T) {
})
}
// TestJetStreamClusterStreamCreateAndLostQuorum verifies that a stream that
// is created and then has the whole cluster stopped and restarted does not
// emit lost-quorum advisories once the cluster is back.
func TestJetStreamClusterStreamCreateAndLostQuorum(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R5S", 5)
	defer c.shutdown()

	// Client based API
	s := c.randomServer()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	// Listen for any stream lost-quorum advisories.
	sub, err := nc.SubscribeSync(server.JSAdvisoryStreamQuorumLostPre + ".*")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	if _, err := js.AddStream(&nats.StreamConfig{Name: "NO-LQ-START", Replicas: 5}); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	c.waitOnStreamLeader("$G", "NO-LQ-START")
	// No advisories expected while the cluster is healthy.
	checkSubsPending(t, sub, 0)

	c.stopAll()
	// Start up the one we were connected to first and wait for it to be connected.
	s = c.restartServer(s)
	nc, err = nats.Connect(s.ClientURL())
	if err != nil {
		t.Fatalf("Failed to create client: %v", err)
	}
	defer nc.Close()

	sub, err = nc.SubscribeSync(server.JSAdvisoryStreamQuorumLostPre + ".*")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	nc.Flush()

	c.restartAll()
	c.waitOnStreamLeader("$G", "NO-LQ-START")

	// A full restart should not generate lost-quorum advisories either.
	// (Removed a leftover debug drain loop that emptied the subscription and
	// made this check vacuous.)
	checkSubsPending(t, sub, 0)
}
func TestNoRaceLeafNodeSmapUpdate(t *testing.T) {
s, opts := runLeafServer()
defer s.Shutdown()