Allow cluster filters for mappings, changed accountz for mappings

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-10-16 16:53:14 -07:00
parent 047600750a
commit 5f5ceb4668
4 changed files with 145 additions and 44 deletions

View File

@@ -497,8 +497,13 @@ func (a *Account) TotalSubs() int {
// MapDest is for mapping published subjects for clients.
type MapDest struct {
Subject string `json:"subject"`
Weight uint8 `json:"weight"`
Subject string `json:"subject"`
Weight uint8 `json:"weight"`
OptCluster string `json:"cluster,omitempty"`
}
func NewMapDest(subject string, weight uint8) *MapDest {
return &MapDest{subject, weight, ""}
}
// destination is for internal representation for a weighted mapped destination.
@@ -509,15 +514,16 @@ type destination struct {
// mapping is an internal entry for mapping subjects.
type mapping struct {
src string
wc bool
dests []*destination
src string
wc bool
dests []*destination
cdests map[string][]*destination
}
// AddMapping adds in a simple route mapping from src subject to dest subject
// for inbound client messages.
func (a *Account) AddMapping(src, dest string) error {
return a.AddWeightedMappings(src, &MapDest{dest, 100})
return a.AddWeightedMappings(src, NewMapDest(dest, 100))
}
// AddWeightedMapping will add in a weighted mappings for the destinations.
@@ -558,32 +564,66 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
if err != nil {
return err
}
m.dests = append(m.dests, &destination{tr, d.Weight})
if d.OptCluster == "" {
m.dests = append(m.dests, &destination{tr, d.Weight})
} else {
// We have a cluster scoped filter.
if m.cdests == nil {
m.cdests = make(map[string][]*destination)
}
ad := m.cdests[d.OptCluster]
ad = append(ad, &destination{tr, d.Weight})
m.cdests[d.OptCluster] = ad
}
}
// Auto add in original at weight difference if all entries weight does not total to 100.
// Iff the src was not already added in explicitly, meaning they want loss.
_, haveSrc := seen[src]
if tw != 100 && !haveSrc {
dest := src
if m.wc {
// We need to make the appropriate markers for the wildcards etc.
dest = transformTokenize(dest)
processDestinations := func(dests []*destination) ([]*destination, error) {
var ltw uint8
for _, d := range dests {
ltw += d.weight
}
tr, err := newTransform(src, dest)
if err != nil {
// Auto add in original at weight difference if all entries weight does not total to 100.
// Iff the src was not already added in explicitly, meaning they want loss.
_, haveSrc := seen[src]
if ltw != 100 && !haveSrc {
dest := src
if m.wc {
// We need to make the appropriate markers for the wildcards etc.
dest = transformTokenize(dest)
}
tr, err := newTransform(src, dest)
if err != nil {
return nil, err
}
aw := 100 - ltw
if len(dests) == 0 {
aw = 100
}
dests = append(dests, &destination{tr, aw})
}
sort.Slice(dests, func(i, j int) bool { return dests[i].weight < dests[j].weight })
var lw uint8
for _, d := range dests {
d.weight += lw
lw = d.weight
}
return dests, nil
}
var err error
if m.dests, err = processDestinations(m.dests); err != nil {
return err
}
// Option cluster scoped destinations
for cluster, dests := range m.cdests {
if dests, err = processDestinations(dests); err != nil {
return err
}
m.dests = append(m.dests, &destination{tr, 100 - tw})
m.cdests[cluster] = dests
}
sort.Slice(m.dests, func(i, j int) bool { return m.dests[i].weight < m.dests[j].weight })
var lw uint8
for _, d := range m.dests {
d.weight += lw
lw = d.weight
}
// Replace an old one if it exists.
for i, m := range a.mappings {
if m.src == src {
@@ -703,12 +743,22 @@ func (a *Account) selectMappedSubject(dest string) (string, bool) {
var d *destination
var ndest string
dests := m.dests
if len(m.cdests) > 0 {
cn := a.srv.ClusterName()
dests = m.cdests[cn]
if dests == nil {
// Fallback to main if we do not match the cluster.
dests = m.dests
}
}
// Optimize for single entry case.
if len(m.dests) == 1 && m.dests[0].weight == 100 {
d = m.dests[0]
if len(dests) == 1 && dests[0].weight == 100 {
d = dests[0]
} else {
w := uint8(a.prand.Int31n(100))
for _, rm := range m.dests {
for _, rm := range dests {
if w < rm.weight {
d = rm
break

View File

@@ -2462,7 +2462,7 @@ func TestAccountSimpleWeightedRouteMapping(t *testing.T) {
defer s.Shutdown()
acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT)
acc.AddWeightedMappings("foo", &MapDest{"bar", 50})
acc.AddWeightedMappings("foo", NewMapDest("bar", 50))
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
@@ -2509,18 +2509,18 @@ func TestAccountMultiWeightedRouteMappings(t *testing.T) {
}
}
shouldErr(&MapDest{"bar", 150})
shouldNotErr(&MapDest{"bar", 50})
shouldNotErr(&MapDest{"bar", 50}, &MapDest{"baz", 50})
shouldErr(NewMapDest("bar", 150))
shouldNotErr(NewMapDest("bar", 50))
shouldNotErr(NewMapDest("bar", 50), NewMapDest("baz", 50))
// Same dest duplicated should error.
shouldErr(&MapDest{"bar", 50}, &MapDest{"bar", 50})
shouldErr(NewMapDest("bar", 50), NewMapDest("bar", 50))
// total over 100
shouldErr(&MapDest{"bar", 50}, &MapDest{"baz", 60})
shouldErr(NewMapDest("bar", 50), NewMapDest("baz", 60))
acc.RemoveMapping("foo")
// 20 for original, you can leave it off will be auto-added.
shouldNotErr(&MapDest{"bar", 50}, &MapDest{"baz", 30})
shouldNotErr(NewMapDest("bar", 50), NewMapDest("baz", 30))
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
@@ -2640,6 +2640,8 @@ func TestAccountRouteMappingsConfiguration(t *testing.T) {
if len(az.Account.Mappings) != 3 {
t.Fatalf("Expected %d mappings, saw %d", 3, len(az.Account.Mappings))
}
b, _ := json.MarshalIndent(az, "", " ")
fmt.Printf("%s", b)
}
func TestAccountRouteMappingsWithLossInjection(t *testing.T) {
@@ -2680,6 +2682,44 @@ func TestAccountRouteMappingsWithLossInjection(t *testing.T) {
}
}
func TestAccountRouteMappingsWithOriginClusterFilter(t *testing.T) {
cf := createConfFile(t, []byte(`
mappings = {
foo: [ { dest: bar, cluster: SYN, weight: 100% } ]
}
`))
defer os.Remove(cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
sub, _ := nc.SubscribeSync("foo")
total := 1000
for i := 0; i < total; i++ {
nc.Publish("foo", nil)
}
nc.Flush()
if pending, _, _ := sub.Pending(); pending != total {
t.Fatalf("Expected pending to be %d, got %d", total, pending)
}
s.setClusterName("SYN")
sub, _ = nc.SubscribeSync("bar")
for i := 0; i < total; i++ {
nc.Publish("foo", nil)
}
nc.Flush()
if pending, _, _ := sub.Pending(); pending != total {
t.Fatalf("Expected pending to be %d, got %d", total, pending)
}
}
func TestAccountServiceImportWithRouteMappings(t *testing.T) {
cf := createConfFile(t, []byte(`
accounts {

View File

@@ -1934,10 +1934,7 @@ type ExtExport struct {
ApprovedAccounts []string `json:"approved_accounts,omitempty"`
}
type ExtMap struct {
Source string `json:"src"`
Destination []*MapDest `json:"dest"`
}
type ExtMap map[string][]*MapDest
type AccountInfo struct {
AccountName string `json:"account_name"`
@@ -1948,7 +1945,7 @@ type AccountInfo struct {
LeafCnt int `json:"leafnode_connections"`
ClientCnt int `json:"client_connections"`
SubCnt uint32 `json:"subscriptions"`
Mappings []*ExtMap `json:"mappings,omitempty"`
Mappings ExtMap `json:"mappings,omitempty"`
Exports []ExtExport `json:"exports,omitempty"`
Imports []ExtImport `json:"imports,omitempty"`
Jwt string `json:"jwt,omitempty"`
@@ -2063,13 +2060,13 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
})
}
var mappings []*ExtMap
mappings := ExtMap{}
for _, m := range a.mappings {
e := &ExtMap{m.src, nil}
var dests []*MapDest
for _, d := range m.dests {
e.Destination = append(e.Destination, &MapDest{d.tr.dest, d.weight})
dests = append(dests, &MapDest{d.tr.dest, d.weight, ""})
}
mappings = append(mappings, e)
mappings[m.src] = dests
}
return &AccountInfo{

View File

@@ -1887,6 +1887,7 @@ func parseAccountMappings(v interface{}, acc *Account, errors *[]error, warnings
continue
}
mdest := &MapDest{}
var sw bool
for k, v := range mv {
tk, dmv := unwrapValue(v, &lt)
switch strings.ToLower(k) {
@@ -1911,6 +1912,7 @@ func parseAccountMappings(v interface{}, acc *Account, errors *[]error, warnings
continue
}
mdest.Weight = uint8(weight)
sw = true
case int64:
weight := vv
if weight > 100 || weight < 0 {
@@ -1919,13 +1921,25 @@ func parseAccountMappings(v interface{}, acc *Account, errors *[]error, warnings
continue
}
mdest.Weight = uint8(weight)
sw = true
default:
err := &configErr{tk, fmt.Sprintf("Unknown entry type for weight of %v\n", vv)}
*errors = append(*errors, err)
continue
}
case "cluster":
mdest.OptCluster = dmv.(string)
default:
err := &configErr{tk, fmt.Sprintf("Unknown field %q for mapping destination", k)}
*errors = append(*errors, err)
continue
}
}
if !sw {
err := &configErr{tk, fmt.Sprintf("Missing weight for mapping destination %q", mdest.Subject)}
*errors = append(*errors, err)
continue
}
mappings = append(mappings, mdest)
}