mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
When moving streams, we could check too soon and be in a gap where the replica peer has not registered a catchup request.
This would cause us to think the replica was caughtup incorrectly and drop our leadership, which would cancel any cacthup requests. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -3953,3 +3953,66 @@ func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterMovingR1Stream(t *testing.T) {
|
||||
// Make C2 have some latency.
|
||||
gwm := gwProxyMap{
|
||||
"C2": &gwProxy{
|
||||
rtt: 10 * time.Millisecond,
|
||||
up: 1 * 1024 * 1024 * 1024, // 1gbit
|
||||
down: 1 * 1024 * 1024 * 1024, // 1gbit
|
||||
},
|
||||
}
|
||||
sc := createJetStreamTaggedSuperClusterWithGWProxy(t, gwm)
|
||||
defer sc.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, sc.clusterForName("C1").randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
toSend := 10_000
|
||||
for i := 0; i < toSend; i++ {
|
||||
_, err := js.PublishAsync("TEST", []byte("HELLO WORLD"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Did not receive completion signal")
|
||||
}
|
||||
|
||||
// Have it move to GCP.
|
||||
_, err = js.UpdateStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
||||
sc.waitOnStreamLeader(globalAccountName, "TEST")
|
||||
si, err := js.StreamInfo("TEST")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if si.Cluster.Name != "C2" {
|
||||
return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
|
||||
}
|
||||
if si.Cluster.Leader == _EMPTY_ {
|
||||
return fmt.Errorf("No leader yet")
|
||||
} else if !strings.HasPrefix(si.Cluster.Leader, "C2") {
|
||||
return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
|
||||
}
|
||||
// Now we want to see that we shrink back to original.
|
||||
if len(si.Cluster.Replicas) != 0 {
|
||||
return fmt.Errorf("Expected 0 replicas, got %d", len(si.Cluster.Replicas))
|
||||
}
|
||||
if si.State.Msgs != uint64(toSend) {
|
||||
return fmt.Errorf("Only see %d msgs", si.State.Msgs)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user