From da287b0aea54c07bcf78a16ed20bff58f9b71e46 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 11 Nov 2022 15:45:32 -0700 Subject: [PATCH] [FIXED] Weighted subject mappings updates not applied Suppose an account is updated to have the following weighted mapping: ``` foo -> bar 40% ``` The server automatically adds foo -> foo at 60%. Sending messages to "foo" will result in the expected distribution of 60% messages going to "foo" and 40% going to bar. However, if a successive update is pushed to the server(s): ``` foo -> bar 40% foo -> baz 60% ``` The subject mapping should now be as described, that is, no more mapping from "foo" to "foo" and 40% to bar and 60% to baz, however, what was happening is that the server would always use the original mapping. Signed-off-by: Ivan Kozlovic --- server/accounts.go | 4 +- server/jwt_test.go | 131 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 2 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index a03e4102..12c48814 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -707,8 +707,8 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error { } // Replace an old one if it exists. - for i, m := range a.mappings { - if m.src == src { + for i, em := range a.mappings { + if em.src == src { a.mappings[i] = m return nil } diff --git a/server/jwt_test.go b/server/jwt_test.go index 54e82a5a..9126d346 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -6516,3 +6516,134 @@ func TestJWTAccountConnzAccessAfterClaimUpdate(t *testing.T) { // If export was wiped this would fail with timeout. doRequest() } + +func TestAccountWeightedMappingInSuperCluster(t *testing.T) { + skp, spub := createKey(t) + sysClaim := jwt.NewAccountClaims(spub) + sysClaim.Name = "SYS" + sysCreds := newUser(t, skp) + defer removeFile(t, sysCreds) + + akp, apub := createKey(t) + aUsr := createUserCreds(t, nil, akp) + claim := jwt.NewAccountClaims(apub) + aJwtMap := encodeClaim(t, claim, apub) + + // We are using the createJetStreamSuperClusterWithTemplateAndModHook() + // helper, but this test is not about JetStream... + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + ` + + sc := createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, 3, 3, + func(serverName, clusterName, storeDir, conf string) string { + dirSrv := createDir(t, "srv") + return fmt.Sprintf(`%s + operator: %s + system_account: %s + resolver: { + type: full + dir: '%s' + } + `, conf, ojwt, spub, dirSrv) + }, nil) + defer sc.shutdown() + + // Update from C2 + require_Len(t, 1, updateJwt(t, sc.clusterForName("C2").randomServer().ClientURL(), sysCreds, aJwtMap, 1)) + + // We will connect our services in the C3 cluster. + nc1 := natsConnect(t, sc.clusterForName("C3").randomServer().ClientURL(), aUsr) + defer nc1.Close() + nc2 := natsConnect(t, sc.clusterForName("C3").randomServer().ClientURL(), aUsr) + defer nc2.Close() + + natsSub(t, nc1, "foo", func(m *nats.Msg) { + m.Respond([]byte("foo")) + }) + natsSub(t, nc1, "bar.v1", func(m *nats.Msg) { + m.Respond([]byte("v1")) + }) + natsSub(t, nc2, "bar.v2", func(m *nats.Msg) { + m.Respond([]byte("v2")) + }) + natsFlush(t, nc1) + natsFlush(t, nc2) + + // Now we will update the account to add weighted subject mapping + claim.Mappings = map[jwt.Subject][]jwt.WeightedMapping{} + // Start with foo->bar.v2 at 40%, the server will auto-add foo->foo at 60%. + wm := []jwt.WeightedMapping{{Subject: "bar.v2", Weight: 40}} + claim.AddMapping("foo", wm...) + aJwtMap = encodeClaim(t, claim, apub) + + // We will update from C2 + require_Len(t, 1, updateJwt(t, sc.clusterForName("C2").randomServer().ClientURL(), sysCreds, aJwtMap, 1)) + + time.Sleep(time.Second) + + // And we will publish from C1 + nc := natsConnect(t, sc.clusterForName("C1").randomServer().ClientURL(), aUsr) + defer nc.Close() + + var foo, v1, v2 int + pubAndCount := func() { + for i := 0; i < 1000; i++ { + msg, err := nc.Request("foo", []byte("req"), 500*time.Millisecond) + if err != nil { + continue + } + switch string(msg.Data) { + case "foo": + foo++ + case "v1": + v1++ + case "v2": + v2++ + } + } + } + pubAndCount() + if foo < 550 || foo > 650 { + t.Fatalf("Expected foo to receive 60%%, got %v/1000", foo) + } + if v1 != 0 { + t.Fatalf("Expected v1 to receive no message, got %v/1000", v1) + } + if v2 < 350 || v2 > 450 { + t.Fatalf("Expected v2 to receive 40%%, got %v/1000", v2) + } + + // Now send a new update with foo-> bar.v2(40) and bar.v1(60). + // The auto-add of "foo" should no longer be used by the server. + wm = []jwt.WeightedMapping{ + {Subject: "bar.v2", Weight: 40}, + {Subject: "bar.v1", Weight: 60}, + } + claim.AddMapping("foo", wm...) + aJwtMap = encodeClaim(t, claim, apub) + + // We will update from C2 + require_Len(t, 1, updateJwt(t, sc.clusterForName("C2").randomServer().ClientURL(), sysCreds, aJwtMap, 1)) + + time.Sleep(time.Second) + + foo, v1, v2 = 0, 0, 0 + pubAndCount() + if foo != 0 { + t.Fatalf("Expected foo to receive no message, got %v/1000", foo) + } + if v1 < 550 || v1 > 650 { + t.Fatalf("Expected v1 to receive 60%%, got %v/1000", v1) + } + if v2 < 350 || v2 > 450 { + t.Fatalf("Expected v2 to receive 40%%, got %v/1000", v2) + } +}