From f113163b9f0b2ec229c1657171e991544355b1ea Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 15 Sep 2022 10:36:22 -0600 Subject: [PATCH] Change ByID boolean to Peer string and add Peer id in replicas output The CLI will now be able to display the peer IDs in MetaGroupInfo if it chooses to do so, and possibly help the user select the peer ID from a list with a new command to remove by peer ID instead of by server name. Signed-off-by: Ivan Kozlovic --- server/jetstream_api.go | 11 +++++++---- server/jetstream_cluster.go | 16 +++++++++------- server/jetstream_cluster_2_test.go | 5 ++--- server/monitor_test.go | 5 +++++ server/stream.go | 2 +- 5 files changed, 24 insertions(+), 15 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 12023985..d4905a84 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -581,7 +581,9 @@ const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_st type JSApiMetaServerRemoveRequest struct { // Server name of the peer to be removed. Server string `json:"peer"` - ByID bool `json:"by_id,omitempty"` + // Peer ID of the peer to be removed. If specified this is used + // instead of the server name. + Peer string `json:"peer_id,omitempty"` } // JSApiMetaServerRemoveResponse is the response to a peer removal request in the meta group. 
@@ -2216,9 +2218,10 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac var found string js.mu.RLock() for _, p := range cc.meta.Peers() { - if req.ByID { - if p.ID == req.Server { - found = req.Server + // If Peer is specified, it takes precedence + if req.Peer != _EMPTY_ { + if p.ID == req.Peer { + found = req.Peer break } continue diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3b9fd81d..0ef749d6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2000,10 +2000,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps var newLeaderPeer, newLeader string neededCurrent, current := replicas/2+1, 0 for _, r := range ci.Replicas { - if r.Current && newPeerSet[r.peer] { + if r.Current && newPeerSet[r.Peer] { current++ if newLeader == _EMPTY_ { - newLeaderPeer, newLeader = r.peer, r.Name + newLeaderPeer, newLeader = r.Peer, r.Name } } } @@ -2427,7 +2427,7 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo { for _, rp := range node.Peers() { if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil { si := sir.(nodeInfo) - pi := &PeerInfo{Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag} + pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag} replicas = append(replicas, pi) } } @@ -3925,10 +3925,10 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { var newLeaderPeer, newLeader, newCluster string neededCurrent, current := replicas/2+1, 0 for _, r := range ci.Replicas { - if r.Current && newPeerSet[r.peer] { + if r.Current && newPeerSet[r.Peer] { current++ if newCluster == _EMPTY_ { - newLeaderPeer, newLeader, newCluster = r.peer, r.Name, r.cluster + newLeaderPeer, newLeader, newCluster = r.Peer, r.Name, r.cluster } } } @@ -7087,7 +7087,7 @@ func (js *jetStream) offlineClusterInfo(rg *raftGroup) *ClusterInfo { 
for _, peer := range rg.Peers { if sir, ok := s.nodeToInfo.Load(peer); ok && sir != nil { si := sir.(nodeInfo) - pi := &PeerInfo{Name: si.name, Current: false, Offline: true} + pi := &PeerInfo{Peer: peer, Name: si.name, Current: false, Offline: true} ci.Replicas = append(ci.Replicas, pi) } } @@ -7143,7 +7143,7 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { Offline: true, Active: lastSeen, Lag: rp.Lag, - peer: rp.ID, + Peer: rp.ID, } // If node is found, complete/update the settings. if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil { @@ -7153,6 +7153,8 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { // If not, then add a name that indicates that the server name // is unknown at this time, and clear the lag since it is misleading // (the node may not have that much lag). + // Note: We return now the Peer ID in PeerInfo, so the "(peerID: %s)" + // would technically not be required, but keeping it for now. pi.Name, pi.Lag = fmt.Sprintf("Server name unknown at this time (peerID: %s)", rp.ID), 0 } ci.Replicas = append(ci.Replicas, pi) diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 01f4920e..1a30a3ad 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -7317,8 +7317,7 @@ func TestJetStreamClusterRemovePeerByID(t *testing.T) { require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr)) // Now try by ID, but first with an ID that does not match any peerID - req.Server = "some_bad_id" - req.ByID = true + req.Peer = "some_bad_id" jsreq, err = json.Marshal(req) require_NoError(t, err) rmsg, err = nc.Request(JSApiRemoveServer, jsreq, 2*time.Second) @@ -7331,7 +7330,7 @@ func TestJetStreamClusterRemovePeerByID(t *testing.T) { require_True(t, IsNatsErr(resp.Error, JSClusterServerNotMemberErr)) // Now with the proper peer ID - req.Server = peerID + req.Peer = peerID jsreq, err = json.Marshal(req) require_NoError(t, err) rmsg, err = 
nc.Request(JSApiRemoveServer, jsreq, 2*time.Second) diff --git a/server/monitor_test.go b/server/monitor_test.go index f8186937..b8eed2a3 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4365,6 +4365,11 @@ func TestMonitorJsz(t *testing.T) { info := readJsInfo(url + "") if info.Meta.Replicas != nil { found++ + for _, r := range info.Meta.Replicas { + if r.Peer == _EMPTY_ { + t.Fatalf("Replicas' Peer is empty: %+v", r) + } + } if info.Meta.Leader != srvs[i].Name() { t.Fatalf("received cluster info from non leader: leader %s, server: %s", info.Meta.Leader, srvs[i].Name()) } diff --git a/server/stream.go b/server/stream.go index ac4b25a1..45b2d4fc 100644 --- a/server/stream.go +++ b/server/stream.go @@ -143,9 +143,9 @@ type PeerInfo struct { Offline bool `json:"offline,omitempty"` Active time.Duration `json:"active"` Lag uint64 `json:"lag,omitempty"` + Peer string `json:"peer"` // For migrations. cluster string - peer string } // StreamSourceInfo shows information about an upstream stream source.