From d61ecf8a89f9000c4b4f9d3e7fbcd0f4c8fb69d1 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Sat, 14 Oct 2023 12:28:36 +0300 Subject: [PATCH] Report the raft group name in stream and consumer info Signed-off-by: R.I.Pienaar --- server/jetstream_cluster.go | 9 ++--- server/jetstream_cluster_1_test.go | 53 ++++++++++++++++++++++++++++++ server/stream.go | 7 ++-- 3 files changed, 62 insertions(+), 7 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 67278f20..076de817 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6472,7 +6472,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { var si StreamInfo if err := json.Unmarshal(msg, &si); err != nil { - s.Warnf("Error unmarshaling clustered stream info response:%v", err) + s.Warnf("Error unmarshalling clustered stream info response:%v", err) return } select { @@ -8121,7 +8121,7 @@ func (mset *stream) handleClusterSyncRequest(sub *subscription, c *client, _ *Ac func (js *jetStream) offlineClusterInfo(rg *raftGroup) *ClusterInfo { s := js.srv - ci := &ClusterInfo{Name: s.ClusterName()} + ci := &ClusterInfo{Name: s.ClusterName(), RaftGroup: rg.Name} for _, peer := range rg.Peers { if sir, ok := s.nodeToInfo.Load(peer); ok && sir != nil { si := sir.(nodeInfo) @@ -8150,8 +8150,9 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { n := rg.node ci := &ClusterInfo{ - Name: s.cachedClusterName(), - Leader: s.serverNameForNode(n.GroupLeader()), + Name: s.cachedClusterName(), + Leader: s.serverNameForNode(n.GroupLeader()), + RaftGroup: rg.Name, } now := time.Now() diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 06d0c1e0..fa28a41f 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -145,6 +145,59 @@ func TestJetStreamClusterStreamLimitWithAccountDefaults(t *testing.T) { require_Contains(t, err.Error(), "no suitable peers for placement", "insufficient storage") } +func TestJetStreamClusterInfoRaftGroup(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R1S", 3) + defer c.shutdown() + + s := c.randomNonLeader() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + acc := s.GlobalAccount() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Storage: nats.FileStorage, + Replicas: 3, + }) + require_NoError(t, err) + + nfoResp, err := nc.Request("$JS.API.STREAM.INFO.TEST", nil, time.Second) + require_NoError(t, err) + + var si StreamInfo + err = json.Unmarshal(nfoResp.Data, &si) + require_NoError(t, err) + + if si.Cluster == nil { + t.Fatalf("Expected cluster info, got none") + } + + stream, err := acc.lookupStream("TEST") + require_NoError(t, err) + + if si.Cluster.RaftGroup != stream.raftGroup().Name { + t.Fatalf("Expected raft group %q to equal %q", si.Cluster.RaftGroup, stream.raftGroup().Name) + } + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DURABLE", Replicas: 3}) + require_NoError(t, err) + + consumer := stream.lookupConsumer("DURABLE") + + var ci ConsumerInfo + nfoResp, err = nc.Request("$JS.API.CONSUMER.INFO.TEST.DURABLE", nil, time.Second) + require_NoError(t, err) + + err = json.Unmarshal(nfoResp.Data, &ci) + require_NoError(t, err) + + if ci.Cluster.RaftGroup != consumer.raftGroup().Name { + t.Fatalf("Expected raft group %q to equal %q", ci.Cluster.RaftGroup, consumer.raftGroup().Name) + } +} + func TestJetStreamClusterSingleReplicaStreams(t *testing.T) { c := createJetStreamClusterExplicit(t, "R1S", 3) defer c.shutdown() diff --git a/server/stream.go b/server/stream.go index 096c850e..e868c31d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -160,9 +160,10 @@ type StreamAlternate struct { // ClusterInfo shows information about the underlying set of servers // that make up the stream or consumer. type ClusterInfo struct { - Name string `json:"name,omitempty"` - Leader string `json:"leader,omitempty"` - Replicas []*PeerInfo `json:"replicas,omitempty"` + Name string `json:"name,omitempty"` + RaftGroup string `json:"raft_group,omitempty"` + Leader string `json:"leader,omitempty"` + Replicas []*PeerInfo `json:"replicas,omitempty"` } // PeerInfo shows information about all the peers in the cluster that