mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
unit test to make sure tiered limits and stream moves work together (#3007)
This needs testing because stream move adjusts the replication factor Because adjusting replication factor and moving is illegal, this case does not need to be tested In order to support one off configurations, added same modification callout to super cluster as is used with cluster Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -12148,6 +12148,10 @@ func createJetStreamSuperCluster(t *testing.T, numServersPer, numClusters int) *
|
||||
}
|
||||
|
||||
func createJetStreamSuperClusterWithTemplate(t *testing.T, tmpl string, numServersPer, numClusters int) *supercluster {
|
||||
return createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, numServersPer, numClusters, nil)
|
||||
}
|
||||
|
||||
func createJetStreamSuperClusterWithTemplateAndModHook(t *testing.T, tmpl string, numServersPer, numClusters int, modify modifyCb) *supercluster {
|
||||
t.Helper()
|
||||
if numServersPer < 1 {
|
||||
t.Fatalf("Number of servers must be >= 1")
|
||||
@@ -12199,6 +12203,9 @@ func createJetStreamSuperClusterWithTemplate(t *testing.T, tmpl string, numServe
|
||||
bconf := fmt.Sprintf(tmpl, sn, storeDir, cn, cp+si, routeConfig)
|
||||
conf := fmt.Sprintf(jsSuperClusterTempl, bconf, cn, gp, gwconf)
|
||||
gp++
|
||||
if modify != nil {
|
||||
conf = modify(sn, cn, storeDir, conf)
|
||||
}
|
||||
s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
|
||||
c.servers = append(c.servers, s)
|
||||
c.opts = append(c.opts, o)
|
||||
|
||||
@@ -5590,6 +5590,88 @@ func TestJWTJetStreamMaxStreamBytes(t *testing.T) {
|
||||
require_Equal(t, err.Error(), "account requires a stream config to have max bytes set")
|
||||
}
|
||||
|
||||
func TestJWTJetStreamMoveWithTiers(t *testing.T) {
|
||||
_, syspub := createKey(t)
|
||||
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
|
||||
|
||||
accKp, aExpPub := createKey(t)
|
||||
accClaim := jwt.NewAccountClaims(aExpPub)
|
||||
accClaim.Name = "acc"
|
||||
accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
|
||||
DiskStorage: 1100, Consumer: 1, Streams: 1}
|
||||
accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
|
||||
DiskStorage: 3300, Consumer: 1, Streams: 1}
|
||||
accJwt := encodeClaim(t, accClaim, aExpPub)
|
||||
accCreds := newUser(t, accKp)
|
||||
|
||||
test := func(t *testing.T, replicas int) {
|
||||
tmlp := `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
||||
leaf {
|
||||
listen: 127.0.0.1:-1
|
||||
}
|
||||
cluster {
|
||||
name: %s
|
||||
listen: 127.0.0.1:%d
|
||||
routes = [%s]
|
||||
}
|
||||
`
|
||||
s := createJetStreamSuperClusterWithTemplateAndModHook(t, tmlp, 3, 3,
|
||||
func(serverName, clustername, storeDir, conf string) string {
|
||||
return conf + fmt.Sprintf(`
|
||||
server_tags: [cloud:%s-tag]
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver = MEMORY
|
||||
resolver_preload = {
|
||||
%s : %s
|
||||
%s : %s
|
||||
}
|
||||
`, clustername, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)
|
||||
})
|
||||
defer s.shutdown()
|
||||
|
||||
nc := natsConnect(t, s.randomServer().ClientURL(), nats.UserCredentials(accCreds))
|
||||
defer nc.Close()
|
||||
|
||||
js, err := nc.JetStream()
|
||||
require_NoError(t, err)
|
||||
|
||||
ci, err := js.AddStream(&nats.StreamConfig{Name: "MOVE-ME", Replicas: replicas,
|
||||
Placement: &nats.Placement{Tags: []string{"cloud:C1-tag"}}})
|
||||
require_NoError(t, err)
|
||||
require_Equal(t, ci.Cluster.Name, "C1")
|
||||
ci, err = js.UpdateStream(&nats.StreamConfig{Name: "MOVE-ME", Replicas: replicas,
|
||||
Placement: &nats.Placement{Tags: []string{"cloud:C2-tag"}}})
|
||||
require_NoError(t, err)
|
||||
require_Equal(t, ci.Cluster.Name, "C1")
|
||||
|
||||
checkFor(t, 10*time.Second, 10*time.Millisecond, func() error {
|
||||
if si, err := js.StreamInfo("MOVE-ME"); err != nil {
|
||||
return err
|
||||
} else if si.Cluster.Name != "C2" {
|
||||
return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
|
||||
} else if si.Cluster.Leader == _EMPTY_ {
|
||||
return fmt.Errorf("No leader yet")
|
||||
} else if !strings.HasPrefix(si.Cluster.Leader, "C2-") {
|
||||
return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
|
||||
} else if len(si.Cluster.Replicas) != replicas-1 {
|
||||
return fmt.Errorf("Expected %d replicas, got %d", replicas-1, len(si.Cluster.Replicas))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("R1", func(t *testing.T) {
|
||||
test(t, 1)
|
||||
})
|
||||
t.Run("R3", func(t *testing.T) {
|
||||
test(t, 3)
|
||||
})
|
||||
}
|
||||
|
||||
func TestJWTClusteredJetStreamTiers(t *testing.T) {
|
||||
sysKp, syspub := createKey(t)
|
||||
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
|
||||
|
||||
Reference in New Issue
Block a user