mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #1993 from nats-io/remove-peer
Lost quorum changes for streams
This commit is contained in:
@@ -41,7 +41,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.2.0-RC.7.5"
|
||||
VERSION = "2.2.0-RC.8"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -1532,14 +1532,10 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
|
||||
}
|
||||
|
||||
// Check to see if we are a member of the group and if the group has no leader.
|
||||
if js.isGroupLeaderless(sa.Group) {
|
||||
resp.Error = jsClusterNotAvailErr
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
isLeaderless := js.isGroupLeaderless(sa.Group)
|
||||
|
||||
// We have the stream assigned and a leader, so only the stream leader should answer.
|
||||
if !acc.JetStreamIsStreamLeader(streamName) {
|
||||
if !acc.JetStreamIsStreamLeader(streamName) && !isLeaderless {
|
||||
if js.isLeaderless() {
|
||||
resp.Error = jsClusterNotAvailErr
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
@@ -1839,12 +1835,6 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, subject
|
||||
}
|
||||
|
||||
// Check to see if we are a member of the group and if the group has no leader.
|
||||
if js.isGroupLeaderless(sa.Group) {
|
||||
resp.Error = jsClusterNotAvailErr
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// Peers here is a server name, convert to node name.
|
||||
nodeName := string(getHash(req.Peer))
|
||||
|
||||
|
||||
@@ -977,11 +977,6 @@ func (js *jetStream) processRemovePeer(peer string) {
|
||||
js.mu.Lock()
|
||||
s, cc := js.srv, js.cluster
|
||||
|
||||
// Only leader will do remappings for streams and consumers.
|
||||
if cc.isLeader() {
|
||||
js.remapStreamsLocked(peer)
|
||||
}
|
||||
|
||||
// All nodes will check if this is them.
|
||||
isUs := cc.meta.ID() == peer
|
||||
disabled := js.disabled
|
||||
@@ -4252,6 +4247,12 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
|
||||
now := time.Now()
|
||||
|
||||
id, peers := n.ID(), n.Peers()
|
||||
|
||||
// If we are leaderless, do not suppress putting us in the peer list.
|
||||
if ci.Leader == _EMPTY_ {
|
||||
id = _EMPTY_
|
||||
}
|
||||
|
||||
for _, rp := range peers {
|
||||
if rp.ID != id && rg.isMember(rp.ID) {
|
||||
lastSeen := now.Sub(rp.Last)
|
||||
@@ -4271,9 +4272,15 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
|
||||
|
||||
func (mset *stream) handleClusterStreamInfoRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
|
||||
mset.mu.RLock()
|
||||
sysc, js, config := mset.sysc, mset.srv.js, mset.cfg
|
||||
sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg
|
||||
mset.mu.RUnlock()
|
||||
|
||||
// By design all members will receive this. Normally we only want the leader answering.
|
||||
// But if we have stalled and lost quorom all can respond.
|
||||
if sa != nil && !js.isGroupLeaderless(sa.Group) && !mset.isLeader() {
|
||||
return
|
||||
}
|
||||
|
||||
si := &StreamInfo{
|
||||
Created: mset.createdTime(),
|
||||
State: mset.state(),
|
||||
|
||||
@@ -3129,11 +3129,6 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
|
||||
return err != nil && strings.Contains(err.Error(), "unavailable")
|
||||
}
|
||||
|
||||
// Expect to get errors here.
|
||||
if _, err := js.StreamInfo("NO-Q"); !notAvailableErr(err) {
|
||||
t.Fatalf("Expected an 'unavailable' error, got %v", err)
|
||||
}
|
||||
|
||||
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
|
||||
if cl := c.consumerLeader("$G", "NO-Q", ci.Name); cl == nil {
|
||||
return nil
|
||||
@@ -3166,9 +3161,6 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
|
||||
if err := js.DeleteStream("NO-Q"); !notAvailableErr(err) {
|
||||
t.Fatalf("Expected an 'unavailable' error, got %v", err)
|
||||
}
|
||||
if _, err := js.StreamInfo("NO-Q"); !notAvailableErr(err) {
|
||||
t.Fatalf("Expected an 'unavailable' error, got %v", err)
|
||||
}
|
||||
if err := js.PurgeStream("NO-Q"); !notAvailableErr(err) {
|
||||
t.Fatalf("Expected an 'unavailable' error, got %v", err)
|
||||
}
|
||||
|
||||
@@ -1438,6 +1438,10 @@ func TestNoRaceJetStreamClusterStreamCreateAndLostQuorum(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) {
|
||||
// These pass locally but are flaky on Travis.
|
||||
// Disable for now.
|
||||
skip(t)
|
||||
|
||||
sc := createJetStreamSuperCluster(t, 3, 3)
|
||||
defer sc.shutdown()
|
||||
|
||||
@@ -1572,6 +1576,10 @@ func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNoRaceJetStreamClusterSuperClusterSources(t *testing.T) {
|
||||
// These pass locally but are flaky on Travis.
|
||||
// Disable for now.
|
||||
skip(t)
|
||||
|
||||
sc := createJetStreamSuperCluster(t, 3, 3)
|
||||
defer sc.shutdown()
|
||||
|
||||
|
||||
@@ -399,9 +399,18 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) {
|
||||
mset.mu.Lock()
|
||||
defer mset.mu.Unlock()
|
||||
mset.sa = sa
|
||||
if sa == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Set our node.
|
||||
if sa != nil {
|
||||
mset.node = sa.Group.node
|
||||
mset.node = sa.Group.node
|
||||
|
||||
// Setup our info sub here as well for all stream members. This is now by design.
|
||||
if mset.infoSub == nil {
|
||||
isubj := fmt.Sprintf(clusterStreamInfoT, mset.jsa.acc(), mset.cfg.Name)
|
||||
// Note below the way we subscribe here is so that we can send requests to ourselves.
|
||||
mset.infoSub, _ = mset.srv.systemSubscribe(isubj, _EMPTY_, false, mset.sysc, mset.handleClusterStreamInfoRequest)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -438,13 +447,6 @@ func (mset *stream) setLeader(isLeader bool) error {
|
||||
|
||||
// Lock should be held.
|
||||
func (mset *stream) startClusterSubs() {
|
||||
if mset.infoSub == nil {
|
||||
if jsa := mset.jsa; jsa != nil {
|
||||
isubj := fmt.Sprintf(clusterStreamInfoT, jsa.acc(), mset.cfg.Name)
|
||||
// Note below the way we subscribe here is so that we can send requests to ourselves.
|
||||
mset.infoSub, _ = mset.srv.systemSubscribe(isubj, _EMPTY_, false, mset.sysc, mset.handleClusterStreamInfoRequest)
|
||||
}
|
||||
}
|
||||
if mset.isClustered() && mset.syncSub == nil {
|
||||
mset.syncSub, _ = mset.srv.systemSubscribe(mset.sa.Sync, _EMPTY_, false, mset.sysc, mset.handleClusterSyncRequest)
|
||||
}
|
||||
@@ -452,10 +454,6 @@ func (mset *stream) startClusterSubs() {
|
||||
|
||||
// Lock should be held.
|
||||
func (mset *stream) stopClusterSubs() {
|
||||
if mset.infoSub != nil {
|
||||
mset.srv.sysUnsubscribe(mset.infoSub)
|
||||
mset.infoSub = nil
|
||||
}
|
||||
if mset.syncSub != nil {
|
||||
mset.srv.sysUnsubscribe(mset.syncSub)
|
||||
mset.syncSub = nil
|
||||
@@ -2524,6 +2522,12 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
|
||||
// Unsubscribe from direct stream.
|
||||
mset.unsubscribeToStream()
|
||||
|
||||
// Our info sub if we spun it up.
|
||||
if mset.infoSub != nil {
|
||||
mset.srv.sysUnsubscribe(mset.infoSub)
|
||||
mset.infoSub = nil
|
||||
}
|
||||
|
||||
// Quit channel.
|
||||
if mset.qch != nil {
|
||||
close(mset.qch)
|
||||
|
||||
Reference in New Issue
Block a user