Optimizations, cleanups and new mapping destination error

This commit is contained in:
jnmoyne
2022-03-04 12:35:31 -08:00
parent ceb47f0298
commit 2138c96cdd
4 changed files with 55 additions and 44 deletions

View File

@@ -4124,60 +4124,62 @@ type transform struct {
dtpinp []int32 // destination token position index number of partitions
}
func getMappingFunctionArgs(functionRegEx *regexp.Regexp, token string) ([]string, error) {
func getMappingFunctionArgs(functionRegEx *regexp.Regexp, token string) []string {
commandStrings := functionRegEx.FindStringSubmatch(token)
if len(commandStrings) > 1 {
args := commaSeparatorRegEx.Split(commandStrings[1], -1)
return args, nil
return commaSeparatorRegEx.Split(commandStrings[1], -1)
}
return nil, nil
return nil
}
// Helper to pull raw place holder indexes and number of partitions. Returns -1 if not a place holder.
func placeHolderIndex(token string) ([]int, int32) {
func placeHolderIndex(token string) ([]int, int32, error) {
if len(token) > 1 {
// old $1, $2, etc... mapping format still supported to maintain backwards compatibility
if token[0] == '$' { // simple non-partition mapping
tp, err := strconv.Atoi(token[1:])
if err == nil {
return []int{tp}, -1
if err != nil {
return []int{-1}, -1, nil
}
return []int{tp}, -1, nil
}
// New 'moustache' style mapping
// wildcard(wildcard token index) (equivalent to $)
args, err := getMappingFunctionArgs(wildcardMappingFunctionRegEx, token)
if err == nil && args != nil {
args := getMappingFunctionArgs(wildcardMappingFunctionRegEx, token)
if args != nil {
if len(args) == 1 {
tp, err := strconv.Atoi(strings.Trim(args[0], " "))
if err == nil {
return []int{tp}, -1
if err != nil {
return []int{}, -1, err
}
return []int{tp}, -1, nil
}
}
// partition(number of partitions, token1, token2, ...)
args, err = getMappingFunctionArgs(partitionMappingFunctionRegEx, token)
if err == nil && args != nil {
args = getMappingFunctionArgs(partitionMappingFunctionRegEx, token)
if args != nil {
if len(args) >= 2 {
tphnp, err := strconv.Atoi(strings.Trim(args[0], " "))
if err == nil {
var numPositions = len(args[1:])
tps := make([]int, numPositions)
for ti, t := range args[1:] {
i, err := strconv.Atoi(strings.Trim(t, " "))
if err == nil {
tps[ti] = i
} else {
return []int{-1}, -1
}
}
return tps, int32(tphnp)
if err != nil {
return []int{}, -1, err
}
var numPositions = len(args[1:])
tps := make([]int, numPositions)
for ti, t := range args[1:] {
i, err := strconv.Atoi(strings.Trim(t, " "))
if err == nil {
tps[ti] = i
} else {
return []int{-1}, -1, err
}
}
return tps, int32(tphnp), nil
}
}
}
return []int{-1}, -1
return []int{-1}, -1, nil
}
// newTransform will create a new transform checking the src and dest subjects for accuracy.
@@ -4206,14 +4208,17 @@ func newTransform(src, dest string) (*transform, error) {
nphs := 0
for _, token := range dtokens {
tp, nb := placeHolderIndex(token)
tp, nb, err := placeHolderIndex(token)
if err != nil {
return nil, ErrBadSubjectMappingDestination
}
if tp[0] >= 0 {
nphs++
// Now build up our runtime mapping from dest to source tokens.
var stis []int
for _, position := range tp {
if position > npwcs {
return nil, ErrBadSubject
return nil, ErrBadSubjectMappingDestination
}
stis = append(stis, sti[position])
}
@@ -4225,7 +4230,7 @@ func newTransform(src, dest string) (*transform, error) {
}
}
if nphs < npwcs {
return nil, ErrBadSubject
return nil, ErrBadSubjectMappingDestination
}
}
@@ -4273,11 +4278,11 @@ func (tr *transform) transformSubject(subject string) (string, error) {
return tr.transform(tts)
}
func (tr *transform) getHashPartition(key string, numBuckets int) string {
func (tr *transform) getHashPartition(key []byte, numBuckets int) string {
h := fnv.New32a()
h.Write([]byte(key))
h.Write(key)
return fmt.Sprintf("%d", h.Sum32()%uint32(numBuckets))
return strconv.Itoa(int(h.Sum32() % uint32(numBuckets)))
}
// Do a transform on the subject to the dest subject.
@@ -4304,9 +4309,12 @@ func (tr *transform) transform(tokens []string) (string, error) {
} else {
// >= 0 means use source map index to figure out which source token to pull.
if tr.dtpinp[i] > 0 { // there is a valid (i.e. not -1) value for number of partitions, this is a partition transform token
var keyForHashing string
var (
_buffer [64]byte
keyForHashing = _buffer[:0]
)
for _, sourceToken := range tr.dtpi[i] {
keyForHashing = keyForHashing + tokens[sourceToken]
keyForHashing = append(keyForHashing, []byte(tokens[sourceToken])...)
}
token = tr.getHashPartition(keyForHashing, int(tr.dtpinp[i]))
} else { // back to normal substitution

View File

@@ -49,24 +49,24 @@ func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) {
func TestPlaceHolderIndex(t *testing.T) {
testString := "$1"
indexes, nbPartitions := placeHolderIndex(testString)
indexes, nbPartitions, err := placeHolderIndex(testString)
if len(indexes) != 1 || indexes[0] != 1 || nbPartitions != -1 {
if err != nil || len(indexes) != 1 || indexes[0] != 1 || nbPartitions != -1 {
t.Fatalf("Error parsing %s", testString)
}
testString = "{{partition(10,1,2,3)}}"
indexes, nbPartitions = placeHolderIndex(testString)
indexes, nbPartitions, err = placeHolderIndex(testString)
if !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
if err != nil || !reflect.DeepEqual(indexes, []int{1, 2, 3}) || nbPartitions != 10 {
t.Fatalf("Error parsing %s", testString)
}
testString = "{{wildcard(2)}}"
indexes, nbPartitions = placeHolderIndex(testString)
indexes, nbPartitions, err = placeHolderIndex(testString)
if len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 {
if err != nil || len(indexes) != 1 || indexes[0] != 2 || nbPartitions != -1 {
t.Fatalf("Error parsing %s", testString)
}
}
@@ -3156,7 +3156,7 @@ func TestSamplingHeader(t *testing.T) {
func TestSubjectTransforms(t *testing.T) {
shouldErr := func(src, dest string) {
t.Helper()
if _, err := newTransform(src, dest); err != ErrBadSubject {
if _, err := newTransform(src, dest); err != ErrBadSubject && err != ErrBadSubjectMappingDestination {
t.Fatalf("Did not get an error for src=%q and dest=%q", src, dest)
}
}

View File

@@ -46,6 +46,9 @@ var (
// ErrBadSubject represents an error condition for an invalid subject.
ErrBadSubject = errors.New("invalid subject")
// ErrBadSubjectMappingDestination is used to error on a bad transform destination mapping
ErrBadSubjectMappingDestination = errors.New("invalid subject mapping destination")
// ErrBadQualifier is used to error on a bad qualifier for a transform.
ErrBadQualifier = errors.New("bad qualifier")

View File

@@ -2322,7 +2322,7 @@ func parseAccountMappings(v interface{}, acc *Account, errors *[]error, warnings
switch vv := v.(type) {
case string:
if err := acc.AddMapping(subj, v.(string)); err != nil {
err := &configErr{tk, fmt.Sprintf("Error adding mapping for %q: %v", subj, err)}
err := &configErr{tk, fmt.Sprintf("Error adding mapping for %q to %q : %v", subj, v.(string), err)}
*errors = append(*errors, err)
continue
}
@@ -2339,7 +2339,7 @@ func parseAccountMappings(v interface{}, acc *Account, errors *[]error, warnings
// Now add them in..
if err := acc.AddWeightedMappings(subj, mappings...); err != nil {
err := &configErr{tk, fmt.Sprintf("Error adding mapping for %q: %v", subj, err)}
err := &configErr{tk, fmt.Sprintf("Error adding mapping for %q to %q : %v", subj, v.(string), err)}
*errors = append(*errors, err)
continue
}
@@ -2351,7 +2351,7 @@ func parseAccountMappings(v interface{}, acc *Account, errors *[]error, warnings
}
// Now add it in..
if err := acc.AddWeightedMappings(subj, mdest); err != nil {
err := &configErr{tk, fmt.Sprintf("Error adding mapping for %q: %v", subj, err)}
err := &configErr{tk, fmt.Sprintf("Error adding mapping for %q to %q : %v", subj, v.(string), err)}
*errors = append(*errors, err)
continue
}