[FIXED] R1 stream move would sometimes lose all msgs. (#4413)

When moving streams, we could check too soon and land in a gap where the
replica peer had made contact via the NRG layer but had not yet registered
a catchup request.

This would cause us to incorrectly conclude the replica was caught up and
drop our leadership, which would cancel any outstanding catchup requests.
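
A minimal, self-contained sketch of that race (all names below, such as
replicaTracker and isCaughtUp, are hypothetical and not the actual
nats-server internals): the buggy pattern treats "peer has made contact and
has no catchup registered" as caught up, which is wrong in the window before
the catchup request arrives, so that signal must not be trusted immediately
after first contact.

// Hypothetical sketch; not the actual nats-server code.
package main

import (
	"fmt"
	"time"
)

type replicaTracker struct {
	firstContact  map[string]time.Time // first contact seen via the RAFT/NRG layer
	catchupActive map[string]bool      // peer has registered a catchup request
	graceWindow   time.Duration        // how long before "no catchup" is trusted
}

// isCaughtUp reports whether a replica may be considered current. It only
// trusts "contacted but no catchup registered" once the grace window has
// elapsed, since the catchup request may not have been registered yet.
func (rt *replicaTracker) isCaughtUp(peer string) bool {
	first, ok := rt.firstContact[peer]
	if !ok {
		return false // never heard from this peer
	}
	if rt.catchupActive[peer] {
		return false // catchup still in flight
	}
	return time.Since(first) >= rt.graceWindow
}

func main() {
	rt := &replicaTracker{
		firstContact:  map[string]time.Time{"R2": time.Now()},
		catchupActive: map[string]bool{},
		graceWindow:   500 * time.Millisecond,
	}
	// Right after contact we must not declare the peer caught up.
	fmt.Println("right after contact:", rt.isCaughtUp("R2")) // false
	time.Sleep(600 * time.Millisecond)
	fmt.Println("after grace window:", rt.isCaughtUp("R2")) // true
}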

Signed-off-by: Derek Collison <derek@nats.io>

Derek Collison
2023-08-22 06:49:57 -07:00
committed by GitHub

3 changed files with 65 additions and 15 deletions


@@ -3288,7 +3288,7 @@ func (o *consumer) checkAckFloor() {
 			}
 		}
 	} else if numPending > 0 {
-		// here it shorter to walk pending.
+		// here it is shorter to walk pending.
		// toTerm is seq, dseq, rcd for each entry.
		toTerm := make([]uint64, 0, numPending*3)
		o.mu.RLock()


@@ -2161,21 +2161,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 	startMigrationMonitoring := func() {
 		if mmt == nil {
-			mmt = time.NewTicker(10 * time.Millisecond)
+			mmt = time.NewTicker(500 * time.Millisecond)
 			mmtc = mmt.C
 		}
 	}
-	adjustMigrationMonitoring := func() {
-		const delay = 500 * time.Millisecond
-		if mmt == nil {
-			mmt = time.NewTicker(delay)
-			mmtc = mmt.C
-		} else {
-			mmt.Reset(delay)
-		}
-	}
 	stopMigrationMonitoring := func() {
 		if mmt != nil {
 			mmt.Stop()
@@ -2407,9 +2397,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				continue
 			}
-			// Adjust to our normal time delay.
-			adjustMigrationMonitoring()
 			// Make sure we have correct cluster information on the other peers.
 			ci := js.clusterInfo(rg)
 			mset.checkClusterInfo(ci)
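
The fix above drops the fast 10ms start-up tick (and the
adjustMigrationMonitoring reset that followed it) in favor of starting the
migration monitor directly at its normal 500ms cadence, so the first
caught-up check cannot fire before a replica has had a chance to register
its catchup request. A simplified, hypothetical sketch of the pattern (not
the actual monitorStream code):

package main

import (
	"fmt"
	"time"
)

func main() {
	const delay = 500 * time.Millisecond // normal monitoring cadence

	var mmt *time.Ticker
	var mmtc <-chan time.Time

	// After the fix the ticker starts at the normal cadence right away;
	// before, it fired after only 10ms and was then reset to 500ms,
	// letting the very first check run far too early.
	startMigrationMonitoring := func() {
		if mmt == nil {
			mmt = time.NewTicker(delay)
			mmtc = mmt.C
		}
	}

	start := time.Now()
	startMigrationMonitoring()
	defer mmt.Stop()

	<-mmtc
	fmt.Printf("first migration check after %v\n", time.Since(start))
}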


@@ -3953,3 +3953,66 @@ func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) {
 		return nil
 	})
 }
+
+func TestJetStreamSuperClusterMovingR1Stream(t *testing.T) {
+	// Make C2 have some latency.
+	gwm := gwProxyMap{
+		"C2": &gwProxy{
+			rtt:  10 * time.Millisecond,
+			up:   1 * 1024 * 1024 * 1024, // 1gbit
+			down: 1 * 1024 * 1024 * 1024, // 1gbit
+		},
+	}
+	sc := createJetStreamTaggedSuperClusterWithGWProxy(t, gwm)
+	defer sc.shutdown()
+
+	nc, js := jsClientConnect(t, sc.clusterForName("C1").randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name: "TEST",
+	})
+	require_NoError(t, err)
+
+	toSend := 10_000
+	for i := 0; i < toSend; i++ {
+		_, err := js.PublishAsync("TEST", []byte("HELLO WORLD"))
+		require_NoError(t, err)
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	// Have it move to GCP.
+	_, err = js.UpdateStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
+	})
+	require_NoError(t, err)
+
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		sc.waitOnStreamLeader(globalAccountName, "TEST")
+		si, err := js.StreamInfo("TEST")
+		if err != nil {
+			return err
+		}
+		if si.Cluster.Name != "C2" {
+			return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
+		}
+		if si.Cluster.Leader == _EMPTY_ {
+			return fmt.Errorf("No leader yet")
+		} else if !strings.HasPrefix(si.Cluster.Leader, "C2") {
+			return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
+		}
+		// Now we want to see that we shrink back to original.
+		if len(si.Cluster.Replicas) != 0 {
+			return fmt.Errorf("Expected 0 replicas, got %d", len(si.Cluster.Replicas))
+		}
+		if si.State.Msgs != uint64(toSend) {
+			return fmt.Errorf("Only see %d msgs", si.State.Msgs)
+		}
+		return nil
+	})
+}
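
Presumably the injected 10ms gateway RTT above is what makes the C1-to-C2
move slow enough to expose the race: a leader that checked too early would
drop leadership and cancel the in-flight catchup, and the final
si.State.Msgs check would then fail with fewer than toSend messages.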