diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 4ebf3489..e25c00c7 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1532,14 +1532,10 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl } // Check to see if we are a member of the group and if the group has no leader. - if js.isGroupLeaderless(sa.Group) { - resp.Error = jsClusterNotAvailErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } + isLeaderless := js.isGroupLeaderless(sa.Group) // We have the stream assigned and a leader, so only the stream leader should answer. - if !acc.JetStreamIsStreamLeader(streamName) { + if !acc.JetStreamIsStreamLeader(streamName) && !isLeaderless { if js.isLeaderless() { resp.Error = jsClusterNotAvailErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -1839,12 +1835,6 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, subject } // Check to see if we are a member of the group and if the group has no leader. - if js.isGroupLeaderless(sa.Group) { - resp.Error = jsClusterNotAvailErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // Peers here is a server name, convert to node name. nodeName := string(getHash(req.Peer)) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index b8a06d91..c1bc9a3d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -977,11 +977,6 @@ func (js *jetStream) processRemovePeer(peer string) { js.mu.Lock() s, cc := js.srv, js.cluster - // Only leader will do remappings for streams and consumers. - if cc.isLeader() { - js.remapStreamsLocked(peer) - } - // All nodes will check if this is them. isUs := cc.meta.ID() == peer disabled := js.disabled @@ -4252,6 +4247,12 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { now := time.Now() id, peers := n.ID(), n.Peers() + + // If we are leaderless, do not suppress putting us in the peer list. + if ci.Leader == _EMPTY_ { + id = _EMPTY_ + } + for _, rp := range peers { if rp.ID != id && rg.isMember(rp.ID) { lastSeen := now.Sub(rp.Last) @@ -4271,9 +4272,15 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { func (mset *stream) handleClusterStreamInfoRequest(sub *subscription, c *client, subject, reply string, msg []byte) { mset.mu.RLock() - sysc, js, config := mset.sysc, mset.srv.js, mset.cfg + sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg mset.mu.RUnlock() + // By design all members will receive this. Normally we only want the leader answering. + // But if we have stalled and lost quorom all can respond. + if sa != nil && !js.isGroupLeaderless(sa.Group) && !mset.isLeader() { + return + } + si := &StreamInfo{ Created: mset.createdTime(), State: mset.state(), diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index ac15df65..e6e0a216 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3129,11 +3129,6 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { return err != nil && strings.Contains(err.Error(), "unavailable") } - // Expect to get errors here. - if _, err := js.StreamInfo("NO-Q"); !notAvailableErr(err) { - t.Fatalf("Expected an 'unavailable' error, got %v", err) - } - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { if cl := c.consumerLeader("$G", "NO-Q", ci.Name); cl == nil { return nil @@ -3166,9 +3161,6 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { if err := js.DeleteStream("NO-Q"); !notAvailableErr(err) { t.Fatalf("Expected an 'unavailable' error, got %v", err) } - if _, err := js.StreamInfo("NO-Q"); !notAvailableErr(err) { - t.Fatalf("Expected an 'unavailable' error, got %v", err) - } if err := js.PurgeStream("NO-Q"); !notAvailableErr(err) { t.Fatalf("Expected an 'unavailable' error, got %v", err) } diff --git a/server/stream.go b/server/stream.go index 10b7f2f7..38644937 100644 --- a/server/stream.go +++ b/server/stream.go @@ -399,9 +399,18 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) { mset.mu.Lock() defer mset.mu.Unlock() mset.sa = sa + if sa == nil { + return + } + // Set our node. - if sa != nil { - mset.node = sa.Group.node + mset.node = sa.Group.node + + // Setup our info sub here as well for all stream members. This is now by design. + if mset.infoSub == nil { + isubj := fmt.Sprintf(clusterStreamInfoT, mset.jsa.acc(), mset.cfg.Name) + // Note below the way we subscribe here is so that we can send requests to ourselves. + mset.infoSub, _ = mset.srv.systemSubscribe(isubj, _EMPTY_, false, mset.sysc, mset.handleClusterStreamInfoRequest) } } @@ -438,13 +447,6 @@ func (mset *stream) setLeader(isLeader bool) error { // Lock should be held. func (mset *stream) startClusterSubs() { - if mset.infoSub == nil { - if jsa := mset.jsa; jsa != nil { - isubj := fmt.Sprintf(clusterStreamInfoT, jsa.acc(), mset.cfg.Name) - // Note below the way we subscribe here is so that we can send requests to ourselves. - mset.infoSub, _ = mset.srv.systemSubscribe(isubj, _EMPTY_, false, mset.sysc, mset.handleClusterStreamInfoRequest) - } - } if mset.isClustered() && mset.syncSub == nil { mset.syncSub, _ = mset.srv.systemSubscribe(mset.sa.Sync, _EMPTY_, false, mset.sysc, mset.handleClusterSyncRequest) } @@ -452,10 +454,6 @@ func (mset *stream) startClusterSubs() { // Lock should be held. func (mset *stream) stopClusterSubs() { - if mset.infoSub != nil { - mset.srv.sysUnsubscribe(mset.infoSub) - mset.infoSub = nil - } if mset.syncSub != nil { mset.srv.sysUnsubscribe(mset.syncSub) mset.syncSub = nil @@ -2524,6 +2522,12 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { // Unsubscribe from direct stream. mset.unsubscribeToStream() + // Our info sub if we spun it up. + if mset.infoSub != nil { + mset.srv.sysUnsubscribe(mset.infoSub) + mset.infoSub = nil + } + // Quit channel. if mset.qch != nil { close(mset.qch)