diff --git a/server/accounts.go b/server/accounts.go index 38b20270..f6e15797 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -18,6 +18,7 @@ import ( "encoding/hex" "errors" "fmt" + "hash/fnv" "hash/maphash" "io/ioutil" "math" @@ -702,6 +703,9 @@ func transformUntokenize(subject string) (string, []string) { if len(token) > 1 && token[0] == '$' && token[1] >= '1' && token[1] <= '9' { phs = append(phs, token) nda = append(nda, "*") + } else if len(token) > 1 && token[0] == '#' && token[1] >= '1' && token[1] <= '9' { + phs = append(phs, token) + nda = append(nda, "*") } else { nda = append(nda, token) } @@ -4115,17 +4119,26 @@ type transform struct { dtoks []string stoks []string dtpi []int8 + dth []int } // Helper to pull raw place holder index. Returns -1 if not a place holder. -func placeHolderIndex(token string) int { - if len(token) > 1 && token[0] == '$' { +func placeHolderIndex(token string) (int, int) { + if len(token) > 1 { var tp int - if n, err := fmt.Sscanf(token, "$%d", &tp); err == nil && n == 1 { - return tp + var tph int + if token[0] == '$' { + if n, err := fmt.Sscanf(token, "$%d", &tp); err == nil && n == 1 { + return tp, -1 + } + } + if token[0] == '#' { + if n, err := fmt.Sscanf(token, "#%d:%d", &tp, &tph); err == nil && n == 2 { + return tp, tph + } } } - return -1 + return -1, -1 } // newTransform will create a new transform checking the src and dest subjects for accuracy. @@ -4140,6 +4153,7 @@ func newTransform(src, dest string) (*transform, error) { } var dtpi []int8 + var dth []int // If the src has partial wildcards then the dest needs to have the token place markers. if npwcs > 0 || hasFwc { @@ -4153,7 +4167,7 @@ func newTransform(src, dest string) (*transform, error) { nphs := 0 for _, token := range dtokens { - tp := placeHolderIndex(token) + tp, numBuckets := placeHolderIndex(token) if tp >= 0 { if tp > npwcs { return nil, ErrBadSubject @@ -4161,8 +4175,10 @@ func newTransform(src, dest string) (*transform, error) { nphs++ // Now build up our runtime mapping from dest to source tokens. dtpi = append(dtpi, int8(sti[tp])) + dth = append(dth, numBuckets) } else { dtpi = append(dtpi, -1) + dth = append(dth, -1) } } @@ -4171,7 +4187,7 @@ func newTransform(src, dest string) (*transform, error) { } } - return &transform{src: src, dest: dest, dtoks: dtokens, stoks: stokens, dtpi: dtpi}, nil + return &transform{src: src, dest: dest, dtoks: dtokens, stoks: stokens, dtpi: dtpi, dth: dth}, nil } // match will take a literal published subject that is associated with a client and will match and transform @@ -4215,6 +4231,28 @@ func (tr *transform) transformSubject(subject string) (string, error) { return tr.transform(tts) } +func (tr *transform) getHashBucket(key string, numBuckets int) string { + // using highwayhash since it's already being used, didn't want to introduce a new dependency but any 'meant for distribution' (rather than security) hash (i.e. you only care about distribution, avalanche, you don't care about collisions) + //hashKey, _ := hex.DecodeString("000102030405060708090A0B0C0D0E0FF0E0D0C0B0A090807060504030201000") // hard coded default fine since only for distribution + // + //hh, _ :=highwayhash.New(hashKey) + //hh.Write([]byte(key)) + //hash := hh.Sum(nil) + // + //// not really consistent hashing but a simple and fast way to get a number out of the hash and then use modulo over the number of buckets is equivalent for our purpopses + //var hash2 = 0 + //for _,h := range hash { + // hash2 += int(h) + //} + + h := fnv.New32a() + h.Write([]byte(key)) + + return fmt.Sprintf("%d", h.Sum32()%uint32(numBuckets)) + + //return fmt.Sprintf("%d",hash2 % numBuckets) +} + // Do a transform on the subject to the dest subject. func (tr *transform) transform(tokens []string) (string, error) { if len(tr.dtpi) == 0 { @@ -4238,7 +4276,15 @@ func (tr *transform) transform(tokens []string) (string, error) { } } else { // >= 0 means use source map index to figure out which source token to pull. - token = tokens[index] + if tr.dth[i] > 0 { + // repeats the original token first followed by the bucket number + // comment those two lines below to just substitute the original token with the bucket number + b.WriteString(tokens[index]) + b.WriteByte(btsep) + token = tr.getHashBucket(tokens[index], tr.dth[i]) + } else { + token = tokens[index] + } } b.WriteString(token) if i < li {