diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 12023985..d4905a84 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -581,7 +581,9 @@ const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_st type JSApiMetaServerRemoveRequest struct { // Server name of the peer to be removed. Server string `json:"peer"` - ByID bool `json:"by_id,omitempty"` + // Peer ID of the peer to be removed. If specified this is used + // instead of the server name. + Peer string `json:"peer_id,omitempty"` } // JSApiMetaServerRemoveResponse is the response to a peer removal request in the meta group. @@ -2216,9 +2218,10 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac var found string js.mu.RLock() for _, p := range cc.meta.Peers() { - if req.ByID { - if p.ID == req.Server { - found = req.Server + // If Peer is specified, it takes precedence + if req.Peer != _EMPTY_ { + if p.ID == req.Peer { + found = req.Peer break } continue diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3b9fd81d..0ef749d6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2000,10 +2000,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps var newLeaderPeer, newLeader string neededCurrent, current := replicas/2+1, 0 for _, r := range ci.Replicas { - if r.Current && newPeerSet[r.peer] { + if r.Current && newPeerSet[r.Peer] { current++ if newLeader == _EMPTY_ { - newLeaderPeer, newLeader = r.peer, r.Name + newLeaderPeer, newLeader = r.Peer, r.Name } } } @@ -2427,7 +2427,7 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo { for _, rp := range node.Peers() { if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil { si := sir.(nodeInfo) - pi := &PeerInfo{Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag} + pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag} replicas = append(replicas, pi) } } @@ -3925,10 +3925,10 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { var newLeaderPeer, newLeader, newCluster string neededCurrent, current := replicas/2+1, 0 for _, r := range ci.Replicas { - if r.Current && newPeerSet[r.peer] { + if r.Current && newPeerSet[r.Peer] { current++ if newCluster == _EMPTY_ { - newLeaderPeer, newLeader, newCluster = r.peer, r.Name, r.cluster + newLeaderPeer, newLeader, newCluster = r.Peer, r.Name, r.cluster } } } @@ -7087,7 +7087,7 @@ func (js *jetStream) offlineClusterInfo(rg *raftGroup) *ClusterInfo { for _, peer := range rg.Peers { if sir, ok := s.nodeToInfo.Load(peer); ok && sir != nil { si := sir.(nodeInfo) - pi := &PeerInfo{Name: si.name, Current: false, Offline: true} + pi := &PeerInfo{Peer: peer, Name: si.name, Current: false, Offline: true} ci.Replicas = append(ci.Replicas, pi) } } @@ -7143,7 +7143,7 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { Offline: true, Active: lastSeen, Lag: rp.Lag, - peer: rp.ID, + Peer: rp.ID, } // If node is found, complete/update the settings. if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil { @@ -7153,6 +7153,8 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { // If not, then add a name that indicates that the server name // is unknown at this time, and clear the lag since it is misleading // (the node may not have that much lag). + // Note: We return now the Peer ID in PeerInfo, so the "(peerID: %s)" + // would technically not be required, but keeping it for now. pi.Name, pi.Lag = fmt.Sprintf("Server name unknown at this time (peerID: %s)", rp.ID), 0 } ci.Replicas = append(ci.Replicas, pi) diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 01f4920e..1a30a3ad 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -7317,8 +7317,7 @@ func TestJetStreamClusterRemovePeerByID(t *testing.T) { require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr)) // Now try by ID, but first with an ID that does not match any peerID - req.Server = "some_bad_id" - req.ByID = true + req.Peer = "some_bad_id" jsreq, err = json.Marshal(req) require_NoError(t, err) rmsg, err = nc.Request(JSApiRemoveServer, jsreq, 2*time.Second) @@ -7331,7 +7330,7 @@ func TestJetStreamClusterRemovePeerByID(t *testing.T) { require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr)) // Now with the proper peer ID - req.Server = peerID + req.Peer = peerID jsreq, err = json.Marshal(req) require_NoError(t, err) rmsg, err = nc.Request(JSApiRemoveServer, jsreq, 2*time.Second) diff --git a/server/monitor_test.go b/server/monitor_test.go index f8186937..b8eed2a3 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4365,6 +4365,11 @@ func TestMonitorJsz(t *testing.T) { info := readJsInfo(url + "") if info.Meta.Replicas != nil { found++ + for _, r := range info.Meta.Replicas { + if r.Peer == _EMPTY_ { + t.Fatalf("Replicas' Peer is empty: %+v", r) + } + } if info.Meta.Leader != srvs[i].Name() { t.Fatalf("received cluster info from non leader: leader %s, server: %s", info.Meta.Leader, srvs[i].Name()) } diff --git a/server/stream.go b/server/stream.go index ac4b25a1..45b2d4fc 100644 --- a/server/stream.go +++ b/server/stream.go @@ -143,9 +143,9 @@ type PeerInfo struct { Offline bool `json:"offline,omitempty"` Active time.Duration `json:"active"` Lag uint64 `json:"lag,omitempty"` + Peer string `json:"peer"` // For migrations. cluster string - peer string } // StreamSourceInfo shows information about an upstream stream source.