mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
New expendable implementation of subject mapping destinations to transform processing
This commit is contained in:
@@ -169,6 +169,18 @@ var commaSeparatorRegEx = regexp.MustCompile(`,\s*`)
|
||||
var partitionMappingFunctionRegEx = regexp.MustCompile(`{{\s*partition\s*\((.*)\)\s*}}`)
|
||||
var wildcardMappingFunctionRegEx = regexp.MustCompile(`{{\s*wildcard\s*\((.*)\)\s*}}`)
|
||||
|
||||
// Enum for the subject mapping transform function types
|
||||
const (
|
||||
NoTransform int16 = iota
|
||||
BadTransform
|
||||
Partition
|
||||
Wildcard
|
||||
SplitFromLeft
|
||||
SplitFromRight
|
||||
SliceFromLeft
|
||||
SliceFromRight
|
||||
)
|
||||
|
||||
// String helper.
|
||||
func (rt ServiceRespType) String() string {
|
||||
switch rt {
|
||||
@@ -845,7 +857,7 @@ func (a *Account) selectMappedSubject(dest string) (string, bool) {
|
||||
}
|
||||
|
||||
if d != nil {
|
||||
if len(d.tr.dtpi) == 0 {
|
||||
if len(d.tr.dtokmftis) == 0 {
|
||||
ndest = d.tr.dest
|
||||
} else if nsubj, err := d.tr.transform(tts); err == nil {
|
||||
ndest = nsubj
|
||||
@@ -4164,10 +4176,11 @@ func (dr *CacheDirAccResolver) Reload() error {
|
||||
// These will be grouped and caching and locking are assumed to be in the upper layers.
|
||||
type transform struct {
|
||||
src, dest string
|
||||
dtoks []string
|
||||
stoks []string
|
||||
dtpi [][]int // destination token position indexes
|
||||
dtpinp []int32 // destination token position index number of partitions
|
||||
dtoks []string // destination tokens
|
||||
stoks []string // source tokens
|
||||
dtokmfts []int16 // destination token mapping function types
|
||||
dtokmftis [][]int // destination token mapping function array of source token index arguments
|
||||
dtokmfias []int32 // destination token mapping function int32 arguments
|
||||
}
|
||||
|
||||
func getMappingFunctionArgs(functionRegEx *regexp.Regexp, token string) []string {
|
||||
@@ -4178,17 +4191,19 @@ func getMappingFunctionArgs(functionRegEx *regexp.Regexp, token string) []string
|
||||
return nil
|
||||
}
|
||||
|
||||
// Helper to pull raw place holder indexes and number of partitions. Returns -1 if not a place holder.
|
||||
func placeHolderIndex(token string) ([]int, int32, error) {
|
||||
// Helper to ingest and index the transform destination token (e.g. $x or {{}}) in the token
|
||||
// returns a transformation type, and two function arguments: an array of source subject token indexes, and a single number (e.g. number of partitions)
|
||||
|
||||
func indexPlaceHolders(token string) (int16, []int, int32, error) {
|
||||
length := len(token)
|
||||
if length > 1 {
|
||||
// old $1, $2, etc... mapping format still supported to maintain backwards compatibility
|
||||
if token[0] == '$' { // simple non-partition mapping
|
||||
tp, err := strconv.Atoi(token[1:])
|
||||
if err != nil {
|
||||
return []int{-1}, -1, nil
|
||||
return NoTransform, []int{-1}, -1, nil // other things rely on tokens starting with $ so not an error just leave it as is
|
||||
}
|
||||
return []int{tp}, -1, nil
|
||||
return Wildcard, []int{tp}, -1, nil
|
||||
}
|
||||
|
||||
// New 'mustache' style mapping
|
||||
@@ -4197,45 +4212,46 @@ func placeHolderIndex(token string) ([]int, int32, error) {
|
||||
args := getMappingFunctionArgs(wildcardMappingFunctionRegEx, token)
|
||||
if args != nil {
|
||||
if len(args) == 1 && args[0] == _EMPTY_ {
|
||||
return []int{-1}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionNotEnoughArguments}
|
||||
return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionNotEnoughArguments}
|
||||
}
|
||||
if len(args) == 1 {
|
||||
tp, err := strconv.Atoi(strings.Trim(args[0], " "))
|
||||
if err != nil {
|
||||
return []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument}
|
||||
return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument}
|
||||
}
|
||||
return []int{tp}, -1, nil
|
||||
return Wildcard, []int{tp}, -1, nil
|
||||
} else {
|
||||
return []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionTooManyArguments}
|
||||
return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionTooManyArguments}
|
||||
}
|
||||
}
|
||||
|
||||
// partition(number of partitions, token1, token2, ...)
|
||||
args = getMappingFunctionArgs(partitionMappingFunctionRegEx, token)
|
||||
if args != nil {
|
||||
if len(args) < 2 {
|
||||
return []int{-1}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionNotEnoughArguments}
|
||||
return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionNotEnoughArguments}
|
||||
}
|
||||
if len(args) >= 2 {
|
||||
tphnp, err := strconv.Atoi(strings.Trim(args[0], " "))
|
||||
if err != nil {
|
||||
return []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument}
|
||||
return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument}
|
||||
}
|
||||
var numPositions = len(args[1:])
|
||||
tps := make([]int, numPositions)
|
||||
for ti, t := range args[1:] {
|
||||
i, err := strconv.Atoi(strings.Trim(t, " "))
|
||||
if err != nil {
|
||||
return []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument}
|
||||
return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrorMappingDestinationFunctionInvalidArgument}
|
||||
}
|
||||
tps[ti] = i
|
||||
}
|
||||
return tps, int32(tphnp), nil
|
||||
return Partition, tps, int32(tphnp), nil
|
||||
}
|
||||
}
|
||||
return []int{}, -1, &mappingDestinationErr{token, ErrUnknownMappingDestinationFunction}
|
||||
return BadTransform, []int{}, -1, &mappingDestinationErr{token, ErrUnknownMappingDestinationFunction}
|
||||
}
|
||||
}
|
||||
return []int{-1}, -1, nil
|
||||
return NoTransform, []int{-1}, -1, nil
|
||||
}
|
||||
|
||||
// SubjectTransformer transforms subjects using mappings
|
||||
@@ -4263,8 +4279,9 @@ func newTransform(src, dest string) (*transform, error) {
|
||||
return nil, ErrBadSubject
|
||||
}
|
||||
|
||||
var dtpi [][]int
|
||||
var dtpinb []int32
|
||||
var dtokMappingFunctionTokenIndexes [][]int
|
||||
var dtokMappingFunctionIntArgs []int32
|
||||
var dtokMappingFunctionTypes []int16
|
||||
|
||||
// If the src has partial wildcards then the dest needs to have the token place markers.
|
||||
if npwcs > 0 || hasFwc {
|
||||
@@ -4278,11 +4295,18 @@ func newTransform(src, dest string) (*transform, error) {
|
||||
|
||||
nphs := 0
|
||||
for _, token := range dtokens {
|
||||
tp, nb, err := placeHolderIndex(token)
|
||||
tt, tp, nb, err := indexPlaceHolders(token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if tp[0] >= 0 {
|
||||
|
||||
if tt == NoTransform {
|
||||
{
|
||||
dtokMappingFunctionTypes = append(dtokMappingFunctionTypes, NoTransform)
|
||||
dtokMappingFunctionTokenIndexes = append(dtokMappingFunctionTokenIndexes, []int{-1})
|
||||
dtokMappingFunctionIntArgs = append(dtokMappingFunctionIntArgs, -1)
|
||||
}
|
||||
} else {
|
||||
nphs++
|
||||
// Now build up our runtime mapping from dest to source tokens.
|
||||
var stis []int
|
||||
@@ -4292,11 +4316,9 @@ func newTransform(src, dest string) (*transform, error) {
|
||||
}
|
||||
stis = append(stis, sti[position])
|
||||
}
|
||||
dtpi = append(dtpi, stis)
|
||||
dtpinb = append(dtpinb, nb)
|
||||
} else {
|
||||
dtpi = append(dtpi, []int{-1})
|
||||
dtpinb = append(dtpinb, -1)
|
||||
dtokMappingFunctionTypes = append(dtokMappingFunctionTypes, tt)
|
||||
dtokMappingFunctionTokenIndexes = append(dtokMappingFunctionTokenIndexes, stis)
|
||||
dtokMappingFunctionIntArgs = append(dtokMappingFunctionIntArgs, nb)
|
||||
}
|
||||
}
|
||||
if nphs < npwcs {
|
||||
@@ -4305,7 +4327,7 @@ func newTransform(src, dest string) (*transform, error) {
|
||||
}
|
||||
}
|
||||
|
||||
return &transform{src: src, dest: dest, dtoks: dtokens, stoks: stokens, dtpi: dtpi, dtpinp: dtpinb}, nil
|
||||
return &transform{src: src, dest: dest, dtoks: dtokens, stoks: stokens, dtokmfts: dtokMappingFunctionTypes, dtokmftis: dtokMappingFunctionTokenIndexes, dtokmfias: dtokMappingFunctionIntArgs}, nil
|
||||
}
|
||||
|
||||
// Match will take a literal published subject that is associated with a client and will match and transform
|
||||
@@ -4367,38 +4389,37 @@ func (tr *transform) getHashPartition(key []byte, numBuckets int) string {
|
||||
|
||||
// Do a transform on the subject to the dest subject.
|
||||
func (tr *transform) transform(tokens []string) (string, error) {
|
||||
if len(tr.dtpi) == 0 {
|
||||
if len(tr.dtokmfts) == 0 {
|
||||
return tr.dest, nil
|
||||
}
|
||||
|
||||
var b strings.Builder
|
||||
var token string
|
||||
|
||||
// We need to walk destination tokens and create the mapped subject pulling tokens from src.
|
||||
// We need to walk destination tokens and create the mapped subject pulling tokens or mapping functions
|
||||
// This is slow and that is ok, transforms should have caching layer in front for mapping transforms
|
||||
// and export/import semantics with streams and services.
|
||||
li := len(tr.dtpi) - 1
|
||||
for i, index := range tr.dtpi {
|
||||
// <0 means use destination token.
|
||||
if index[0] < 0 {
|
||||
li := len(tr.dtokmfts) - 1
|
||||
for i, mfType := range tr.dtokmfts {
|
||||
if mfType == NoTransform {
|
||||
token = tr.dtoks[i]
|
||||
// Break if fwc
|
||||
if len(token) == 1 && token[0] == fwc {
|
||||
break
|
||||
}
|
||||
} else {
|
||||
// >= 0 means use source map index to figure out which source token to pull.
|
||||
if tr.dtpinp[i] > 0 { // there is a valid (i.e. not -1) value for number of partitions, this is a partition transform token
|
||||
switch mfType {
|
||||
case Partition:
|
||||
var (
|
||||
_buffer [64]byte
|
||||
keyForHashing = _buffer[:0]
|
||||
)
|
||||
for _, sourceToken := range tr.dtpi[i] {
|
||||
for _, sourceToken := range tr.dtokmftis[i] {
|
||||
keyForHashing = append(keyForHashing, []byte(tokens[sourceToken])...)
|
||||
}
|
||||
token = tr.getHashPartition(keyForHashing, int(tr.dtpinp[i]))
|
||||
} else { // back to normal substitution
|
||||
token = tokens[tr.dtpi[i][0]]
|
||||
token = tr.getHashPartition(keyForHashing, int(tr.dtokmfias[i]))
|
||||
case Wildcard: // simple substitution
|
||||
token = tokens[tr.dtokmftis[i][0]]
|
||||
}
|
||||
}
|
||||
b.WriteString(token)
|
||||
@@ -4421,7 +4442,7 @@ func (tr *transform) transform(tokens []string) (string, error) {
|
||||
|
||||
// Reverse a transform.
|
||||
func (tr *transform) reverse() *transform {
|
||||
if len(tr.dtpi) == 0 {
|
||||
if len(tr.dtokmftis) == 0 {
|
||||
rtr, _ := newTransform(tr.dest, tr.src)
|
||||
return rtr
|
||||
}
|
||||
|
||||
@@ -50,47 +50,47 @@ func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) {
|
||||
|
||||
func TestPlaceHolderIndex(t *testing.T) {
|
||||
testString := "$1"
|
||||
indexes, nbPartitions, err := placeHolderIndex(testString)
|
||||
transformType, indexes, nbPartitions, err := indexPlaceHolders(testString)
|
||||
|
||||
if err != nil || len(indexes) != 1 || indexes[0] != 1 || nbPartitions != -1 {
|
||||
if err != nil || transformType != Wildcard || len(indexes) != 1 || indexes[0] != 1 || nbPartitions != -1 {
|
||||
t.Fatalf("Error parsing %s", testString)
|
||||
}
|
||||
|
||||
testString = "{{partition(10,1,2,3)}}"
|
||||
|
||||
indexes, nbPartitions, err = placeHolderIndex(testString)
|
||||
transformType, indexes, nbPartitions, err = indexPlaceHolders(testString)
|
||||
|
||||
if err != nil || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
|
||||
if err != nil || transformType != Partition || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
|
||||
t.Fatalf("Error parsing %s", testString)
|
||||
}
|
||||
|
||||
testString = "{{ partition(10,1,2,3) }}"
|
||||
|
||||
indexes, nbPartitions, err = placeHolderIndex(testString)
|
||||
transformType, indexes, nbPartitions, err = indexPlaceHolders(testString)
|
||||
|
||||
if err != nil || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
|
||||
if err != nil || transformType != Partition || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
|
||||
t.Fatalf("Error parsing %s", testString)
|
||||
}
|
||||
|
||||
testString = "{{partition (10,1,2,3)}}"
|
||||
|
||||
indexes, nbPartitions, err = placeHolderIndex(testString)
|
||||
transformType, indexes, nbPartitions, err = indexPlaceHolders(testString)
|
||||
|
||||
if err != nil || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
|
||||
if err != nil || transformType != Partition || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
|
||||
t.Fatalf("Error parsing %s", testString)
|
||||
}
|
||||
|
||||
testString = "{{wildcard(2)}}"
|
||||
indexes, nbPartitions, err = placeHolderIndex(testString)
|
||||
transformType, indexes, nbPartitions, err = indexPlaceHolders(testString)
|
||||
|
||||
if err != nil || len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 {
|
||||
if err != nil || transformType != Wildcard || len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 {
|
||||
t.Fatalf("Error parsing %s", testString)
|
||||
}
|
||||
|
||||
testString = "{{ wildcard (2) }}"
|
||||
indexes, nbPartitions, err = placeHolderIndex(testString)
|
||||
transformType, indexes, nbPartitions, err = indexPlaceHolders(testString)
|
||||
|
||||
if err != nil || len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 {
|
||||
if err != nil || transformType != Wildcard || len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 {
|
||||
t.Fatalf("Error parsing %s", testString)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user