mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] Weighted subject mappings updates not applied
Suppose an account is updated to have the following weighted mapping: ``` foo -> bar 40% ``` The server automatically adds foo -> foo at 60%. Sending messages to "foo" will result in the expected distribution of 60% messages going to "foo" and 40% going to bar. However, if a successive update is pushed to the server(s): ``` foo -> bar 40% foo -> baz 60% ``` The subject mapping should now be as described, that is, no more mapping from "foo" to "foo" and 40% to bar and 60% to baz, however, what was happening is that the server would always use the original mapping. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -707,8 +707,8 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
|
||||
}
|
||||
|
||||
// Replace an old one if it exists.
|
||||
for i, m := range a.mappings {
|
||||
if m.src == src {
|
||||
for i, em := range a.mappings {
|
||||
if em.src == src {
|
||||
a.mappings[i] = m
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -6516,3 +6516,134 @@ func TestJWTAccountConnzAccessAfterClaimUpdate(t *testing.T) {
|
||||
// If export was wiped this would fail with timeout.
|
||||
doRequest()
|
||||
}
|
||||
|
||||
func TestAccountWeightedMappingInSuperCluster(t *testing.T) {
|
||||
skp, spub := createKey(t)
|
||||
sysClaim := jwt.NewAccountClaims(spub)
|
||||
sysClaim.Name = "SYS"
|
||||
sysCreds := newUser(t, skp)
|
||||
defer removeFile(t, sysCreds)
|
||||
|
||||
akp, apub := createKey(t)
|
||||
aUsr := createUserCreds(t, nil, akp)
|
||||
claim := jwt.NewAccountClaims(apub)
|
||||
aJwtMap := encodeClaim(t, claim, apub)
|
||||
|
||||
// We are using the createJetStreamSuperClusterWithTemplateAndModHook()
|
||||
// helper, but this test is not about JetStream...
|
||||
tmpl := `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
||||
cluster {
|
||||
name: %s
|
||||
listen: 127.0.0.1:%d
|
||||
routes = [%s]
|
||||
}
|
||||
`
|
||||
|
||||
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, 3, 3,
|
||||
func(serverName, clusterName, storeDir, conf string) string {
|
||||
dirSrv := createDir(t, "srv")
|
||||
return fmt.Sprintf(`%s
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: '%s'
|
||||
}
|
||||
`, conf, ojwt, spub, dirSrv)
|
||||
}, nil)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Update from C2
|
||||
require_Len(t, 1, updateJwt(t, sc.clusterForName("C2").randomServer().ClientURL(), sysCreds, aJwtMap, 1))
|
||||
|
||||
// We will connect our services in the C3 cluster.
|
||||
nc1 := natsConnect(t, sc.clusterForName("C3").randomServer().ClientURL(), aUsr)
|
||||
defer nc1.Close()
|
||||
nc2 := natsConnect(t, sc.clusterForName("C3").randomServer().ClientURL(), aUsr)
|
||||
defer nc2.Close()
|
||||
|
||||
natsSub(t, nc1, "foo", func(m *nats.Msg) {
|
||||
m.Respond([]byte("foo"))
|
||||
})
|
||||
natsSub(t, nc1, "bar.v1", func(m *nats.Msg) {
|
||||
m.Respond([]byte("v1"))
|
||||
})
|
||||
natsSub(t, nc2, "bar.v2", func(m *nats.Msg) {
|
||||
m.Respond([]byte("v2"))
|
||||
})
|
||||
natsFlush(t, nc1)
|
||||
natsFlush(t, nc2)
|
||||
|
||||
// Now we will update the account to add weighted subject mapping
|
||||
claim.Mappings = map[jwt.Subject][]jwt.WeightedMapping{}
|
||||
// Start with foo->bar.v2 at 40%, the server will auto-add foo->foo at 60%.
|
||||
wm := []jwt.WeightedMapping{{Subject: "bar.v2", Weight: 40}}
|
||||
claim.AddMapping("foo", wm...)
|
||||
aJwtMap = encodeClaim(t, claim, apub)
|
||||
|
||||
// We will update from C2
|
||||
require_Len(t, 1, updateJwt(t, sc.clusterForName("C2").randomServer().ClientURL(), sysCreds, aJwtMap, 1))
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// And we will publish from C1
|
||||
nc := natsConnect(t, sc.clusterForName("C1").randomServer().ClientURL(), aUsr)
|
||||
defer nc.Close()
|
||||
|
||||
var foo, v1, v2 int
|
||||
pubAndCount := func() {
|
||||
for i := 0; i < 1000; i++ {
|
||||
msg, err := nc.Request("foo", []byte("req"), 500*time.Millisecond)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
switch string(msg.Data) {
|
||||
case "foo":
|
||||
foo++
|
||||
case "v1":
|
||||
v1++
|
||||
case "v2":
|
||||
v2++
|
||||
}
|
||||
}
|
||||
}
|
||||
pubAndCount()
|
||||
if foo < 550 || foo > 650 {
|
||||
t.Fatalf("Expected foo to receive 60%%, got %v/1000", foo)
|
||||
}
|
||||
if v1 != 0 {
|
||||
t.Fatalf("Expected v1 to receive no message, got %v/1000", v1)
|
||||
}
|
||||
if v2 < 350 || v2 > 450 {
|
||||
t.Fatalf("Expected v2 to receive 40%%, got %v/1000", v2)
|
||||
}
|
||||
|
||||
// Now send a new update with foo-> bar.v2(40) and bar.v1(60).
|
||||
// The auto-add of "foo" should no longer be used by the server.
|
||||
wm = []jwt.WeightedMapping{
|
||||
{Subject: "bar.v2", Weight: 40},
|
||||
{Subject: "bar.v1", Weight: 60},
|
||||
}
|
||||
claim.AddMapping("foo", wm...)
|
||||
aJwtMap = encodeClaim(t, claim, apub)
|
||||
|
||||
// We will update from C2
|
||||
require_Len(t, 1, updateJwt(t, sc.clusterForName("C2").randomServer().ClientURL(), sysCreds, aJwtMap, 1))
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
foo, v1, v2 = 0, 0, 0
|
||||
pubAndCount()
|
||||
if foo != 0 {
|
||||
t.Fatalf("Expected foo to receive no message, got %v/1000", foo)
|
||||
}
|
||||
if v1 < 550 || v1 > 650 {
|
||||
t.Fatalf("Expected v1 to receive 60%%, got %v/1000", v1)
|
||||
}
|
||||
if v2 < 350 || v2 > 450 {
|
||||
t.Fatalf("Expected v2 to receive 40%%, got %v/1000", v2)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user