mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Fix for issue with stream info and R=1 and fix for a flapper
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1243,7 +1243,7 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen
|
||||
if err != nil {
|
||||
resp.Error = jsError(err)
|
||||
} else {
|
||||
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config()}
|
||||
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: s.clusterInfo(nil)}
|
||||
}
|
||||
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
|
||||
}
|
||||
|
||||
@@ -122,13 +122,16 @@ func TestJetStreamClusterSingleReplicaStreams(t *testing.T) {
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
si, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo", "bar"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if si.Cluster == nil {
|
||||
t.Fatalf("Expected si to have cluster info")
|
||||
}
|
||||
// Send in 10 messages.
|
||||
msg, toSend := []byte("Hello JS Clustering"), 10
|
||||
for i := 0; i < toSend; i++ {
|
||||
@@ -137,7 +140,7 @@ func TestJetStreamClusterSingleReplicaStreams(t *testing.T) {
|
||||
}
|
||||
}
|
||||
// Now grab info for this stream.
|
||||
si, err := js.StreamInfo("TEST")
|
||||
si, err = js.StreamInfo("TEST")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -1401,11 +1404,18 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
|
||||
if len(si.Cluster.Replicas) != 2 {
|
||||
t.Fatalf("Expected %d replicas, got %d", 2, len(si.Cluster.Replicas))
|
||||
}
|
||||
for _, peer := range si.Cluster.Replicas {
|
||||
if !peer.Current {
|
||||
t.Fatalf("Expected replica to be current: %+v", peer)
|
||||
// We may need to wait a bit for peers to catch up.
|
||||
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
|
||||
for _, peer := range si.Cluster.Replicas {
|
||||
if !peer.Current {
|
||||
if si, err = js.StreamInfo("TEST"); err != nil {
|
||||
t.Fatalf("Could not retrieve stream info")
|
||||
}
|
||||
return fmt.Errorf("Expected replica to be current: %+v", peer)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Now do consumer.
|
||||
sub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(10))
|
||||
|
||||
Reference in New Issue
Block a user