Attempt to improve long RTT catchup time during stream moves.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-08-08 06:27:50 -07:00
committed by Ivan Kozlovic
parent e635de7526
commit 758b733d43
6 changed files with 416 additions and 275 deletions

View File

@@ -17,6 +17,7 @@
package server
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -122,7 +123,7 @@ func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) {
"C2-S5": "az:1",
}
return conf + fmt.Sprintf("\nserver_tags: [cloud:%s-tag, %s]\n", clustername, azTag[serverName])
})
}, nil)
defer s.shutdown()
inDifferentAz := func(ci *nats.ClusterInfo) (bool, error) {
@@ -2328,7 +2329,7 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) {
`, 3, 2,
func(serverName, clusterName, storeDir, conf string) string {
return conf
})
}, nil)
defer sc.shutdown()
// speed up statsz reporting
@@ -2657,7 +2658,7 @@ func TestJetStreamSuperClusterMoveCancel(t *testing.T) {
func(serverName, clusterName, storeDir, conf string) string {
server[serverName] = struct{}{}
return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName)
})
}, nil)
defer sc.shutdown()
// Client based API
@@ -2794,7 +2795,7 @@ func TestJetStreamSuperClusterDoubleStreamMove(t *testing.T) {
func(serverName, clusterName, storeDir, conf string) string {
server[serverName] = struct{}{}
return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName)
})
}, nil)
defer sc.shutdown()
// Client based API
@@ -2992,7 +2993,7 @@ func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T)
s := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
func(serverName, clusterName, storeDir, conf string) string {
return fmt.Sprintf("%s\nserver_tags: [cluster:%s, server:%s]", conf, clusterName, serverName)
})
}, nil)
defer s.shutdown()
c := s.clusterForName("C1")
@@ -3223,3 +3224,309 @@ func TestJetStreamSuperClusterMirrorInheritsAllowDirect(t *testing.T) {
t.Fatalf("Expected MirrorDirect to be inherited as true")
}
}
// TestJetStreamSuperClusterSystemLimitsPlacement verifies that stream
// placement honors per-server JetStream storage limits across clusters:
// C1 servers get a large max_mem/max_file limit, C2 servers a small one.
// Creating a stream whose MaxBytes exceeds every server limit in the
// targeted cluster must fail, while the same stream fits in C1.
func TestJetStreamSuperClusterSystemLimitsPlacement(t *testing.T) {
	const largeSystemLimit = 1024
	const smallSystemLimit = 512

	tmpl := `
		listen: 127.0.0.1:-1
		server_name: %s
		jetstream: {
			max_mem_store: _MAXMEM_
			max_file_store: _MAXFILE_
			store_dir: '%s',
		}

		server_tags: [
			_TAG_
		]

		leaf {
			listen: 127.0.0.1:-1
		}

		cluster {
			name: %s
			listen: 127.0.0.1:%d
			routes = [%s]
		}

		accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
	`
	// Substitute per-server limits and a per-server placement tag into
	// the config template: C1-* servers are "large", C2-* are "small".
	storeCnf := func(serverName, clusterName, storeDir, conf string) string {
		switch {
		case strings.HasPrefix(serverName, "C1"):
			conf = strings.Replace(conf, "_MAXMEM_", fmt.Sprint(largeSystemLimit), 1)
			conf = strings.Replace(conf, "_MAXFILE_", fmt.Sprint(largeSystemLimit), 1)
			return strings.Replace(conf, "_TAG_", serverName, 1)
		case strings.HasPrefix(serverName, "C2"):
			conf = strings.Replace(conf, "_MAXMEM_", fmt.Sprint(smallSystemLimit), 1)
			conf = strings.Replace(conf, "_MAXFILE_", fmt.Sprint(smallSystemLimit), 1)
			return strings.Replace(conf, "_TAG_", serverName, 1)
		default:
			return conf
		}
	}

	sCluster := createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, 3, 2, storeCnf, nil)
	defer sCluster.shutdown()

	// Ask the current meta leader to step down so leadership can move.
	requestLeaderStepDown := func(clientURL string) error {
		nc, err := nats.Connect(clientURL)
		if err != nil {
			return err
		}
		defer nc.Close()

		ncResp, err := nc.Request(JSApiLeaderStepDown, nil, 3*time.Second)
		if err != nil {
			return err
		}
		var resp JSApiLeaderStepDownResponse
		if err := json.Unmarshal(ncResp.Data, &resp); err != nil {
			return err
		}
		if resp.Error != nil {
			return resp.Error
		}
		if !resp.Success {
			return fmt.Errorf("leader step down request not successful")
		}
		return nil
	}

	// Force large cluster to be leader
	var largeLeader *Server
	err := checkForErr(15*time.Second, 500*time.Millisecond, func() error {
		// Range over cluster A, which is the large cluster.
		servers := sCluster.clusters[0].servers
		for _, s := range servers {
			if s.JetStreamIsLeader() {
				largeLeader = s
				return nil
			}
		}

		if err := requestLeaderStepDown(servers[0].ClientURL()); err != nil {
			return fmt.Errorf("failed to request leader step down: %s", err)
		}
		return fmt.Errorf("leader is not in large cluster")
	})
	if err != nil {
		t.Skipf("failed to get desired layout: %s", err)
	}

	getStreams := func(jsm nats.JetStreamManager) []string {
		var streams []string
		for s := range jsm.StreamNames() {
			streams = append(streams, s)
		}
		return streams
	}
	nc, js := jsClientConnect(t, largeLeader)
	defer nc.Close()

	cases := []struct {
		name           string
		storage        nats.StorageType
		createMaxBytes int64
		serverTag      string
		wantErr        bool
	}{
		{
			name:           "file create large stream on small cluster b0",
			storage:        nats.FileStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C2-S1",
			wantErr:        true,
		},
		{
			name:           "memory create large stream on small cluster b0",
			storage:        nats.MemoryStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C2-S1",
			wantErr:        true,
		},
		{
			name:           "file create large stream on small cluster b1",
			storage:        nats.FileStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C2-S2",
			wantErr:        true,
		},
		{
			name:           "memory create large stream on small cluster b1",
			storage:        nats.MemoryStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C2-S2",
			wantErr:        true,
		},
		{
			name:           "file create large stream on small cluster b2",
			storage:        nats.FileStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C2-S3",
			wantErr:        true,
		},
		{
			name:           "memory create large stream on small cluster b2",
			storage:        nats.MemoryStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C2-S3",
			wantErr:        true,
		},
		{
			name:           "file create large stream on large cluster a0",
			storage:        nats.FileStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C1-S1",
		},
		{
			name:           "memory create large stream on large cluster a0",
			storage:        nats.MemoryStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C1-S1",
		},
		{
			name:           "file create large stream on large cluster a1",
			storage:        nats.FileStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C1-S2",
		},
		{
			name:           "memory create large stream on large cluster a1",
			storage:        nats.MemoryStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C1-S2",
		},
		{
			name:           "file create large stream on large cluster a2",
			storage:        nats.FileStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C1-S3",
		},
		{
			name:           "memory create large stream on large cluster a2",
			storage:        nats.MemoryStorage,
			createMaxBytes: smallSystemLimit + 1,
			serverTag:      "C1-S3",
		},
	}
	for i := 0; i < len(cases) && !t.Failed(); i++ {
		c := cases[i]
		t.Run(c.name, func(st *testing.T) {
			// BUGFIX: the previous prefix checks ("a"/"b") never matched the
			// C1-*/C2-* server tags, leaving clusterName empty and making the
			// Placement.Cluster constraint dead code. Map the tag prefix to the
			// actual cluster name so placement is constrained by cluster too.
			var clusterName string
			if strings.HasPrefix(c.serverTag, "C1") {
				clusterName = "C1"
			} else if strings.HasPrefix(c.serverTag, "C2") {
				clusterName = "C2"
			}

			// Each subtest starts from a clean slate: no streams present.
			if s := getStreams(js); len(s) != 0 {
				st.Fatalf("unexpected stream count, got=%d, want=0", len(s))
			}

			streamName := fmt.Sprintf("TEST-%s", c.serverTag)
			si, err := js.AddStream(&nats.StreamConfig{
				Name:     streamName,
				Subjects: []string{"foo"},
				Storage:  c.storage,
				MaxBytes: c.createMaxBytes,
				Placement: &nats.Placement{
					Cluster: clusterName,
					Tags:    []string{c.serverTag},
				},
			})
			if c.wantErr && err == nil {
				if s := getStreams(js); len(s) != 1 {
					st.Logf("unexpected stream count, got=%d, want=1, streams=%v", len(s), s)
				}

				cfg := si.Config
				st.Fatalf("unexpected success, maxBytes=%d, cluster=%s, tags=%v",
					cfg.MaxBytes, cfg.Placement.Cluster, cfg.Placement.Tags)
			} else if !c.wantErr && err != nil {
				if s := getStreams(js); len(s) != 0 {
					st.Logf("unexpected stream count, got=%d, want=0, streams=%v", len(s), s)
				}

				require_NoError(st, err)
			}

			if err == nil {
				if s := getStreams(js); len(s) != 1 {
					st.Fatalf("unexpected stream count, got=%d, want=1", len(s))
				}
			}
			// Delete regardless.
			js.DeleteStream(streamName)
		})
	}
}
// TestJetStreamSuperClusterStreamCathupLongRTT exercises stream catchup
// when the destination cluster sits behind a high-latency link: a stream
// is filled in C1, then moved to C2 (300ms RTT via a gateway proxy), and
// we wait for the new peer set to elect a C2 leader and shrink back to
// the configured replica count. (Name typo "Cathup" kept: it is the
// test's public identifier.)
func TestJetStreamSuperClusterStreamCathupLongRTT(t *testing.T) {
	skip(t)

	// Simulate a distant C2 by proxying its gateway traffic.
	proxies := gwProxyMap{
		"C2": &gwProxy{
			rtt:  300 * time.Millisecond,
			up:   1 * 1024 * 1024 * 1024, // 1gbit
			down: 1 * 1024 * 1024 * 1024, // 1gbit
		},
	}
	sc := createJetStreamTaggedSuperClusterWithGWProxy(t, proxies)
	defer sc.shutdown()

	nc, js := jsClientConnect(t, sc.randomServer())
	defer nc.Close()

	cfg := &nats.StreamConfig{
		Name:              "TEST",
		Subjects:          []string{"chunk.*"},
		Placement:         &nats.Placement{Tags: []string{"cloud:aws", "country:us"}},
		Replicas:          3,
		MaxMsgsPerSubject: 1,
	}

	// Place a stream in C1.
	_, err := js.AddStream(cfg)
	require_NoError(t, err)

	payload := bytes.Repeat([]byte("Z"), 1000*1024) // ~1MB

	// Fill the stream with ~256MB spread over one subject per chunk.
	for i := 0; i < 256; i++ {
		js.PublishAsync(fmt.Sprintf("chunk.%d", i), payload)
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	// Retag to move the stream into C2, behind the slow link.
	cfg.Placement = &nats.Placement{Tags: []string{"cloud:gcp", "country:uk"}}
	_, err = js.UpdateStream(cfg)
	require_NoError(t, err)

	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
		si, err := js.StreamInfo("TEST")
		if err != nil {
			return err
		}
		if si.Cluster.Name != "C2" {
			return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
		}
		if si.Cluster.Leader == _EMPTY_ {
			return fmt.Errorf("No leader yet")
		}
		if !strings.HasPrefix(si.Cluster.Leader, "C2-") {
			return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
		}
		// Now we want to see that we shrink back to original.
		if len(si.Cluster.Replicas) != cfg.Replicas-1 {
			return fmt.Errorf("Expected %d replicas, got %d", cfg.Replicas-1, len(si.Cluster.Replicas))
		}
		return nil
	})
}