diff --git a/server/accounts.go b/server/accounts.go index ad8e73b8..1a83c6b4 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -64,6 +64,7 @@ type Account struct { lqws map[string]int32 usersRevoked map[string]int64 actsRevoked map[string]int64 + mappings []*mapping lleafs []*client imports importMap exports exportMap @@ -100,8 +101,11 @@ type sconns struct { type streamImport struct { acc *Account from string - prefix string + to string + tr *transform + rtr *transform claim *jwt.Import + usePub bool invalid bool } @@ -114,12 +118,13 @@ type serviceImport struct { from string to string exsub string + tr *transform ts int64 rt ServiceRespType latency *serviceLatency m1 *ServiceLatency rc *client - hasWC bool + usePub bool response bool invalid bool share bool @@ -209,7 +214,6 @@ func NewAccount(name string) *Account { limits: limits{-1, -1, -1, -1}, eventIds: nuid.New(), } - return a } @@ -490,6 +494,244 @@ func (a *Account) TotalSubs() int { return int(a.sl.Count()) } +// MapDest is for mapping published subjects for clients. +type MapDest struct { + Subject string + Weight uint8 +} + +// destination is for internal representation for a weighted mapped destination. +type destination struct { + tr *transform + weight uint8 +} + +// mapping is an internal entry for mapping subjects. +type mapping struct { + src string + wc bool + dests []*destination +} + +// AddMapping adds in a simple route mapping from src subject to dest subject +// for inbound client messages. +func (a *Account) AddMapping(src, dest string) error { + return a.AddWeightedMappings(src, &MapDest{dest, 100}) +} + +// AddWeightedMapping will add in a weighted mappings for the destinations. +// TODO(dlc) - Allow cluster filtering +func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error { + a.mu.Lock() + defer a.mu.Unlock() + + if !IsValidSubject(src) { + return ErrBadSubject + } + + m := &mapping{src: src, wc: subjectHasWildcard(src), dests: make([]*destination, 0, len(dests)+1)} + seen := make(map[string]struct{}) + + var tw uint8 + for _, d := range dests { + if _, ok := seen[d.Subject]; ok { + return fmt.Errorf("duplicate entry for %q", d.Subject) + } + seen[d.Subject] = struct{}{} + if d.Weight > 100 { + return fmt.Errorf("individual weights need to be <= 100") + } + tw += d.Weight + if tw > 100 { + return fmt.Errorf("total weight needs to be <= 100") + } + if !IsValidSubject(d.Subject) { + return ErrBadSubject + } + tr, err := newTransform(src, d.Subject) + if err != nil { + return err + } + + m.dests = append(m.dests, &destination{tr, d.Weight}) + } + + // Auto add in original at weight if all entries weight does not total to 100. + if tw != 100 { + dest := src + if m.wc { + // We need to make the appropriate markers for the wildcards etc. + dest = transformTokenize(dest) + } + tr, err := newTransform(src, dest) + if err != nil { + return err + } + m.dests = append(m.dests, &destination{tr, 100 - tw}) + } + sort.Slice(m.dests, func(i, j int) bool { return m.dests[i].weight < m.dests[j].weight }) + + var lw uint8 + for _, d := range m.dests { + d.weight += lw + lw = d.weight + } + // Replace an old one if it exists. + for i, m := range a.mappings { + if m.src == src { + a.mappings[i] = m + return nil + } + } + // If we did not replace add to the end. + a.mappings = append(a.mappings, m) + + return nil +} + +// Helper function to tokenize subjects with partial wildcards into formal transform destinations. +// e.g. foo.*.* -> foo.$1.$2 +func transformTokenize(subject string) string { + // We need to make the appropriate markers for the wildcards etc. + i := 1 + var nda []string + for _, token := range strings.Split(subject, tsep) { + if token == "*" { + nda = append(nda, fmt.Sprintf("$%d", i)) + i++ + } else { + nda = append(nda, token) + } + } + return strings.Join(nda, tsep) +} + +func transformUntokenize(subject string) (string, []string) { + var phs []string + var nda []string + + for _, token := range strings.Split(subject, tsep) { + if len(token) > 1 && token[0] == '$' && token[1] >= '1' && token[1] <= '9' { + phs = append(phs, token) + nda = append(nda, "*") + } else { + nda = append(nda, token) + } + } + return strings.Join(nda, tsep), phs +} + +// RemoveMapping will remove an existing mapping. +func (a *Account) RemoveMapping(src string) bool { + a.mu.Lock() + defer a.mu.Unlock() + for i, m := range a.mappings { + if m.src == src { + // Swap last one into this spot. Its ok to change order. + a.mappings[i] = a.mappings[len(a.mappings)-1] + a.mappings[len(a.mappings)-1] = nil // gc + a.mappings = a.mappings[:len(a.mappings)-1] + return true + } + } + return false +} + +// Indicates we have mapping entries. +func (a *Account) hasMappings() bool { + if a == nil { + return false + } + a.mu.RLock() + n := len(a.mappings) + a.mu.RUnlock() + return n > 0 +} + +// This performs the logic to map to a new dest subject based on mappings. +// Should only be called from processInboundClientMsg or service import processing. +func (a *Account) selectMappedSubject(dest string) (string, bool) { + a.mu.RLock() + if len(a.mappings) == 0 { + a.mu.RUnlock() + return dest, false + } + + // In case we have to tokenize for subset matching. + tsa := [32]string{} + tts := tsa[:0] + + var m *mapping + for _, rm := range a.mappings { + if !rm.wc && rm.src == dest { + m = rm + break + } else { + // tokenize and reuse for subset matching. + if len(tts) == 0 { + start := 0 + subject := dest + for i := 0; i < len(subject); i++ { + if subject[i] == btsep { + tts = append(tts, subject[start:i]) + start = i + 1 + } + } + tts = append(tts, subject[start:]) + } + if isSubsetMatch(tts, rm.src) { + m = rm + break + } + } + } + + if m == nil { + a.mu.RUnlock() + return dest, false + } + + // The selected destination for the mapping. + var d *destination + + // Optimize for single entry case. + if len(m.dests) == 1 && m.dests[0].weight == 100 { + d = m.dests[0] + } else { + if a.prand == nil { + a.makeRand() + } + w := uint8(a.prand.Int31n(100)) + for _, rm := range m.dests { + if w <= rm.weight { + d = rm + break + } + } + } + + if d != nil { + if len(d.tr.dtpi) == 0 { + dest = d.tr.dest + } else if nsubj, err := d.tr.transform(tts); err == nil { + dest = nsubj + } + } + + a.mu.RUnlock() + return dest, true +} + +// Small helper function. +// Read lock assumed held. +func (a *Account) makeRand() { + a.mu.RUnlock() + a.mu.Lock() + a.prand = rand.New(rand.NewSource(time.Now().UnixNano())) + a.mu.Unlock() + a.mu.RLock() +} + // SubscriptionInterest returns true if this account has a matching subscription // for the given `subject`. Works only for literal subjects. // TODO: Add support for wildcards @@ -1040,15 +1282,14 @@ func (a *Account) AddServiceImportWithClaim(destination *Account, from, to strin if destination == nil { return ErrMissingAccount } - // Empty means use from. Also means we can use wildcards since we are not doing remapping. - if !IsValidSubject(from) || (to != "" && (!IsValidLiteralSubject(from) || !IsValidLiteralSubject(to))) { - return ErrInvalidSubject - } - // Empty means use from. if to == "" { to = from } + if !IsValidSubject(from) || !IsValidSubject(to) { + return ErrInvalidSubject + } + // First check to see if the account has authorized us to route to the "to" subject. if !destination.checkServiceImportAuthorized(a, to, imClaim) { return ErrServiceImportAuthorization @@ -1332,9 +1573,27 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im if to == "" { to = from } - hasWC := subjectHasWildcard(from) + // Check to see if we have a wildcard + var ( + usePub bool + tr *transform + err error + ) + if subjectHasWildcard(to) { + // If to and from match, then we use the published subject. + if to == from { + usePub = true + } else { + // Create a transform + if tr, err = newTransform(from, transformTokenize(to)); err != nil { + a.mu.Unlock() + return nil, fmt.Errorf("failed to create mapping transform for service import subject %q to %q: %v", + from, to, err) + } + } + } - si := &serviceImport{dest, claim, se, nil, from, to, "", 0, rt, lat, nil, nil, hasWC, false, false, false, false, nil} + si := &serviceImport{dest, claim, se, nil, from, to, "", tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, nil} a.imports.services[from] = si a.mu.Unlock() @@ -1748,7 +2007,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp // dest is the requestor's account. a is the service responder with the export. // Marked as internal here, that is how we distinguish. - si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, 0, rt, nil, nil, nil, false, true, false, osi.share, false, nil} + si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, nil} if a.exports.responses == nil { a.exports.responses = make(map[string]*serviceImport) @@ -1800,12 +2059,51 @@ func (a *Account) AddStreamImportWithClaim(account *Account, from, prefix string prefix = prefix + string(btsep) } } + return a.AddMappedStreamImportWithClaim(account, from, prefix+from, imClaim) +} + +// AddMappedStreamImport helper for AddMappedStreamImportWithClaim +func (a *Account) AddMappedStreamImport(account *Account, from, to string) error { + return a.AddMappedStreamImportWithClaim(account, from, to, nil) +} + +// AddMappedStreamImportWithClaim will add in the stream import from a specific account with optional token. +func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to string, imClaim *jwt.Import) error { + if account == nil { + return ErrMissingAccount + } + + // First check to see if the account has authorized export of the subject. + if !account.checkStreamImportAuthorized(a, from, imClaim) { + return ErrStreamImportAuthorization + } + + if to == "" { + to = from + } + var ( + usePub bool + tr *transform + err error + ) + if subjectHasWildcard(from) { + if to == from { + usePub = true + } else { + // Create a transform + if tr, err = newTransform(from, transformTokenize(to)); err != nil { + return fmt.Errorf("failed to create mapping transform for stream import subject %q to %q: %v", + from, to, err) + } + } + } + a.mu.Lock() if a.isStreamImportDuplicate(account, from) { a.mu.Unlock() return ErrStreamImportDuplicate } - a.imports.streams = append(a.imports.streams, &streamImport{account, from, prefix, imClaim, false}) + a.imports.streams = append(a.imports.streams, &streamImport{account, from, to, tr, nil, imClaim, usePub, false}) a.mu.Unlock() return nil } @@ -2150,10 +2448,10 @@ func (a *Account) checkStreamImportsEqual(b *Account) bool { // Load the b imports into a map index by what we are looking for. bm := make(map[string]*streamImport, len(b.imports.streams)) for _, bim := range b.imports.streams { - bm[bim.acc.Name+bim.from+bim.prefix] = bim + bm[bim.acc.Name+bim.from+bim.to] = bim } for _, aim := range a.imports.streams { - if _, ok := bm[aim.acc.Name+aim.from+aim.prefix]; !ok { + if _, ok := bm[aim.acc.Name+aim.from+aim.to]; !ok { return false } } @@ -3281,3 +3579,185 @@ func (dr *CacheDirAccResolver) Start(s *Server) error { s.Noticef("Managing some jwt in exclusive directory %s", dr.directory) return nil } + +func (dr *CacheDirAccResolver) Reload() error { + return dr.DirAccResolver.Reload() +} + +// Transforms for arbitrarily mapping subjects from one to another for maps, tees and filters. +// These can also be used for proper mapping on wildcard exports/imports. +// These will be grouped and caching and locking are assumed to be in the upper layers. +type transform struct { + src, dest string + dtoks []string + stoks []string + dtpi []int8 +} + +// Helper to pull raw place holder index. Returns -1 if not a place holder. +func placeHolderIndex(token string) int { + if len(token) > 1 && token[0] == '$' { + var tp int + if n, err := fmt.Sscanf(token, "$%d", &tp); err == nil && n == 1 { + return tp + } + } + return -1 +} + +// newTransform will create a new transform checking the src and dest subjects for accuracy. +func newTransform(src, dest string) (*transform, error) { + // Both entries need to be valid subjects. + sv, stokens, npwcs, hasFwc := subjectInfo(src) + dv, dtokens, dnpwcs, dHasFwc := subjectInfo(dest) + + // Make sure both are valid, match fwc if present and there are no pwcs in the dest subject. + if !sv || !dv || dnpwcs > 0 || hasFwc != dHasFwc { + return nil, ErrBadSubject + } + + var dtpi []int8 + + // If the src has partial wildcards then the dest needs to have the token place markers. + if npwcs > 0 || hasFwc { + // We need to count to make sure that the dest has token holders for the pwcs. + sti := make(map[int]int) + for i, token := range stokens { + if len(token) == 1 && token[0] == pwc { + sti[len(sti)+1] = i + } + } + + nphs := 0 + for _, token := range dtokens { + tp := placeHolderIndex(token) + if tp >= 0 { + if tp > npwcs { + return nil, ErrBadSubject + } + nphs++ + // Now build up our runtime mapping from dest to source tokens. + dtpi = append(dtpi, int8(sti[tp])) + } else { + dtpi = append(dtpi, -1) + } + } + + if nphs != npwcs { + return nil, ErrBadSubject + } + } + + return &transform{src: src, dest: dest, dtoks: dtokens, stoks: stokens, dtpi: dtpi}, nil +} + +// match will take a literal published subject that is associated with a client and will match and transform +// the subject if possible. +// TODO(dlc) - We could add in client here to allow for things like foo -> foo.$ACCOUNT +func (tr *transform) match(subject string) (string, error) { + // Tokenize the subject. This should always be a literal subject. + tsa := [32]string{} + tts := tsa[:0] + start := 0 + for i := 0; i < len(subject); i++ { + if subject[i] == btsep { + tts = append(tts, subject[start:i]) + start = i + 1 + } + } + tts = append(tts, subject[start:]) + if !isValidLiteralSubject(tts) { + return "", ErrBadSubject + } + + if isSubsetMatch(tts, tr.src) { + return tr.transform(tts) + } + return "", ErrNoTransforms +} + +// Do not need to match, just transform. +func (tr *transform) transformSubject(subject string) (string, error) { + // Tokenize the subject. + tsa := [32]string{} + tts := tsa[:0] + start := 0 + for i := 0; i < len(subject); i++ { + if subject[i] == btsep { + tts = append(tts, subject[start:i]) + start = i + 1 + } + } + tts = append(tts, subject[start:]) + return tr.transform(tts) +} + +// Do a transform on the subject to the dest subject. +func (tr *transform) transform(tokens []string) (string, error) { + if len(tr.dtpi) == 0 { + return tr.dest, nil + } + + var b strings.Builder + var token string + + // We need to walk destination tokens and create the mapped subject pulling tokens from src. + // This is slow and that is ok, transforms should have caching layer in front for mapping transforms + // and export/import semantics with streams and services. + li := len(tr.dtpi) - 1 + for i, index := range tr.dtpi { + // <0 means use destination token. + if index < 0 { + token = tr.dtoks[i] + // Break if fwc + if len(token) == 1 && token[0] == fwc { + break + } + } else { + // >= 0 means use source map index to figure out which source token to pull. + token = tokens[index] + } + b.WriteString(token) + if i < li { + b.WriteByte(btsep) + } + } + + // We may have more source tokens available. This happens with ">". + if tr.dtoks[len(tr.dtoks)-1] == ">" { + for sli, i := len(tokens)-1, len(tr.stoks)-1; i < len(tokens); i++ { + b.WriteString(tokens[i]) + if i < sli { + b.WriteByte(btsep) + } + } + } + return b.String(), nil +} + +// Reverse a transform. +func (tr *transform) reverse() *transform { + if len(tr.dtpi) == 0 { + rtr, _ := newTransform(tr.dest, tr.src) + return rtr + } + // If we are here we need to dynamically get the correct reverse + // of this transform. + nsrc, phs := transformUntokenize(tr.dest) + var nda []string + for _, token := range tr.stoks { + if token == "*" { + if len(phs) == 0 { + // TODO(dlc) - Should not happen + return nil + } + nda = append(nda, phs[0]) + phs = phs[1:] + } else { + nda = append(nda, token) + } + } + ndest := strings.Join(nda, tsep) + rtr, _ := newTransform(nsrc, ndest) + return rtr +} diff --git a/server/accounts_test.go b/server/accounts_test.go index b84484dd..7f175800 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -1208,7 +1208,6 @@ func TestAccountAddServiceImportRace(t *testing.T) { go func(i int) { err := barAcc.AddServiceImport(fooAcc, fmt.Sprintf("foo.%d", i), "") errCh <- err // nil is a valid value. - }(i) } @@ -1391,13 +1390,10 @@ func TestCrossAccountRequestReply(t *testing.T) { t.Fatalf("Error adding account service export to client foo: %v", err) } - // Test addServiceImport to make sure it requires accounts, and literalsubjects for both from and to subjects. + // Test addServiceImport to make sure it requires accounts. if err := cbar.acc.AddServiceImport(nil, "foo", "test.request"); err != ErrMissingAccount { t.Fatalf("Expected ErrMissingAccount but received %v.", err) } - if err := cbar.acc.AddServiceImport(fooAcc, "*", "test.request"); err != ErrInvalidSubject { - t.Fatalf("Expected ErrInvalidSubject but received %v.", err) - } if err := cbar.acc.AddServiceImport(fooAcc, "foo", "test..request."); err != ErrInvalidSubject { t.Fatalf("Expected ErrInvalidSubject but received %v.", err) } @@ -2334,6 +2330,443 @@ func TestMultipleStreamImportsWithSameSubject(t *testing.T) { readMsg() } +func TestAccountBasicRouteMapping(t *testing.T) { + opts := DefaultOptions() + opts.Port = -1 + s := RunServer(opts) + defer s.Shutdown() + + acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT) + acc.AddMapping("foo", "bar") + + nc := natsConnect(t, s.ClientURL()) + defer nc.Close() + + fsub, _ := nc.SubscribeSync("foo") + bsub, _ := nc.SubscribeSync("bar") + nc.Publish("foo", nil) + nc.Flush() + + checkPending := func(sub *nats.Subscription, expected int) { + t.Helper() + if n, _, _ := sub.Pending(); n != expected { + t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n) + } + } + + checkPending(fsub, 0) + checkPending(bsub, 1) + + acc.RemoveMapping("foo") + + nc.Publish("foo", nil) + nc.Flush() + + checkPending(fsub, 1) + checkPending(bsub, 1) +} + +func TestAccountWildcardRouteMapping(t *testing.T) { + opts := DefaultOptions() + opts.Port = -1 + s := RunServer(opts) + defer s.Shutdown() + + acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT) + + addMap := func(src, dest string) { + t.Helper() + if err := acc.AddMapping(src, dest); err != nil { + t.Fatalf("Error adding mapping: %v", err) + } + } + + addMap("foo.*.*", "bar.$2.$1") + addMap("bar.*.>", "baz.$1.>") + + nc := natsConnect(t, s.ClientURL()) + defer nc.Close() + + pub := func(subj string) { + t.Helper() + err := nc.Publish(subj, nil) + if err == nil { + err = nc.Flush() + } + if err != nil { + t.Fatalf("Error publishing: %v", err) + } + } + + fsub, _ := nc.SubscribeSync("foo.>") + bsub, _ := nc.SubscribeSync("bar.>") + zsub, _ := nc.SubscribeSync("baz.>") + + checkPending := func(sub *nats.Subscription, expected int) { + t.Helper() + if n, _, _ := sub.Pending(); n != expected { + t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n) + } + } + + pub("foo.1.2") + + checkPending(fsub, 0) + checkPending(bsub, 1) + checkPending(zsub, 0) +} + +func TestAccountRouteMappingChangesAfterClientStart(t *testing.T) { + opts := DefaultOptions() + opts.Port = -1 + s := RunServer(opts) + defer s.Shutdown() + + // Create the client first then add in mapping. + nc := natsConnect(t, s.ClientURL()) + defer nc.Close() + + nc.Flush() + + acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT) + acc.AddMapping("foo", "bar") + + fsub, _ := nc.SubscribeSync("foo") + bsub, _ := nc.SubscribeSync("bar") + nc.Publish("foo", nil) + nc.Flush() + + checkPending := func(sub *nats.Subscription, expected int) { + t.Helper() + if n, _, _ := sub.Pending(); n != expected { + t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n) + } + } + + checkPending(fsub, 0) + checkPending(bsub, 1) + + acc.RemoveMapping("foo") + + nc.Publish("foo", nil) + nc.Flush() + + checkPending(fsub, 1) + checkPending(bsub, 1) +} + +func TestAccountSimpleWeightedRouteMapping(t *testing.T) { + opts := DefaultOptions() + opts.Port = -1 + s := RunServer(opts) + defer s.Shutdown() + + acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT) + acc.AddWeightedMappings("foo", &MapDest{"bar", 50}) + + nc := natsConnect(t, s.ClientURL()) + defer nc.Close() + + fsub, _ := nc.SubscribeSync("foo") + bsub, _ := nc.SubscribeSync("bar") + + total := 500 + for i := 0; i < total; i++ { + nc.Publish("foo", nil) + } + nc.Flush() + + fpending, _, _ := fsub.Pending() + bpending, _, _ := bsub.Pending() + + h := total / 2 + tp := h / 5 + min, max := h-tp, h+tp + if fpending < min || fpending > max { + t.Fatalf("Expected about %d msgs, got %d and %d", h, fpending, bpending) + } +} + +func TestAccountMultiWeightedRouteMappings(t *testing.T) { + opts := DefaultOptions() + opts.Port = -1 + s := RunServer(opts) + defer s.Shutdown() + + acc, _ := s.LookupAccount(DEFAULT_GLOBAL_ACCOUNT) + + // Check failures for bad weights. + shouldErr := func(rds ...*MapDest) { + t.Helper() + if acc.AddWeightedMappings("foo", rds...) == nil { + t.Fatalf("Expected an error, got none") + } + } + shouldNotErr := func(rds ...*MapDest) { + t.Helper() + if err := acc.AddWeightedMappings("foo", rds...); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + shouldErr(&MapDest{"bar", 150}) + shouldNotErr(&MapDest{"bar", 50}) + shouldNotErr(&MapDest{"bar", 50}, &MapDest{"baz", 50}) + // Same dest duplicated should error. + shouldErr(&MapDest{"bar", 50}, &MapDest{"bar", 50}) + // total over 100 + shouldErr(&MapDest{"bar", 50}, &MapDest{"baz", 60}) + + acc.RemoveMapping("foo") + + // 20 for original, you can leave it off will be auto-added. + shouldNotErr(&MapDest{"bar", 50}, &MapDest{"baz", 30}) + + nc := natsConnect(t, s.ClientURL()) + defer nc.Close() + + fsub, _ := nc.SubscribeSync("foo") + bsub, _ := nc.SubscribeSync("bar") + zsub, _ := nc.SubscribeSync("baz") + + // For checking later. + rds := []struct { + sub *nats.Subscription + w uint8 + }{ + {fsub, 20}, + {bsub, 50}, + {zsub, 30}, + } + + total := 5000 + for i := 0; i < total; i++ { + nc.Publish("foo", nil) + } + nc.Flush() + + for _, rd := range rds { + pending, _, _ := rd.sub.Pending() + expected := total / int(100/rd.w) + tp := expected / 5 // 20% + min, max := expected-tp, expected+tp + if pending < min || pending > max { + t.Fatalf("Expected about %d msgs for %q, got %d", expected, rd.sub.Subject, pending) + } + } +} + +func TestGlobalAccountRouteMappingsConfiguration(t *testing.T) { + cf := createConfFile(t, []byte(` + mappings = { + foo: bar + foo.*: [ { dest: bar.v1.$1, weight: 40% }, { destination: baz.v2.$1, weight: 20 } ] + bar.*.*: RAB.$2.$1 + } + `)) + defer os.Remove(cf) + + s, _ := RunServerWithConfig(cf) + defer s.Shutdown() + + nc := natsConnect(t, s.ClientURL()) + defer nc.Close() + + bsub, _ := nc.SubscribeSync("bar") + fsub1, _ := nc.SubscribeSync("bar.v1.>") + fsub2, _ := nc.SubscribeSync("baz.v2.>") + zsub, _ := nc.SubscribeSync("RAB.>") + f22sub, _ := nc.SubscribeSync("foo.*") + + checkPending := func(sub *nats.Subscription, expected int) { + t.Helper() + if n, _, _ := sub.Pending(); n != expected { + t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n) + } + } + + nc.Publish("foo", nil) + nc.Publish("bar.11.22", nil) + + total := 500 + for i := 0; i < total; i++ { + nc.Publish("foo.22", nil) + } + nc.Flush() + + checkPending(bsub, 1) + checkPending(zsub, 1) + + fpending, _, _ := f22sub.Pending() + fpending1, _, _ := fsub1.Pending() + fpending2, _, _ := fsub2.Pending() + + if fpending1 < fpending2 || fpending < fpending2 { + t.Fatalf("Loadbalancing seems off for the foo.* mappings: %d and %d and %d", fpending, fpending1, fpending2) + } +} + +func TestAccountRouteMappingsConfiguration(t *testing.T) { + cf := createConfFile(t, []byte(` + accounts { + synadia { + users = [{user: derek, password: foo}] + mappings = { + foo: bar + foo.*: [ { dest: bar.v1.$1, weight: 40% }, { destination: baz.v2.$1, weight: 20 } ] + bar.*.*: RAB.$2.$1 + } + } + } + `)) + defer os.Remove(cf) + + s, _ := RunServerWithConfig(cf) + defer s.Shutdown() + + // We test functionality above, so for this one just make sure we have mappings for the account. + acc, _ := s.LookupAccount("synadia") + if !acc.hasMappings() { + t.Fatalf("Account %q does not have mappings", "synadia") + } +} + +func TestAccountServiceImportWithRouteMappings(t *testing.T) { + cf := createConfFile(t, []byte(` + accounts { + foo { + users = [{user: derek, password: foo}] + exports = [{service: "request"}] + } + bar { + users = [{user: ivan, password: bar}] + imports = [{service: {account: "foo", subject:"request"}}] + } + } + `)) + defer os.Remove(cf) + + s, opts := RunServerWithConfig(cf) + defer s.Shutdown() + + acc, _ := s.LookupAccount("foo") + acc.AddMapping("request", "request.v2") + + // Create the service client first. + ncFoo := natsConnect(t, fmt.Sprintf("nats://derek:foo@%s:%d", opts.Host, opts.Port)) + defer ncFoo.Close() + + fooSub := natsSubSync(t, ncFoo, "request.v2") + + // Requestor + ncBar := natsConnect(t, fmt.Sprintf("nats://ivan:bar@%s:%d", opts.Host, opts.Port)) + defer ncBar.Close() + + ncBar.Publish("request", nil) + ncBar.Flush() + + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if n, _, _ := fooSub.Pending(); n != 1 { + return fmt.Errorf("Expected a request for %q, but got %d", fooSub.Subject, n) + } + return nil + }) +} + +func TestAccountImportsWithWildcardSupport(t *testing.T) { + cf := createConfFile(t, []byte(` + accounts { + foo { + users = [{user: derek, password: foo}] + exports = [ + { service: "request.*" } + { stream: "events.>" } + { stream: "info.*.*.>" } + ] + } + bar { + users = [{user: ivan, password: bar}] + imports = [ + { service: {account: "foo", subject:"request.*"}, to:"my.request.*"} + { stream: {account: "foo", subject:"events.>"}, to:"foo.events.>"} + { stream: {account: "foo", subject:"info.*.*.>"}, to:"foo.info.$2.$1.>"} + ] + } + } + `)) + defer os.Remove(cf) + + s, opts := RunServerWithConfig(cf) + defer s.Shutdown() + + ncFoo := natsConnect(t, fmt.Sprintf("nats://derek:foo@%s:%d", opts.Host, opts.Port)) + defer ncFoo.Close() + + ncBar := natsConnect(t, fmt.Sprintf("nats://ivan:bar@%s:%d", opts.Host, opts.Port)) + defer ncBar.Close() + + // Create subscriber for the service endpoint in foo. + _, err := ncFoo.QueueSubscribe("request.*", "t22", func(m *nats.Msg) { + if m.Subject != "request.22" { + t.Fatalf("Expected literal subject for request, got %q", m.Subject) + } + m.Respond([]byte("yes!")) + }) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + ncFoo.Flush() + + // Now test service import. + resp, err := ncBar.Request("my.request.22", []byte("yes?"), time.Second) + if err != nil { + t.Fatalf("Expected a response") + } + if string(resp.Data) != "yes!" { + t.Fatalf("Expected a response of %q, got %q", "yes!", resp.Data) + } + + // Now test stream imports. + esub, _ := ncBar.SubscribeSync("foo.events.*") // subset + isub, _ := ncBar.SubscribeSync("foo.info.>") + ncBar.Flush() + + // Now publish some stream events. + ncFoo.Publish("events.22", nil) + ncFoo.Publish("info.11.22.bar", nil) + ncFoo.Flush() + + checkPending := func(sub *nats.Subscription, expected int) { + t.Helper() + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if n, _, _ := sub.Pending(); n != expected { + return fmt.Errorf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n) + } + return nil + }) + } + + checkPending(esub, 1) + checkPending(isub, 1) + + // Now check to make sure the subjects are correct etc. + m, err := esub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if m.Subject != "foo.events.22" { + t.Fatalf("Incorrect subject for stream import, expected %q, got %q", "foo.events.22", m.Subject) + } + + m, err = isub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if m.Subject != "foo.info.22.11.bar" { + t.Fatalf("Incorrect subject for stream import, expected %q, got %q", "foo.info.22.11.bar", m.Subject) + } +} + func BenchmarkNewRouteReply(b *testing.B) { opts := defaultServerOptions s := New(&opts) @@ -2403,3 +2836,56 @@ func TestSamplingHeader(t *testing.T) { test(true, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}) test(false, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00"}}) } + +func TestSubjectTransforms(t *testing.T) { + shouldErr := func(src, dest string) { + t.Helper() + if _, err := newTransform(src, dest); err != ErrBadSubject { + t.Fatalf("Did not get an error for src=%q and dest=%q", src, dest) + } + } + + // Must be valid subjects. + shouldErr("foo", "") + shouldErr("foo..", "bar") + + // Wildcards are allowed in src, but must be matched by token placements on the other side. + // e.g. foo.* -> bar.$1. + // Need to have as many pwcs as placements on other side. + shouldErr("foo.*", "bar.*") + shouldErr("foo.*", "bar.$2") // Bad pwc token identifier + shouldErr("foo.*", "bar.$1.>") // fwcs have to match. + shouldErr("foo.>", "bar.baz") // fwcs have to match. + shouldErr("foo.*.*", "bar.$2") // Must place all pwcs. + + shouldBeOK := func(src, dest string) *transform { + t.Helper() + tr, err := newTransform(src, dest) + if err != nil { + t.Fatalf("Got an error %v for src=%q and dest=%q", err, src, dest) + } + return tr + } + + shouldBeOK("foo", "bar") + shouldBeOK("foo.*.bar.*.baz", "req.$2.$1") + shouldBeOK("baz.>", "mybaz.>") + + shouldMatch := func(src, dest, sample, expected string) { + t.Helper() + tr := shouldBeOK(src, dest) + s, err := tr.match(sample) + if err != nil { + t.Fatalf("Got an error %v when expecting a match for %q to %q", err, sample, expected) + } + if s != expected { + t.Fatalf("Dest does not match what was expected. Got %q, expected %q", s, expected) + } + } + + shouldMatch("foo", "bar", "foo", "bar") + shouldMatch("foo.*.bar.*.baz", "req.$2.$1", "foo.A.bar.B.baz", "req.B.A") + shouldMatch("baz.>", "my.pre.>", "baz.1.2.3", "my.pre.1.2.3") + shouldMatch("baz.>", "foo.bar.>", "baz.1.2.3", "foo.bar.1.2.3") + shouldMatch("*", "foo.bar.$1", "foo", "foo.bar.foo") +} diff --git a/server/client.go b/server/client.go index 8e133a54..5e19201a 100644 --- a/server/client.go +++ b/server/client.go @@ -322,6 +322,13 @@ const ( replyPermLimit = 4096 ) +// Represent read cache booleans with a bitmask +type readCacheFlag uint16 + +const ( + hasMappings readCacheFlag = 1 << iota // For account subject mappings. +) + // Used in readloop to cache hot subject lookups and group statistics. type readCache struct { // These are for clients who are bound to a single account. @@ -344,6 +351,24 @@ type readCache struct { rsz int32 // Read buffer size srs int32 // Short reads, used for dynamic buffer resizing. + + // These are for readcache flags to avoind locks. + flags readCacheFlag +} + +// set the flag (would be equivalent to set the boolean to true) +func (rcf *readCacheFlag) set(c readCacheFlag) { + *rcf |= c +} + +// clear the flag (would be equivalent to set the boolean to false) +func (rcf *readCacheFlag) clear(c readCacheFlag) { + *rcf &= ^c +} + +// isSet returns true if the flag is set, false otherwise +func (rcf readCacheFlag) isSet(c readCacheFlag) bool { + return rcf&c != 0 } const ( @@ -947,6 +972,7 @@ func (c *client) readLoop(pre []byte) { c.mcl = int32(opts.MaxControlLine) } } + // Check the per-account-cache for closed subscriptions cpacc := c.kind == ROUTER || c.kind == GATEWAY // Last per-account-cache check for closed subscriptions @@ -975,7 +1001,7 @@ func (c *client) readLoop(pre []byte) { wsr.init() } - // If we have a pre parse that first. + // If we have a pre buffer parse that first. if len(pre) > 0 { c.parse(pre) } @@ -1002,6 +1028,16 @@ func (c *client) readLoop(pre []byte) { } start := time.Now() + // Check if the account has mappings and if so set the local readcache flag. + // We check here to make sure any changes such as config reload are reflected here. + if c.kind == CLIENT { + if c.acc.hasMappings() { + c.in.flags.set(hasMappings) + } else { + c.in.flags.clear(hasMappings) + } + } + // Clear inbound stats cache c.in.msgs = 0 c.in.bytes = 0 @@ -2302,6 +2338,12 @@ func (c *client) processSub(subject, queue, bsid []byte, cb msgHandler, noForwar return sub, nil } +// Used to pass stream import matches to addShadowSub +type ime struct { + im *streamImport + dyn bool +} + // If the client's account has stream imports and there are matches for // this subscription's subject, then add shadow subscriptions in the // other accounts that export this subject. @@ -2311,29 +2353,25 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { } var ( - rims [32]*streamImport - ims = rims[:0] - rfroms [32]*streamImport - froms = rfroms[:0] + _ims [16]ime + ims = _ims[:0] tokens []string tsa [32]string hasWC bool ) acc.mu.RLock() - // Loop over the import subjects. We have 3 scenarios. If we exact - // match or we know the proposed subject is a strict subset of the - // import we can subscribe to the subscription's subject directly. - // The third scenario is where the proposed subject has a wildcard - // and may not be an exact subset, but is a match. Therefore we have to - // subscribe to the import subject, not the subscription's subject. + // Loop over the import subjects. We have 3 scenarios. If we have an + // exact match or a superset match we should use the from field from + // the import. If we are a subset, we have to dynamically calculate + // the subject. for _, im := range acc.imports.streams { if im.invalid { continue } subj := string(sub.subject) - if subj == im.prefix+im.from { - ims = append(ims, im) + if subj == im.to { + ims = append(ims, ime{im, false}) continue } if tokens == nil { @@ -2352,36 +2390,24 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { } tokens = append(tokens, subj[start:]) } - if isSubsetMatch(tokens, im.prefix+im.from) { - ims = append(ims, im) - } else if hasWC { - if subjectIsSubsetMatch(im.prefix+im.from, subj) { - froms = append(froms, im) - } + if isSubsetMatch(tokens, im.to) { + ims = append(ims, ime{im, true}) + } else if hasWC && subjectIsSubsetMatch(im.to, subj) { + ims = append(ims, ime{im, false}) } } acc.mu.RUnlock() var shadow []*subscription - if len(ims) > 0 || len(froms) > 0 { - shadow = make([]*subscription, 0, len(ims)+len(froms)) + if len(ims) > 0 { + shadow = make([]*subscription, 0, len(ims)) } - // Now walk through collected importMaps - for _, im := range ims { + // Now walk through collected stream imports that matched. + for _, ime := range ims { // We will create a shadow subscription. - nsub, err := c.addShadowSub(sub, im, false) - if err != nil { - return err - } - shadow = append(shadow, nsub) - } - // Now walk through importMaps that we need to subscribe - // exactly to the "from" property. - for _, im := range froms { - // We will create a shadow subscription. - nsub, err := c.addShadowSub(sub, im, true) + nsub, err := c.addShadowSub(sub, &ime) if err != nil { return err } @@ -2398,17 +2424,26 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { } // Add in the shadow subscription. -func (c *client) addShadowSub(sub *subscription, im *streamImport, useFrom bool) (*subscription, error) { +func (c *client) addShadowSub(sub *subscription, ime *ime) (*subscription, error) { + im := ime.im nsub := *sub // copy nsub.im = im - if useFrom { - nsub.subject = []byte(im.from) - } else if im.prefix != "" { - // redo subject here to match subject in the publisher account space. - // Just remove prefix from what they gave us. That maps into other space. - nsub.subject = sub.subject[len(im.prefix):] - } + // Check if we need to change shadow subscription's subject. + if !im.usePub { + if ime.dyn { + if im.rtr == nil { + im.rtr = im.tr.reverse() + } + subj, err := im.rtr.transformSubject(string(nsub.subject)) + if err != nil { + return nil, err + } + nsub.subject = []byte(subj) + } else { + nsub.subject = []byte(im.from) + } + } c.Debugf("Creating import subscription on %q from account %q", nsub.subject, im.acc.Name) if err := im.acc.sl.Insert(&nsub); err != nil { @@ -2690,8 +2725,13 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac // Leaf nodes are LMSG mh[0] = 'L' // Remap subject if its a shadow subscription, treat like a normal client. - if rt.sub.im != nil && rt.sub.im.prefix != "" { - mh = append(mh, rt.sub.im.prefix...) + if rt.sub.im != nil { + if rt.sub.im.tr != nil { + to, _ := rt.sub.im.tr.transformSubject(string(subj)) + subj = []byte(to) + } else { + subj = []byte(rt.sub.im.to) + } } } mh = append(mh, subj...) @@ -3216,6 +3256,15 @@ func (c *client) processInboundMsg(msg []byte) { } } +// selectMappedSubject will chose the mapped subject based on the client's inbound subject. +func (c *client) selectMappedSubject() bool { + nsubj, changed := c.acc.selectMappedSubject(string(c.pa.subject)) + if changed { + c.pa.subject = []byte(nsubj) + } + return changed +} + // processInboundClientMsg is called to process an inbound msg from a client. func (c *client) processInboundClientMsg(msg []byte) bool { // Update statistics @@ -3229,6 +3278,11 @@ func (c *client) processInboundClientMsg(msg []byte) bool { return false } + // Mostly under testing scenarios. + if c.srv == nil || c.acc == nil { + return false + } + // Check pub permissions if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowed(string(c.pa.subject)) { c.pubPermissionViolation(c.pa.subject) @@ -3245,11 +3299,6 @@ func (c *client) processInboundClientMsg(msg []byte) bool { c.sendOK() } - // Mostly under testing scenarios. - if c.srv == nil || c.acc == nil { - return false - } - // Check if this client's gateway replies map is not empty if atomic.LoadInt32(&c.cgwrt) > 0 && c.handleGWReplyMap(msg) { return true @@ -3468,10 +3517,18 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // Pick correct to subject. If we matched on a wildcard use the literal publish subject. to := si.to - if si.hasWC { + if si.tr != nil { + // FIXME(dlc) - This could be slow, may want to look at adding cache to bare transforms? + to, _ = si.tr.transformSubject(string(c.pa.subject)) + } else if si.usePub { to = string(c.pa.subject) } + // Now check to see if this account has mappings that could affect the service import. + // Can't use non locked trick like in processInboundClientMsg, so just call into selectMappedSubject + // so we only lock once. + to, _ = si.acc.selectMappedSubject(to) + // FIXME(dlc) - Do L1 cache trick like normal client? rr := si.acc.sl.Match(to) @@ -3645,10 +3702,16 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, } // Assume delivery subject is normal subject to this point. dsubj = subj - // Check for stream import mapped subs. These apply to local subs only. - if sub.im != nil && sub.im.prefix != "" { - dsubj = append(_dsubj[:0], sub.im.prefix...) - dsubj = append(dsubj, subj...) + // Check for stream import mapped subs (shadow subs). These apply to local subs only. + if sub.im != nil { + if sub.im.tr != nil { + to, _ := sub.im.tr.transformSubject(string(subj)) + dsubj = append(_dsubj[:0], to...) + } else if sub.im.usePub { + dsubj = append(_dsubj[:0], subj...) + } else { + dsubj = append(_dsubj[:0], sub.im.to...) + } } // Normal delivery mh := c.msgHeader(dsubj, creply, sub) @@ -3755,9 +3818,15 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // Assume delivery subject is normal subject to this point. dsubj = subj // Check for stream import mapped subs. These apply to local subs only. - if sub.im != nil && sub.im.prefix != "" { - dsubj = append(_dsubj[:0], sub.im.prefix...) - dsubj = append(dsubj, subj...) + if sub.im != nil { + if sub.im.tr != nil { + to, _ := sub.im.tr.transformSubject(string(subj)) + dsubj = append(_dsubj[:0], to...) + } else if sub.im.usePub { + dsubj = append(_dsubj[:0], subj...) + } else { + dsubj = append(_dsubj[:0], sub.im.to...) + } } var rreply = reply diff --git a/server/errors.go b/server/errors.go index e3dfc58f..89e50d42 100644 --- a/server/errors.go +++ b/server/errors.go @@ -43,6 +43,12 @@ var ( // ErrBadPublishSubject represents an error condition for an invalid publish subject. ErrBadPublishSubject = errors.New("invalid publish subject") + // ErrBadSubject represents an error condition for an invalid subject. + ErrBadSubject = errors.New("invalid subject") + + // ErrBadQualifier is used to error on a bad qualifier for a transform. + ErrBadQualifier = errors.New("bad qualifier") + // ErrBadClientProtocol signals a client requested an invalid client protocol. ErrBadClientProtocol = errors.New("invalid client protocol") @@ -160,6 +166,9 @@ var ( // ErrSubscribePermissionViolation is returned when processing of a subscription fails due to permissions. ErrSubscribePermissionViolation = errors.New("subscribe permission viloation") + + // ErrNoTransforms signals no subject transforms are available to map this subject. + ErrNoTransforms = errors.New("no matching transforms available") ) // configErr is a configuration error. diff --git a/server/monitor.go b/server/monitor.go index fd67ad77..dc8ffbfc 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2035,16 +2035,12 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) { } imports := []ExtImport{} for _, v := range a.imports.streams { - to := "" - if v.prefix != "" { - to = v.prefix + "." + v.from - } imports = append(imports, ExtImport{ Import: jwt.Import{ Subject: jwt.Subject(v.from), Account: v.acc.Name, Type: jwt.Stream, - To: jwt.Subject(to), + To: jwt.Subject(v.to), }, Invalid: v.invalid, }) diff --git a/server/opts.go b/server/opts.go index 42bbad95..ded6f134 100644 --- a/server/opts.go +++ b/server/opts.go @@ -603,6 +603,14 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error case "logtime": o.Logtime = v.(bool) trackExplicitVal(o, &o.inConfig, "Logtime", o.Logtime) + case "mappings", "maps": + gacc := NewAccount(globalAccountName) + o.Accounts = append(o.Accounts, gacc) + err := parseAccountMappings(tk, gacc, errors, warnings) + if err != nil { + *errors = append(*errors, err) + return + } case "disable_sublist_cache", "no_sublist_cache": o.NoSublistCache = v.(bool) case "accounts": @@ -1828,6 +1836,7 @@ type importStream struct { acc *Account an string sub string + to string pre string } @@ -1844,6 +1853,99 @@ func isReservedAccount(name string) bool { return name == globalAccountName } +// parseAccountMappings is called to parse account mappings. +func parseAccountMappings(v interface{}, acc *Account, errors *[]error, warnings *[]error) error { + var lt token + defer convertPanicToErrorList(<, errors) + + tk, v := unwrapValue(v, <) + am := v.(map[string]interface{}) + for subj, mv := range am { + if !IsValidSubject(subj) { + err := &configErr{tk, fmt.Sprintf("Subject %q is not a valid subject", subj)} + *errors = append(*errors, err) + continue + } + tk, v := unwrapValue(mv, <) + + switch vv := v.(type) { + case string: + if err := acc.AddMapping(subj, v.(string)); err != nil { + err := &configErr{tk, fmt.Sprintf("Error adding mapping for %q: %v", subj, err)} + *errors = append(*errors, err) + continue + } + case []interface{}: + var mappings []*MapDest + for _, mv := range v.([]interface{}) { + tk, amv := unwrapValue(mv, <) + // These should be maps. + mv, ok := amv.(map[string]interface{}) + if !ok { + err := &configErr{tk, "Expected an entry for the mapping destination"} + *errors = append(*errors, err) + continue + } + mdest := &MapDest{} + for k, v := range mv { + tk, dmv := unwrapValue(v, <) + switch strings.ToLower(k) { + case "dest", "destination": + mdest.Subject = dmv.(string) + case "weight": + switch vv := dmv.(type) { + case string: + ws := vv + if strings.HasSuffix(ws, "%") { + ws = ws[:len(ws)-1] + } + weight, err := strconv.Atoi(ws) + if err != nil { + err := &configErr{tk, fmt.Sprintf("Invalid weight %q for mapping destination", ws)} + *errors = append(*errors, err) + continue + } + if weight > 100 || weight < 0 { + err := &configErr{tk, fmt.Sprintf("Invalid weight %d for mapping destination", weight)} + *errors = append(*errors, err) + continue + } + mdest.Weight = uint8(weight) + case int64: + weight := vv + if weight > 100 || weight < 0 { + err := &configErr{tk, fmt.Sprintf("Invalid weight %d for mapping destination", weight)} + *errors = append(*errors, err) + continue + } + mdest.Weight = uint8(weight) + } + default: + err := &configErr{tk, fmt.Sprintf("Unknown field %q for mapping destination", k)} + *errors = append(*errors, err) + continue + } + } + mappings = append(mappings, mdest) + } + + // Now add them in.. + if err := acc.AddWeightedMappings(subj, mappings...); err != nil { + err := &configErr{tk, fmt.Sprintf("Error adding mapping for %q: %v", subj, err)} + *errors = append(*errors, err) + continue + } + + default: + err := &configErr{tk, fmt.Sprintf("Unknown type %T for mapping destination", vv)} + *errors = append(*errors, err) + continue + } + } + + return nil +} + // parseAccounts will parse the different accounts syntax. func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]error) error { var ( @@ -1958,6 +2060,12 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er continue } acc.defaultPerms = permissions + case "mappings", "maps": + err := parseAccountMappings(tk, acc, errors, warnings) + if err != nil { + *errors = append(*errors, err) + continue + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -2075,10 +2183,18 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er *errors = append(*errors, &configErr{tk, msg}) continue } - if err := stream.acc.AddStreamImport(ta, stream.sub, stream.pre); err != nil { - msg := fmt.Sprintf("Error adding stream import %q: %v", stream.sub, err) - *errors = append(*errors, &configErr{tk, msg}) - continue + if stream.pre != "" { + if err := stream.acc.AddStreamImport(ta, stream.sub, stream.pre); err != nil { + msg := fmt.Sprintf("Error adding stream import %q: %v", stream.sub, err) + *errors = append(*errors, &configErr{tk, msg}) + continue + } + } else { + if err := stream.acc.AddMappedStreamImport(ta, stream.sub, stream.to); err != nil { + msg := fmt.Sprintf("Error adding stream import %q: %v", stream.sub, err) + *errors = append(*errors, &configErr{tk, msg}) + continue + } } } for _, service := range importServices { @@ -2518,6 +2634,9 @@ func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*impo continue } curStream = &importStream{an: accountName, sub: subject} + if to != "" { + curStream.to = to + } if pre != "" { curStream.pre = pre } @@ -2561,6 +2680,14 @@ func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*impo if curService != nil { curService.to = to } + if curStream != nil { + curStream.to = to + if curStream.pre != "" { + err := &configErr{tk, "Stream import can not have a 'prefix' and a 'to' property"} + *errors = append(*errors, err) + continue + } + } case "share": share = mv.(bool) if curService != nil { diff --git a/server/parser.go b/server/parser.go index 26f8088c..d8b7f87e 100644 --- a/server/parser.go +++ b/server/parser.go @@ -389,6 +389,17 @@ func (c *client) parse(buf []byte) error { if err := c.processPub(arg); err != nil { return err } + // Check if we have and account mappings or tees or filters. + // FIXME(dlc) - Probably better way to do this. + // Could add in cache but will be tricky since results based on pub subject are dynamic + // due to wildcard matching and weight sets. + if c.kind == CLIENT && c.in.flags.isSet(hasMappings) { + old := c.pa.subject + changed := c.selectMappedSubject() + if trace && changed { + c.traceInOp("MAPPING", []byte(fmt.Sprintf("%s -> %s", old, c.pa.subject))) + } + } c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD // If we don't have a saved buffer then jump ahead with // the index. If this overruns what is left we fall out diff --git a/server/reload_test.go b/server/reload_test.go index 2422addc..0d46fe51 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -4488,3 +4488,65 @@ func TestConfigReloadDefaultSystemAccount(t *testing.T) { } testInAccounts() } + +func TestConfigReloadAccountMappings(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: "127.0.0.1:-1" + accounts { + ACC { + users = [{user: usr, password: pwd}] + mappings = { foo: bar } + } + } + `)) + defer os.Remove(conf) + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + + reloadUpdateConfig(t, s, conf, ` + listen: "127.0.0.1:-1" + accounts { + ACC { + users = [{user: usr, password: pwd}] + mappings = { foo: baz } + } + } + `) + + nc := natsConnect(t, fmt.Sprintf("nats://usr:pwd@%s:%d", opts.Host, opts.Port)) + defer nc.Close() + + fsub, _ := nc.SubscribeSync("foo") + sub, _ := nc.SubscribeSync("baz") + nc.Publish("foo", nil) + nc.Flush() + + checkPending := func(sub *nats.Subscription, expected int) { + t.Helper() + if n, _, _ := sub.Pending(); n != expected { + t.Fatalf("Expected %d msgs for %q, but got %d", expected, sub.Subject, n) + } + } + checkPending(fsub, 0) + checkPending(sub, 1) + + // Drain it off + if _, err := sub.NextMsg(2 * time.Second); err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + + reloadUpdateConfig(t, s, conf, ` + listen: "127.0.0.1:-1" + accounts { + ACC { + users = [{user: usr, password: pwd}] + } + } + `) + + nc.Publish("foo", nil) + nc.Flush() + + checkPending(fsub, 1) + checkPending(sub, 0) +} diff --git a/server/server.go b/server/server.go index 2628c9d3..39d4419e 100644 --- a/server/server.go +++ b/server/server.go @@ -586,7 +586,17 @@ func (s *Server) configureAccounts() error { // Check opts and walk through them. We need to copy them here // so that we do not keep a real one sitting in the options. for _, acc := range s.opts.Accounts { - a := acc.shallowCopy() + var a *Account + if acc.Name == globalAccountName { + a = s.gacc + } else { + a = acc.shallowCopy() + } + if acc.hasMappings() { + // For now just move and wipe from opts.Accounts version. + a.mappings = acc.mappings + acc.mappings = nil + } acc.sl = nil acc.clients = nil s.registerAccountNoLock(a) diff --git a/server/sublist.go b/server/sublist.go index 3dbef61b..7ae9a3b8 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -1010,6 +1010,32 @@ func IsValidSubject(subject string) bool { return true } +// Will share relevant info regarding the subject. +// Returns valid, tokens, num pwcs, has fwc. +func subjectInfo(subject string) (bool, []string, int, bool) { + if subject == "" { + return false, nil, 0, false + } + npwcs := 0 + sfwc := false + tokens := strings.Split(subject, tsep) + for _, t := range tokens { + if len(t) == 0 || sfwc { + return false, nil, 0, false + } + if len(t) > 1 { + continue + } + switch t[0] { + case fwc: + sfwc = true + case pwc: + npwcs++ + } + } + return true, tokens, npwcs, sfwc +} + // IsValidLiteralSubject returns true if a subject is valid and literal (no wildcards), false otherwise func IsValidLiteralSubject(subject string) bool { return isValidLiteralSubject(strings.Split(subject, tsep))