Merge pull request #1130 from nats-io/add-latency-config

Expose service latency info in config file
This commit is contained in:
Derek Collison
2019-09-18 13:22:11 -07:00
committed by GitHub
6 changed files with 369 additions and 9 deletions

View File

@@ -487,7 +487,7 @@ func (a *Account) AddServiceExportWithResponse(subject string, respType ServiceR
// TrackServiceExport will enable latency tracking of the named service.
// Results will be published in this account to the given results subject.
func (a *Account) TrackServiceExport(service, results string) error {
return a.TrackServiceExportWithSampling(service, results, 100)
return a.TrackServiceExportWithSampling(service, results, DEFAULT_SERVICE_LATENCY_SAMPLING)
}
// TrackServiceExportWithSampling will enable latency tracking of the named service for the given

View File

@@ -2228,7 +2228,7 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte) bool {
}
// Do a fast check here to see if we should be tracking this from a latency
// persepective. This will be for a request being received for an exported service.
// perspective. This will be for a request being received for an exported service.
// This needs to be from a non-client (otherwise tracking happens at requestor).
if client.kind == CLIENT && len(c.pa.reply) > minReplyLen {
// If we do not have a registered RTT queue that up now.

View File

@@ -1113,6 +1113,105 @@ func TestConfigCheck(t *testing.T) {
errorLine: 0,
errorPos: 0,
},
{
name: "when setting latency tracking without a system account",
config: `
accounts {
sys { users = [ {user: sys, pass: "" } ] }
nats.io: {
users = [ { user : bar, pass: "" } ]
exports = [
{ service: "nats.add"
response: singleton
latency: {
sampling: 100%
subject: "latency.tracking.add"
}
}
]
}
}
`,
err: errors.New(`Error adding service latency sampling for "nats.add": system account not setup`),
errorLine: 2,
errorPos: 17,
},
{
name: "when setting latency tracking with a system account",
config: `
system_account: sys
accounts {
sys { users = [ {user: sys, pass: "" } ] }
nats.io: {
users = [ { user : bar, pass: "" } ]
exports = [
{ service: "nats.add"
response: singleton
latency: {
sampling: 100%
subject: "latency.tracking.add"
}
}
]
}
}
`,
err: nil,
errorLine: 0,
errorPos: 0,
},
{
name: "when setting latency tracking with an invalid publish subject",
config: `
system_account = sys
accounts {
sys { users = [ {user: sys, pass: "" } ] }
nats.io: {
users = [ { user : bar, pass: "" } ]
exports = [
{ service: "nats.add"
response: singleton
latency: "*"
}
]
}
}
`,
err: errors.New(`Error adding service latency sampling for "nats.add" on subject "*": invalid publish subject`),
errorLine: 3,
errorPos: 17,
},
{
name: "when setting latency tracking on a stream",
config: `
system_account = sys
accounts {
sys { users = [ {user: sys, pass: "" } ] }
nats.io: {
users = [ { user : bar, pass: "" } ]
exports = [
{ stream: "nats.add"
latency: "foo"
}
]
}
}
`,
err: errors.New(`Detected latency directive on non-service`),
errorLine: 11,
errorPos: 25,
},
}
checkConfig := func(config string) error {

View File

@@ -175,4 +175,8 @@ const (
// DEFAULT_ALLOW_RESPONSE_EXPIRATION is the default time allowed for a given
// dynamic response permission.
DEFAULT_ALLOW_RESPONSE_EXPIRATION = 2 * time.Minute
// DEFAULT_SERVICE_LATENCY_SAMPLING is the default sampling rate for service
// latency metrics
DEFAULT_SERVICE_LATENCY_SAMPLING = 100
)

View File

@@ -371,6 +371,28 @@ func unwrapValue(v interface{}) (token, interface{}) {
}
}
// configureSystemAccount configures a system account
// if present in the configuration.
func configureSystemAccount(o *Options, m map[string]interface{}) error {
configure := func(v interface{}) error {
tk, v := unwrapValue(v)
sa, ok := v.(string)
if !ok {
return &configErr{tk, fmt.Sprintf("system account name must be a string")}
}
o.SystemAccount = sa
return nil
}
if v, ok := m["system_account"]; ok {
return configure(v)
} else if v, ok := m["system"]; ok {
return configure(v)
}
return nil
}
// ProcessConfigFile updates the Options structure with options
// present in the given configuration file.
// This version is convenient if one wants to set some default
@@ -398,6 +420,12 @@ func (o *Options) ProcessConfigFile(configFile string) error {
errors := make([]error, 0)
warnings := make([]error, 0)
// First check whether a system account has been defined,
// as that is a condition for other features to be enabled.
if err := configureSystemAccount(o, m); err != nil {
errors = append(errors, err)
}
for k, v := range m {
tk, v := unwrapValue(v)
switch strings.ToLower(k) {
@@ -677,12 +705,9 @@ func (o *Options) ProcessConfigFile(configFile string) error {
}
}
case "system_account", "system":
if sa, ok := v.(string); !ok {
err := &configErr{tk, fmt.Sprintf("system account name must be a string")}
errors = append(errors, err)
} else {
o.SystemAccount = sa
}
// Already processed at the beginning so we just skip them
// to not treat them as unknown values.
continue
case "trusted", "trusted_keys":
switch v := v.(type) {
case string:
@@ -1250,6 +1275,7 @@ type export struct {
sub string
accs []string
rt ServiceRespType
lat *serviceLatency
}
type importStream struct {
@@ -1447,6 +1473,20 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er
*errors = append(*errors, &configErr{tk, msg})
continue
}
if service.lat != nil {
if opts.SystemAccount == "" {
msg := fmt.Sprintf("Error adding service latency sampling for %q: %v", service.sub, ErrNoSysAccount.Error())
*errors = append(*errors, &configErr{tk, msg})
continue
}
if err := service.acc.TrackServiceExportWithSampling(service.sub, service.lat.subject, int(service.lat.sampling)); err != nil {
msg := fmt.Sprintf("Error adding service latency sampling for %q on subject %q: %v", service.sub, service.lat.subject, err)
*errors = append(*errors, &configErr{tk, msg})
continue
}
}
}
for _, stream := range importStreams {
ta := am[stream.an]
@@ -1581,6 +1621,7 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
accounts []string
rt ServiceRespType
rtSeen bool
lat *serviceLatency
)
tk, v := unwrapValue(v)
vv, ok := v.(map[string]interface{})
@@ -1657,6 +1698,9 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
if rtSeen {
curService.rt = rt
}
if lat != nil {
curService.lat = lat
}
case "accounts":
for _, iv := range mv.([]interface{}) {
_, mv := unwrapValue(iv)
@@ -1667,6 +1711,20 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
} else if curService != nil {
curService.accs = accounts
}
case "latency":
var err error
lat, err = parseServiceLatency(tk, mv)
if err != nil {
*errors = append(*errors, err)
continue
}
if curStream != nil {
err = &configErr{tk, "Detected latency directive on non-service"}
*errors = append(*errors, err)
}
if curService != nil {
curService.lat = lat
}
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
@@ -1678,11 +1736,77 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
*errors = append(*errors, err)
}
}
}
return curStream, curService, nil
}
// parseServiceLatency returns a latency config block.
func parseServiceLatency(root token, v interface{}) (*serviceLatency, error) {
if subject, ok := v.(string); ok {
return &serviceLatency{
subject: subject,
sampling: DEFAULT_SERVICE_LATENCY_SAMPLING,
}, nil
}
latency, ok := v.(map[string]interface{})
if !ok {
return nil, &configErr{token: root,
reason: fmt.Sprintf("Expected latency entry to be a map/struct or string, got %T", v)}
}
sl := serviceLatency{
sampling: DEFAULT_SERVICE_LATENCY_SAMPLING,
}
// Read sampling value.
if v, ok := latency["sampling"]; ok {
tk, v := unwrapValue(v)
var sample int64
switch vv := v.(type) {
case int64:
// Sample is an int, like 50.
sample = vv
case string:
// Sample is a string, like "50%".
s := strings.TrimSuffix(vv, "%")
n, err := strconv.Atoi(s)
if err != nil {
return nil, &configErr{token: tk,
reason: fmt.Sprintf("Failed to parse latency sample: %v", err)}
}
sample = int64(n)
default:
return nil, &configErr{token: tk,
reason: fmt.Sprintf("Expected latency sample to be a string or map/struct, got %T", v)}
}
if sample < 1 || sample > 100 {
return nil, &configErr{token: tk,
reason: ErrBadSampling.Error()}
}
sl.sampling = int8(sample)
}
// Read subject value.
v, ok = latency["subject"]
if !ok {
return nil, &configErr{token: root,
reason: "Latency subject required, but missing"}
}
tk, v := unwrapValue(v)
subject, ok := v.(string)
if !ok {
return nil, &configErr{token: tk,
reason: fmt.Sprintf("Expected latency subject to be a string, got %T", subject)}
}
sl.subject = subject
return &sl, nil
}
// Parse an import stream or service.
// e.g.
// {stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"}

View File

@@ -1514,6 +1514,139 @@ func TestClusterPermissionsConfig(t *testing.T) {
}
}
func TestParseServiceLatency(t *testing.T) {
cases := []struct {
name string
conf string
want *serviceLatency
wantErr bool
}{
{
name: "block with percent sample default value",
conf: `system_account = nats.io
accounts {
nats.io {
exports [{
service: nats.add
latency: {
sampling: 100%
subject: latency.tracking.add
}
}]
}
}`,
want: &serviceLatency{
subject: "latency.tracking.add",
sampling: 100,
},
},
{
name: "block with percent sample nondefault value",
conf: `system_account = nats.io
accounts {
nats.io {
exports [{
service: nats.add
latency: {
sampling: 33%
subject: latency.tracking.add
}
}]
}
}`,
want: &serviceLatency{
subject: "latency.tracking.add",
sampling: 33,
},
},
{
name: "block with number sample nondefault value",
conf: `system_account = nats.io
accounts {
nats.io {
exports [{
service: nats.add
latency: {
sampling: 87
subject: latency.tracking.add
}
}]
}
}`,
want: &serviceLatency{
subject: "latency.tracking.add",
sampling: 87,
},
},
{
name: "field with subject",
conf: `system_account = nats.io
accounts {
nats.io {
exports [{
service: nats.add
latency: latency.tracking.add
}]
}
}`,
want: &serviceLatency{
subject: "latency.tracking.add",
sampling: 100,
},
},
{
name: "block with missing subject",
conf: `system_account = nats.io
accounts {
nats.io {
exports [{
service: nats.add
latency: {
sampling: 87
}
}]
}
}`,
wantErr: true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
f := createConfFile(t, []byte(c.conf))
opts, err := ProcessConfigFile(f)
os.Remove(f)
switch {
case c.wantErr && err == nil:
t.Fatalf("Expected ProcessConfigFile to fail, but didn't")
case c.wantErr && err != nil:
// We wanted an error and got one, test passed.
return
case !c.wantErr && err == nil:
// We didn't want an error and didn't get one, keep going.
break
case !c.wantErr && err != nil:
t.Fatalf("Failed to process config: %v", err)
}
if len(opts.Accounts) != 1 {
t.Fatalf("Expected accounts to have len %d, got %d", 1, len(opts.Accounts))
}
if len(opts.Accounts[0].exports.services) != 1 {
t.Fatalf("Expected export services to have len %d, got %d", 1, len(opts.Accounts[0].exports.services))
}
s, ok := opts.Accounts[0].exports.services["nats.add"]
if !ok {
t.Fatalf("Expected export service nats.add, missing")
}
if !reflect.DeepEqual(s.latency, c.want) {
t.Fatalf("Expected latency to be %#v, got %#v", c.want, s.latency)
}
})
}
}
func TestAccountUsersLoadedProperly(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: "127.0.0.1:-1"