mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
[IMPROVED] Replicas ordering and info regarding unknown in stream info
If a cluster is brought down and then partially restarted, the replica information about the non restarted node would be completely missing. The CLI could report replicas 3 but then only the leader and the running replicas, but nothing about the other node. Since this node's server name is not known, this PR adds an entry with something similar to this: ``` <unknown (peerID: jZ6RvVRH)>, outdated, OFFLINE, not seen ``` Also, the replicas array is now ordered, which will help when using a watcher or repeating stream info commands in that the replicas output will be stable with regard to the list of replicas. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -6852,21 +6852,34 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
|
||||
if current && lastSeen > lostQuorumInterval {
|
||||
current = false
|
||||
}
|
||||
// Create a peer info with common settings if the peer has not been seen
|
||||
// yet (which can happen after the whole cluster is stopped and only some
|
||||
// of the nodes are restarted).
|
||||
pi := &PeerInfo{
|
||||
Current: current,
|
||||
Offline: true,
|
||||
Active: lastSeen,
|
||||
Lag: rp.Lag,
|
||||
peer: rp.ID,
|
||||
}
|
||||
// If node is found, complete/update the settings.
|
||||
if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil {
|
||||
si := sir.(nodeInfo)
|
||||
pi := &PeerInfo{
|
||||
Name: si.name,
|
||||
Current: current,
|
||||
Offline: si.offline,
|
||||
Active: lastSeen,
|
||||
Lag: rp.Lag,
|
||||
cluster: si.cluster,
|
||||
peer: rp.ID,
|
||||
}
|
||||
ci.Replicas = append(ci.Replicas, pi)
|
||||
pi.Name, pi.Offline, pi.cluster = si.name, si.offline, si.cluster
|
||||
} else {
|
||||
// If not, then add a name that indicates that the server name
|
||||
// is unknown at this time, and clear the lag since it is misleading
|
||||
// (the node may not have that much lag).
|
||||
pi.Name, pi.Lag = fmt.Sprintf("<unknown (peerID: %s)>", rp.ID), 0
|
||||
}
|
||||
ci.Replicas = append(ci.Replicas, pi)
|
||||
}
|
||||
}
|
||||
// Order the result based on the name so that we get something consistent
|
||||
// when doing repeated stream info in the CLI, etc...
|
||||
sort.Slice(ci.Replicas, func(i, j int) bool {
|
||||
return ci.Replicas[i].Name < ci.Replicas[j].Name
|
||||
})
|
||||
return ci
|
||||
}
|
||||
|
||||
|
||||
@@ -1834,6 +1834,18 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
|
||||
t.Fatalf("Expected %d replicas, got %d", 2, len(si.Cluster.Replicas))
|
||||
}
|
||||
|
||||
// Make sure that returned array is ordered
|
||||
for i := 0; i < 50; i++ {
|
||||
si, err := js.StreamInfo("TEST")
|
||||
require_NoError(t, err)
|
||||
require_True(t, len(si.Cluster.Replicas) == 2)
|
||||
s1 := si.Cluster.Replicas[0].Name
|
||||
s2 := si.Cluster.Replicas[1].Name
|
||||
if s1 > s2 {
|
||||
t.Fatalf("Expected replicas to be ordered, got %s then %s", s1, s2)
|
||||
}
|
||||
}
|
||||
|
||||
// Faster timeout since we loop below checking for condition.
|
||||
js2, err := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
|
||||
if err != nil {
|
||||
@@ -12042,3 +12054,67 @@ func TestJetStreamClusterDirectGetFromLeafnode(t *testing.T) {
|
||||
require_NoError(t, err)
|
||||
require_True(t, string(entry.Value()) == "22")
|
||||
}
|
||||
|
||||
func TestJetStreamClusterUnknownReplicaOnClusterRestart(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "JSC", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}, Replicas: 3})
|
||||
require_NoError(t, err)
|
||||
|
||||
c.waitOnStreamLeader(globalAccountName, "TEST")
|
||||
lname := c.streamLeader(globalAccountName, "TEST").Name()
|
||||
sendStreamMsg(t, nc, "foo", "msg1")
|
||||
|
||||
nc.Close()
|
||||
c.stopAll()
|
||||
// Restart the leader...
|
||||
for _, s := range c.servers {
|
||||
if s.Name() == lname {
|
||||
c.restartServer(s)
|
||||
}
|
||||
}
|
||||
// And one of the other servers
|
||||
for _, s := range c.servers {
|
||||
if s.Name() != lname {
|
||||
c.restartServer(s)
|
||||
break
|
||||
}
|
||||
}
|
||||
c.waitOnStreamLeader(globalAccountName, "TEST")
|
||||
|
||||
nc, js = jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
sendStreamMsg(t, nc, "foo", "msg2")
|
||||
|
||||
si, err := js.StreamInfo("TEST")
|
||||
require_NoError(t, err)
|
||||
if len(si.Cluster.Replicas) != 2 {
|
||||
t.Fatalf("Leader is %s - expected 2 peers, got %+v", si.Cluster.Leader, si.Cluster.Replicas[0])
|
||||
}
|
||||
// However, since the leader does not know the name of the server
|
||||
// we should report an "unknown" name.
|
||||
var ok bool
|
||||
for _, r := range si.Cluster.Replicas {
|
||||
if strings.Contains(r.Name, "unknown") {
|
||||
// Check that it has no lag reported, and the it is not current.
|
||||
if r.Current {
|
||||
t.Fatal("Expected non started node to be marked as not current")
|
||||
}
|
||||
if r.Lag != 0 {
|
||||
t.Fatalf("Expected lag to not be set, was %v", r.Lag)
|
||||
}
|
||||
if r.Active != 0 {
|
||||
t.Fatalf("Expected active to not be set, was: %v", r.Active)
|
||||
}
|
||||
ok = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("Should have had an unknown server name, did not: %+v - %+v", si.Cluster.Replicas[0], si.Cluster.Replicas[1])
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user