From c269a1ca095b099b66aed8582d4a9ff4fb234ce5 Mon Sep 17 00:00:00 2001 From: jnmoyne Date: Tue, 26 Jul 2022 17:23:22 -0700 Subject: [PATCH] New expendable implementation of subject mapping destinations to transform processing --- server/accounts.go | 105 ++++++++++++++++++++++++---------------- server/accounts_test.go | 24 ++++----- 2 files changed, 75 insertions(+), 54 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index e9a814f8..2249f7cc 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -169,6 +169,18 @@ var commaSeparatorRegEx = regexp.MustCompile(`,\s*`) var partitionMappingFunctionRegEx = regexp.MustCompile(`{{\s*partition\s*\((.*)\)\s*}}`) var wildcardMappingFunctionRegEx = regexp.MustCompile(`{{\s*wildcard\s*\((.*)\)\s*}}`) +// Enum for the subject mapping transform function types +const ( + NoTransform int16 = iota + BadTransform + Partition + Wildcard + SplitFromLeft + SplitFromRight + SliceFromLeft + SliceFromRight +) + // String helper. func (rt ServiceRespType) String() string { switch rt { @@ -845,7 +857,7 @@ func (a *Account) selectMappedSubject(dest string) (string, bool) { } if d != nil { - if len(d.tr.dtpi) == 0 { + if len(d.tr.dtokmftis) == 0 { ndest = d.tr.dest } else if nsubj, err := d.tr.transform(tts); err == nil { ndest = nsubj @@ -4164,10 +4176,11 @@ func (dr *CacheDirAccResolver) Reload() error { // These will be grouped and caching and locking are assumed to be in the upper layers. type transform struct { src, dest string - dtoks []string - stoks []string - dtpi [][]int // destination token position indexes - dtpinp []int32 // destination token position index number of partitions + dtoks []string // destination tokens + stoks []string // source tokens + dtokmfts []int16 // destination token mapping function types + dtokmftis [][]int // destination token mapping function array of source token index arguments + dtokmfias []int32 // destination token mapping function int32 arguments } func getMappingFunctionArgs(functionRegEx *regexp.Regexp, token string) []string { @@ -4178,17 +4191,19 @@ func getMappingFunctionArgs(functionRegEx *regexp.Regexp, token string) []string return nil } -// Helper to pull raw place holder indexes and number of partitions. Returns -1 if not a place holder. -func placeHolderIndex(token string) ([]int, int32, error) { +// Helper to ingest and index the transform destination token (e.g. $x or {{}}) in the token +// returns a transformation type, and two function arguments: an array of source subject token indexes, and a single number (e.g. number of partitions) + +func indexPlaceHolders(token string) (int16, []int, int32, error) { length := len(token) if length > 1 { // old $1, $2, etc... mapping format still supported to maintain backwards compatibility if token[0] == '$' { // simple non-partition mapping tp, err := strconv.Atoi(token[1:]) if err != nil { - return []int{-1}, -1, nil + return NoTransform, []int{-1}, -1, nil // other things rely on tokens starting with $ so not an error just leave it as is } - return []int{tp}, -1, nil + return Wildcard, []int{tp}, -1, nil } // New 'mustache' style mapping @@ -4197,45 +4212,46 @@ func placeHolderIndex(token string) ([]int, int32, error) { args := getMappingFunctionArgs(wildcardMappingFunctionRegEx, token) if args != nil { if len(args) == 1 && args[0] == _EMPTY_ { - return []int{-1}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionNotEnoughArguments} + return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionNotEnoughArguments} } if len(args) == 1 { tp, err := strconv.Atoi(strings.Trim(args[0], " ")) if err != nil { - return []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument} + return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument} } - return []int{tp}, -1, nil + return Wildcard, []int{tp}, -1, nil } else { - return []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionTooManyArguments} + return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionTooManyArguments} } } + // partition(number of partitions, token1, token2, ...) args = getMappingFunctionArgs(partitionMappingFunctionRegEx, token) if args != nil { if len(args) < 2 { - return []int{-1}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionNotEnoughArguments} + return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionNotEnoughArguments} } if len(args) >= 2 { tphnp, err := strconv.Atoi(strings.Trim(args[0], " ")) if err != nil { - return []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument} + return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument} } var numPositions = len(args[1:]) tps := make([]int, numPositions) for ti, t := range args[1:] { i, err := strconv.Atoi(strings.Trim(t, " ")) if err != nil { - return []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument} + return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument} } tps[ti] = i } - return tps, int32(tphnp), nil + return Partition, tps, int32(tphnp), nil } } - return []int{}, -1, &mappingDestinationErr{token, ErrUnknownMappingDestinationFunction} + return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrUnknownMappingDestinationFunction} } } - return []int{-1}, -1, nil + return NoTransform, []int{-1}, -1, nil } // SubjectTransformer transforms subjects using mappings @@ -4263,8 +4279,9 @@ func newTransform(src, dest string) (*transform, error) { return nil, ErrBadSubject } - var dtpi [][]int - var dtpinb []int32 + var dtokMappingFunctionTokenIndexes [][]int + var dtokMappingFunctionIntArgs []int32 + var dtokMappingFunctionTypes []int16 // If the src has partial wildcards then the dest needs to have the token place markers. if npwcs > 0 || hasFwc { @@ -4278,11 +4295,18 @@ func newTransform(src, dest string) (*transform, error) { nphs := 0 for _, token := range dtokens { - tp, nb, err := placeHolderIndex(token) + tt, tp, nb, err := indexPlaceHolders(token) if err != nil { return nil, err } - if tp[0] >= 0 { + + if tt == NoTransform { + { + dtokMappingFunctionTypes = append(dtokMappingFunctionTypes, NoTransform) + dtokMappingFunctionTokenIndexes = append(dtokMappingFunctionTokenIndexes, []int{-1}) + dtokMappingFunctionIntArgs = append(dtokMappingFunctionIntArgs, -1) + } + } else { nphs++ // Now build up our runtime mapping from dest to source tokens. var stis []int @@ -4292,11 +4316,9 @@ func newTransform(src, dest string) (*transform, error) { } stis = append(stis, sti[position]) } - dtpi = append(dtpi, stis) - dtpinb = append(dtpinb, nb) - } else { - dtpi = append(dtpi, []int{-1}) - dtpinb = append(dtpinb, -1) + dtokMappingFunctionTypes = append(dtokMappingFunctionTypes, tt) + dtokMappingFunctionTokenIndexes = append(dtokMappingFunctionTokenIndexes, stis) + dtokMappingFunctionIntArgs = append(dtokMappingFunctionIntArgs, nb) } } if nphs < npwcs { @@ -4305,7 +4327,7 @@ func newTransform(src, dest string) (*transform, error) { } } - return &transform{src: src, dest: dest, dtoks: dtokens, stoks: stokens, dtpi: dtpi, dtpinp: dtpinb}, nil + return &transform{src: src, dest: dest, dtoks: dtokens, stoks: stokens, dtokmfts: dtokMappingFunctionTypes, dtokmftis: dtokMappingFunctionTokenIndexes, dtokmfias: dtokMappingFunctionIntArgs}, nil } // Match will take a literal published subject that is associated with a client and will match and transform @@ -4367,38 +4389,37 @@ func (tr *transform) getHashPartition(key []byte, numBuckets int) string { // Do a transform on the subject to the dest subject. func (tr *transform) transform(tokens []string) (string, error) { - if len(tr.dtpi) == 0 { + if len(tr.dtokmfts) == 0 { return tr.dest, nil } var b strings.Builder var token string - // We need to walk destination tokens and create the mapped subject pulling tokens from src. + // We need to walk destination tokens and create the mapped subject pulling tokens or mapping functions // This is slow and that is ok, transforms should have caching layer in front for mapping transforms // and export/import semantics with streams and services. - li := len(tr.dtpi) - 1 - for i, index := range tr.dtpi { - // <0 means use destination token. - if index[0] < 0 { + li := len(tr.dtokmfts) - 1 + for i, mfType := range tr.dtokmfts { + if mfType == NoTransform { token = tr.dtoks[i] // Break if fwc if len(token) == 1 && token[0] == fwc { break } } else { - // >= 0 means use source map index to figure out which source token to pull. - if tr.dtpinp[i] > 0 { // there is a valid (i.e. not -1) value for number of partitions, this is a partition transform token + switch mfType { + case Partition: var ( _buffer [64]byte keyForHashing = _buffer[:0] ) - for _, sourceToken := range tr.dtpi[i] { + for _, sourceToken := range tr.dtokmftis[i] { keyForHashing = append(keyForHashing, []byte(tokens[sourceToken])...) } - token = tr.getHashPartition(keyForHashing, int(tr.dtpinp[i])) - } else { // back to normal substitution - token = tokens[tr.dtpi[i][0]] + token = tr.getHashPartition(keyForHashing, int(tr.dtokmfias[i])) + case Wildcard: // simple substitution + token = tokens[tr.dtokmftis[i][0]] } } b.WriteString(token) @@ -4421,7 +4442,7 @@ func (tr *transform) transform(tokens []string) (string, error) { // Reverse a transform. func (tr *transform) reverse() *transform { - if len(tr.dtpi) == 0 { + if len(tr.dtokmftis) == 0 { rtr, _ := newTransform(tr.dest, tr.src) return rtr } diff --git a/server/accounts_test.go b/server/accounts_test.go index 435b418b..c9eec193 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -50,47 +50,47 @@ func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) { func TestPlaceHolderIndex(t *testing.T) { testString := "$1" - indexes, nbPartitions, err := placeHolderIndex(testString) + transformType, indexes, nbPartitions, err := indexPlaceHolders(testString) - if err != nil || len(indexes) != 1 || indexes[0] != 1 || nbPartitions != -1 { + if err != nil || transformType != Wildcard || len(indexes) != 1 || indexes[0] != 1 || nbPartitions != -1 { t.Fatalf("Error parsing %s", testString) } testString = "{{partition(10,1,2,3)}}" - indexes, nbPartitions, err = placeHolderIndex(testString) + transformType, indexes, nbPartitions, err = indexPlaceHolders(testString) - if err != nil || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 { + if err != nil || transformType != Partition || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 { t.Fatalf("Error parsing %s", testString) } testString = "{{ partition(10,1,2,3) }}" - indexes, nbPartitions, err = placeHolderIndex(testString) + transformType, indexes, nbPartitions, err = indexPlaceHolders(testString) - if err != nil || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 { + if err != nil || transformType != Partition || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 { t.Fatalf("Error parsing %s", testString) } testString = "{{partition (10,1,2,3)}}" - indexes, nbPartitions, err = placeHolderIndex(testString) + transformType, indexes, nbPartitions, err = indexPlaceHolders(testString) - if err != nil || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 { + if err != nil || transformType != Partition || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 { t.Fatalf("Error parsing %s", testString) } testString = "{{wildcard(2)}}" - indexes, nbPartitions, err = placeHolderIndex(testString) + transformType, indexes, nbPartitions, err = indexPlaceHolders(testString) - if err != nil || len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 { + if err != nil || transformType != Wildcard || len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 { t.Fatalf("Error parsing %s", testString) } testString = "{{ wildcard (2) }}" - indexes, nbPartitions, err = placeHolderIndex(testString) + transformType, indexes, nbPartitions, err = indexPlaceHolders(testString) - if err != nil || len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 { + if err != nil || transformType != Wildcard || len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 { t.Fatalf("Error parsing %s", testString) } }