mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Initial commit
This commit is contained in:
@@ -18,6 +18,7 @@ import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"hash/maphash"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
@@ -702,6 +703,9 @@ func transformUntokenize(subject string) (string, []string) {
|
||||
if len(token) > 1 && token[0] == '$' && token[1] >= '1' && token[1] <= '9' {
|
||||
phs = append(phs, token)
|
||||
nda = append(nda, "*")
|
||||
} else if len(token) > 1 && token[0] == '#' && token[1] >= '1' && token[1] <= '9' {
|
||||
phs = append(phs, token)
|
||||
nda = append(nda, "*")
|
||||
} else {
|
||||
nda = append(nda, token)
|
||||
}
|
||||
@@ -4115,17 +4119,26 @@ type transform struct {
|
||||
dtoks []string
|
||||
stoks []string
|
||||
dtpi []int8
|
||||
dth []int
|
||||
}
|
||||
|
||||
// Helper to pull raw place holder index. Returns -1 if not a place holder.
|
||||
func placeHolderIndex(token string) int {
|
||||
if len(token) > 1 && token[0] == '$' {
|
||||
func placeHolderIndex(token string) (int, int) {
|
||||
if len(token) > 1 {
|
||||
var tp int
|
||||
if n, err := fmt.Sscanf(token, "$%d", &tp); err == nil && n == 1 {
|
||||
return tp
|
||||
var tph int
|
||||
if token[0] == '$' {
|
||||
if n, err := fmt.Sscanf(token, "$%d", &tp); err == nil && n == 1 {
|
||||
return tp, -1
|
||||
}
|
||||
}
|
||||
if token[0] == '#' {
|
||||
if n, err := fmt.Sscanf(token, "#%d:%d", &tp, &tph); err == nil && n == 2 {
|
||||
return tp, tph
|
||||
}
|
||||
}
|
||||
}
|
||||
return -1
|
||||
return -1, -1
|
||||
}
|
||||
|
||||
// newTransform will create a new transform checking the src and dest subjects for accuracy.
|
||||
@@ -4140,6 +4153,7 @@ func newTransform(src, dest string) (*transform, error) {
|
||||
}
|
||||
|
||||
var dtpi []int8
|
||||
var dth []int
|
||||
|
||||
// If the src has partial wildcards then the dest needs to have the token place markers.
|
||||
if npwcs > 0 || hasFwc {
|
||||
@@ -4153,7 +4167,7 @@ func newTransform(src, dest string) (*transform, error) {
|
||||
|
||||
nphs := 0
|
||||
for _, token := range dtokens {
|
||||
tp := placeHolderIndex(token)
|
||||
tp, numBuckets := placeHolderIndex(token)
|
||||
if tp >= 0 {
|
||||
if tp > npwcs {
|
||||
return nil, ErrBadSubject
|
||||
@@ -4161,8 +4175,10 @@ func newTransform(src, dest string) (*transform, error) {
|
||||
nphs++
|
||||
// Now build up our runtime mapping from dest to source tokens.
|
||||
dtpi = append(dtpi, int8(sti[tp]))
|
||||
dth = append(dth, numBuckets)
|
||||
} else {
|
||||
dtpi = append(dtpi, -1)
|
||||
dth = append(dth, -1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4171,7 +4187,7 @@ func newTransform(src, dest string) (*transform, error) {
|
||||
}
|
||||
}
|
||||
|
||||
return &transform{src: src, dest: dest, dtoks: dtokens, stoks: stokens, dtpi: dtpi}, nil
|
||||
return &transform{src: src, dest: dest, dtoks: dtokens, stoks: stokens, dtpi: dtpi, dth: dth}, nil
|
||||
}
|
||||
|
||||
// match will take a literal published subject that is associated with a client and will match and transform
|
||||
@@ -4215,6 +4231,28 @@ func (tr *transform) transformSubject(subject string) (string, error) {
|
||||
return tr.transform(tts)
|
||||
}
|
||||
|
||||
func (tr *transform) getHashBucket(key string, numBuckets int) string {
|
||||
// using highwayhash since it's already being used, didn't want to introduce a new dependency but any 'meant for distribution' (rather than security) hash (i.e. you only care about distribution, avalanche, you don't care about collisions)
|
||||
//hashKey, _ := hex.DecodeString("000102030405060708090A0B0C0D0E0FF0E0D0C0B0A090807060504030201000") // hard coded default fine since only for distribution
|
||||
//
|
||||
//hh, _ :=highwayhash.New(hashKey)
|
||||
//hh.Write([]byte(key))
|
||||
//hash := hh.Sum(nil)
|
||||
//
|
||||
//// not really consistent hashing but a simple and fast way to get a number out of the hash and then use modulo over the number of buckets is equivalent for our purpopses
|
||||
//var hash2 = 0
|
||||
//for _,h := range hash {
|
||||
// hash2 += int(h)
|
||||
//}
|
||||
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(key))
|
||||
|
||||
return fmt.Sprintf("%d", h.Sum32()%uint32(numBuckets))
|
||||
|
||||
//return fmt.Sprintf("%d",hash2 % numBuckets)
|
||||
}
|
||||
|
||||
// Do a transform on the subject to the dest subject.
|
||||
func (tr *transform) transform(tokens []string) (string, error) {
|
||||
if len(tr.dtpi) == 0 {
|
||||
@@ -4238,7 +4276,15 @@ func (tr *transform) transform(tokens []string) (string, error) {
|
||||
}
|
||||
} else {
|
||||
// >= 0 means use source map index to figure out which source token to pull.
|
||||
token = tokens[index]
|
||||
if tr.dth[i] > 0 {
|
||||
// repeats the original token first followed by the bucket number
|
||||
// comment those two lines below to just substitute the original token with the bucket number
|
||||
b.WriteString(tokens[index])
|
||||
b.WriteByte(btsep)
|
||||
token = tr.getHashBucket(tokens[index], tr.dth[i])
|
||||
} else {
|
||||
token = tokens[index]
|
||||
}
|
||||
}
|
||||
b.WriteString(token)
|
||||
if i < li {
|
||||
|
||||
Reference in New Issue
Block a user