mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Updates to stream and consumer move logic.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -17,7 +17,6 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -1798,6 +1797,10 @@ func TestJetStreamSuperClusterMovingStreamsAndConsumers(t *testing.T) {
|
||||
numPeers++
|
||||
}
|
||||
if numPeers != 2*replicas {
|
||||
// The move can happen very quick now, so we might already be done.
|
||||
if si.Cluster.Name == "C2" {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Expected to see %d replicas, got %d", 2*replicas, numPeers)
|
||||
}
|
||||
return nil
|
||||
@@ -2823,7 +2826,7 @@ func TestJetStreamSuperClusterMoveCancel(t *testing.T) {
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
fmt.Printf("durable peer group does not match stream peer group")
|
||||
t.Logf("durable peer group does not match stream peer group")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2876,10 +2879,6 @@ func TestJetStreamSuperClusterMoveCancel(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterDoubleStreamMove(t *testing.T) {
|
||||
// Shorten this test by factor of 2.
|
||||
scaleDownDelayTicks = 0
|
||||
defer func() { scaleDownDelayTicks = defaultScaleDownDelayTicks }()
|
||||
|
||||
server := map[string]struct{}{}
|
||||
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
|
||||
func(serverName, clusterName, storeDir, conf string) string {
|
||||
@@ -3047,6 +3046,7 @@ func TestJetStreamSuperClusterDoubleStreamMove(t *testing.T) {
|
||||
}
|
||||
|
||||
moveAndCheck := func(from, to string, expectedSet ...string) {
|
||||
t.Helper()
|
||||
move(from, to)
|
||||
checkFor(t, 40*time.Second, 100*time.Millisecond, func() error { return moveComplete(to, expectedSet...) })
|
||||
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { return serverEmpty(from) })
|
||||
@@ -3080,10 +3080,6 @@ func TestJetStreamSuperClusterDoubleStreamMove(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T) {
|
||||
// Shorten this test by factor of 2.
|
||||
scaleDownDelayTicks = 0
|
||||
defer func() { scaleDownDelayTicks = defaultScaleDownDelayTicks }()
|
||||
|
||||
s := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
|
||||
func(serverName, clusterName, storeDir, conf string) string {
|
||||
return fmt.Sprintf("%s\nserver_tags: [cluster:%s, server:%s]", conf, clusterName, serverName)
|
||||
@@ -3164,8 +3160,7 @@ func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T)
|
||||
require_NoError(t, err)
|
||||
defer ncsys.Close()
|
||||
|
||||
moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{
|
||||
Server: toMoveFrom, Tags: moveTags})
|
||||
moveReq, err := json.Marshal(&JSApiMetaServerStreamMoveRequest{Server: toMoveFrom, Tags: moveTags})
|
||||
require_NoError(t, err)
|
||||
rmsg, err := ncsys.Request(fmt.Sprintf(JSApiServerStreamMoveT, "$G", "TEST"), moveReq, 100*time.Second)
|
||||
require_NoError(t, err)
|
||||
@@ -3219,7 +3214,7 @@ func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T)
|
||||
return nil
|
||||
})
|
||||
// test draining
|
||||
checkFor(t, 20*time.Second, 1000*time.Millisecond, func() error {
|
||||
checkFor(t, 20*time.Second, time.Second, func() error {
|
||||
if !listFrom {
|
||||
// when needed determine which server move moved away from
|
||||
si, err := js.StreamInfo("TEST", nats.MaxWait(2*time.Second))
|
||||
@@ -3228,7 +3223,7 @@ func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T)
|
||||
}
|
||||
for n := range startSet {
|
||||
if n != si.Cluster.Leader {
|
||||
found := false
|
||||
var found bool
|
||||
for _, p := range si.Cluster.Replicas {
|
||||
if p.Name == n {
|
||||
found = true
|
||||
@@ -3560,73 +3555,6 @@ func TestJetStreamSuperClusterSystemLimitsPlacement(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterStreamCathupLongRTT(t *testing.T) {
|
||||
skip(t)
|
||||
|
||||
// Make C2 far away.
|
||||
gwm := gwProxyMap{
|
||||
"C2": &gwProxy{
|
||||
rtt: 300 * time.Millisecond,
|
||||
up: 1 * 1024 * 1024 * 1024, // 1gbit
|
||||
down: 1 * 1024 * 1024 * 1024, // 1gbit
|
||||
},
|
||||
}
|
||||
sc := createJetStreamTaggedSuperClusterWithGWProxy(t, gwm)
|
||||
defer sc.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, sc.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
cfg := &nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"chunk.*"},
|
||||
Placement: &nats.Placement{Tags: []string{"cloud:aws", "country:us"}},
|
||||
Replicas: 3,
|
||||
MaxMsgsPerSubject: 1,
|
||||
}
|
||||
|
||||
// Place a stream in C1.
|
||||
_, err := js.AddStream(cfg)
|
||||
require_NoError(t, err)
|
||||
|
||||
chunk := bytes.Repeat([]byte("Z"), 1000*1024) // ~1MB
|
||||
// 256 MB
|
||||
for i := 0; i < 256; i++ {
|
||||
subj := fmt.Sprintf("chunk.%d", i)
|
||||
js.PublishAsync(subj, chunk)
|
||||
}
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Did not receive completion signal")
|
||||
}
|
||||
|
||||
// C2, slow RTT.
|
||||
cfg.Placement = &nats.Placement{Tags: []string{"cloud:gcp", "country:uk"}}
|
||||
_, err = js.UpdateStream(cfg)
|
||||
require_NoError(t, err)
|
||||
|
||||
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("TEST")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if si.Cluster.Name != "C2" {
|
||||
return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
|
||||
}
|
||||
if si.Cluster.Leader == _EMPTY_ {
|
||||
return fmt.Errorf("No leader yet")
|
||||
} else if !strings.HasPrefix(si.Cluster.Leader, "C2-") {
|
||||
return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
|
||||
}
|
||||
// Now we want to see that we shrink back to original.
|
||||
if len(si.Cluster.Replicas) != cfg.Replicas-1 {
|
||||
return fmt.Errorf("Expected %d replicas, got %d", cfg.Replicas-1, len(si.Cluster.Replicas))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterMixedModeSwitchToInterestOnlyStaticConfig(t *testing.T) {
|
||||
tmpl := `
|
||||
listen: 127.0.0.1:-1
|
||||
|
||||
Reference in New Issue
Block a user