This changes our behavior for streams and peer removals in several ways.

First we no longer try to auto-remap stream assignments on peer removals from the system.
We can also now always respond to stream info requests if at least one member is running.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-11 06:52:28 -05:00
parent 6600c223bb
commit 299f44cddf
4 changed files with 32 additions and 39 deletions

View File

@@ -1532,14 +1532,10 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
}
// Check to see if we are a member of the group and if the group has no leader.
if js.isGroupLeaderless(sa.Group) {
resp.Error = jsClusterNotAvailErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
isLeaderless := js.isGroupLeaderless(sa.Group)
// We have the stream assigned and a leader, so only the stream leader should answer.
if !acc.JetStreamIsStreamLeader(streamName) {
if !acc.JetStreamIsStreamLeader(streamName) && !isLeaderless {
if js.isLeaderless() {
resp.Error = jsClusterNotAvailErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
@@ -1839,12 +1835,6 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, subject
}
// Check to see if we are a member of the group and if the group has no leader.
if js.isGroupLeaderless(sa.Group) {
resp.Error = jsClusterNotAvailErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Peers here is a server name, convert to node name.
nodeName := string(getHash(req.Peer))

View File

@@ -977,11 +977,6 @@ func (js *jetStream) processRemovePeer(peer string) {
js.mu.Lock()
s, cc := js.srv, js.cluster
// Only leader will do remappings for streams and consumers.
if cc.isLeader() {
js.remapStreamsLocked(peer)
}
// All nodes will check if this is them.
isUs := cc.meta.ID() == peer
disabled := js.disabled
@@ -4252,6 +4247,12 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
now := time.Now()
id, peers := n.ID(), n.Peers()
// If we are leaderless, do not suppress putting us in the peer list.
if ci.Leader == _EMPTY_ {
id = _EMPTY_
}
for _, rp := range peers {
if rp.ID != id && rg.isMember(rp.ID) {
lastSeen := now.Sub(rp.Last)
@@ -4271,9 +4272,15 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
func (mset *stream) handleClusterStreamInfoRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
mset.mu.RLock()
sysc, js, config := mset.sysc, mset.srv.js, mset.cfg
sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg
mset.mu.RUnlock()
// By design all members will receive this. Normally we only want the leader answering.
// But if we have stalled and lost quorum, all can respond.
if sa != nil && !js.isGroupLeaderless(sa.Group) && !mset.isLeader() {
return
}
si := &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),

View File

@@ -3129,11 +3129,6 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
return err != nil && strings.Contains(err.Error(), "unavailable")
}
// Expect to get errors here.
if _, err := js.StreamInfo("NO-Q"); !notAvailableErr(err) {
t.Fatalf("Expected an 'unavailable' error, got %v", err)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
if cl := c.consumerLeader("$G", "NO-Q", ci.Name); cl == nil {
return nil
@@ -3166,9 +3161,6 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
if err := js.DeleteStream("NO-Q"); !notAvailableErr(err) {
t.Fatalf("Expected an 'unavailable' error, got %v", err)
}
if _, err := js.StreamInfo("NO-Q"); !notAvailableErr(err) {
t.Fatalf("Expected an 'unavailable' error, got %v", err)
}
if err := js.PurgeStream("NO-Q"); !notAvailableErr(err) {
t.Fatalf("Expected an 'unavailable' error, got %v", err)
}

View File

@@ -399,9 +399,18 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.sa = sa
if sa == nil {
return
}
// Set our node.
if sa != nil {
mset.node = sa.Group.node
mset.node = sa.Group.node
// Setup our info sub here as well for all stream members. This is now by design.
if mset.infoSub == nil {
isubj := fmt.Sprintf(clusterStreamInfoT, mset.jsa.acc(), mset.cfg.Name)
// Note below the way we subscribe here is so that we can send requests to ourselves.
mset.infoSub, _ = mset.srv.systemSubscribe(isubj, _EMPTY_, false, mset.sysc, mset.handleClusterStreamInfoRequest)
}
}
@@ -438,13 +447,6 @@ func (mset *stream) setLeader(isLeader bool) error {
// Lock should be held.
func (mset *stream) startClusterSubs() {
if mset.infoSub == nil {
if jsa := mset.jsa; jsa != nil {
isubj := fmt.Sprintf(clusterStreamInfoT, jsa.acc(), mset.cfg.Name)
// Note below the way we subscribe here is so that we can send requests to ourselves.
mset.infoSub, _ = mset.srv.systemSubscribe(isubj, _EMPTY_, false, mset.sysc, mset.handleClusterStreamInfoRequest)
}
}
if mset.isClustered() && mset.syncSub == nil {
mset.syncSub, _ = mset.srv.systemSubscribe(mset.sa.Sync, _EMPTY_, false, mset.sysc, mset.handleClusterSyncRequest)
}
@@ -452,10 +454,6 @@ func (mset *stream) startClusterSubs() {
// Lock should be held.
func (mset *stream) stopClusterSubs() {
if mset.infoSub != nil {
mset.srv.sysUnsubscribe(mset.infoSub)
mset.infoSub = nil
}
if mset.syncSub != nil {
mset.srv.sysUnsubscribe(mset.syncSub)
mset.syncSub = nil
@@ -2524,6 +2522,12 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
// Unsubscribe from direct stream.
mset.unsubscribeToStream()
// Our info sub if we spun it up.
if mset.infoSub != nil {
mset.srv.sysUnsubscribe(mset.infoSub)
mset.infoSub = nil
}
// Quit channel.
if mset.qch != nil {
close(mset.qch)