mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
When we polled too quickly on migration we could check before catchup logic had even kicked in.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1645,7 +1645,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
|
||||
|
||||
startMigrationMonitoring := func() {
|
||||
if mmt == nil {
|
||||
mmt = time.NewTicker(250 * time.Millisecond)
|
||||
mmt = time.NewTicker(1 * time.Second)
|
||||
mmtc = mmt.C
|
||||
}
|
||||
}
|
||||
@@ -1758,7 +1758,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
|
||||
continue
|
||||
}
|
||||
// Check to see that we have someone caught up.
|
||||
// TODO(dlc) - For now start checking after a second in order to give proper time to kick in any catchup logic needed.
|
||||
// What we really need to do longer term is know if we need catchup and make sure that process has kicked off and/or completed.
|
||||
ci := js.clusterInfo(mset.raftGroup())
|
||||
// The polling interval of one second allows this to be kicked in if needed.
|
||||
if mset.hasCatchupPeers() {
|
||||
mset.checkClusterInfo(ci)
|
||||
}
|
||||
|
||||
@@ -12142,66 +12142,79 @@ func TestJetStreamClusterMovingStreamAndMoveBack(t *testing.T) {
|
||||
nc, js := jsClientConnect(t, sc.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Replicas: 3,
|
||||
Placement: &nats.Placement{Tags: []string{"cloud:aws"}},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
replicas int
|
||||
}{
|
||||
{"R1", 1},
|
||||
{"R3", 3},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
js.DeleteStream("TEST")
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
_, err := js.PublishAsync("TEST", []byte("HELLO WORLD"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Did not receive completion signal")
|
||||
}
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Replicas: test.replicas,
|
||||
Placement: &nats.Placement{Tags: []string{"cloud:aws"}},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = js.UpdateStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Replicas: 3,
|
||||
Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
toSend := 10_000
|
||||
for i := 0; i < toSend; i++ {
|
||||
_, err := js.PublishAsync("TEST", []byte("HELLO WORLD"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Did not receive completion signal")
|
||||
}
|
||||
|
||||
checkMove := func(cluster string) {
|
||||
t.Helper()
|
||||
checkFor(t, 30*time.Second, 100*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("TEST")
|
||||
if err != nil {
|
||||
return err
|
||||
_, err = js.UpdateStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Replicas: test.replicas,
|
||||
Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
checkMove := func(cluster string) {
|
||||
t.Helper()
|
||||
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("TEST")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if si.Cluster.Name != cluster {
|
||||
return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
|
||||
}
|
||||
if si.Cluster.Leader == _EMPTY_ {
|
||||
return fmt.Errorf("No leader yet")
|
||||
} else if !strings.HasPrefix(si.Cluster.Leader, cluster) {
|
||||
return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
|
||||
}
|
||||
// Now we want to see that we shrink back to original.
|
||||
if len(si.Cluster.Replicas) != test.replicas-1 {
|
||||
return fmt.Errorf("Expected %d replicas, got %d", test.replicas-1, len(si.Cluster.Replicas))
|
||||
}
|
||||
if si.State.Msgs != uint64(toSend) {
|
||||
return fmt.Errorf("Only see %d msgs", si.State.Msgs)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
if si.Cluster.Name != cluster {
|
||||
return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
|
||||
}
|
||||
if si.Cluster.Leader == _EMPTY_ {
|
||||
return fmt.Errorf("No leader yet")
|
||||
} else if !strings.HasPrefix(si.Cluster.Leader, cluster) {
|
||||
return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
|
||||
}
|
||||
// Now we want to see that we shrink back to original.
|
||||
if len(si.Cluster.Replicas) != 2 {
|
||||
return fmt.Errorf("Expected %d replicas, got %d", 2, len(si.Cluster.Replicas))
|
||||
}
|
||||
if si.State.Msgs != 1000 {
|
||||
return fmt.Errorf("Only see %d msgs", si.State.Msgs)
|
||||
}
|
||||
return nil
|
||||
|
||||
checkMove("C2")
|
||||
|
||||
_, err = js.UpdateStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Replicas: test.replicas,
|
||||
Placement: &nats.Placement{Tags: []string{"cloud:aws"}},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
checkMove("C1")
|
||||
})
|
||||
}
|
||||
|
||||
checkMove("C2")
|
||||
|
||||
_, err = js.UpdateStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Replicas: 3,
|
||||
Placement: &nats.Placement{Tags: []string{"cloud:aws"}},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
checkMove("C1")
|
||||
}
|
||||
|
||||
func TestJetStreamClusterMemoryConsumerInterestRetention(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user