diff --git a/go.mod b/go.mod index 31dcb5c2..a877def0 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/nats-io/nats-server/v2 require ( - github.com/nats-io/jwt v0.2.14 + github.com/nats-io/jwt v0.2.16 github.com/nats-io/nats.go v1.8.1 github.com/nats-io/nkeys v0.1.0 github.com/nats-io/nuid v1.0.1 diff --git a/go.sum b/go.sum index dfab468e..293ce94c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/nats-io/jwt v0.2.14 h1:wA50KvFz/JXGXMHRygTWsRGh/ixxgC5E3kHvmtGLNf4= -github.com/nats-io/jwt v0.2.14/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= +github.com/nats-io/jwt v0.2.16 h1:7lZ34jS9YAnW3gg/CITJuxjZnSI69kx/rgQAW4ro7G8= +github.com/nats-io/jwt v0.2.16/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/nats.go v1.8.1 h1:6lF/f1/NN6kzUDBz6pyvQDEXO39jqXcWRLu/tKjtOUQ= github.com/nats-io/nats.go v1.8.1/go.mod h1:BrFz9vVn0fU3AcH9Vn4Kd7W0NpJ651tD5omQ3M8LwxM= github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= diff --git a/server/accounts.go b/server/accounts.go index 8c6d1476..05a84cb3 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -508,7 +508,7 @@ func (a *Account) TrackServiceExportWithSampling(service, results string, sampli return ErrBadPublishSubject } - if a.srv == nil || !a.srv.eventsEnabled() { + if a.srv != nil && !a.srv.eventsEnabled() { return ErrNoSysAccount } @@ -1476,11 +1476,26 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { a.mu.Lock() // Clone to update, only select certain fields. - old := &Account{Name: a.Name, imports: a.imports, exports: a.exports, limits: a.limits, signingKeys: a.signingKeys} + old := &Account{Name: a.Name, exports: a.exports, limits: a.limits, signingKeys: a.signingKeys} // Reset exports and imports here. a.exports = exportMap{} - a.imports = importMap{} + + // Imports are checked unlocked in processInbound, so we can't change out the struct here. Need to process inline. + if a.imports.streams != nil { + old.imports.streams = make(map[string]*streamImport, len(a.imports.streams)) + } + if a.imports.services != nil { + old.imports.services = make(map[string]*serviceImport, len(a.imports.services)) + } + for k, v := range a.imports.streams { + old.imports.streams[k] = v + delete(a.imports.streams, k) + } + for k, v := range a.imports.services { + old.imports.services[k] = v + delete(a.imports.services, k) + } // Reset any notion of export revocations. a.actsRevoked = nil @@ -1531,7 +1546,12 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { rt = Chunked } if err := a.AddServiceExportWithResponse(string(e.Subject), rt, authAccounts(e.TokenReq)); err != nil { - s.Debugf("Error adding service export to account [%s]: %v", a.Name, err.Error()) + s.Debugf("Error adding service export to account [%s]: %v", a.Name, err) + } + if e.Latency != nil { + if err := a.TrackServiceExportWithSampling(string(e.Subject), string(e.Latency.Results), e.Latency.Sampling); err != nil { + s.Debugf("Error adding latency tracking for service export to account [%s]: %v", a.Name, err) + } } } // We will track these at the account level. Should not have any collisions. diff --git a/test/service_latency_test.go b/test/service_latency_test.go index 08f02182..8cfbafc1 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -17,13 +17,17 @@ import ( "encoding/json" "fmt" "math/rand" + "os" + "strings" "sync" "sync/atomic" "testing" "time" + "github.com/nats-io/jwt" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" ) // Used to setup superclusters for tests. @@ -105,6 +109,8 @@ func clientConnect(t *testing.T, opts *server.Options, user string) *nats.Conn { func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time, serviceTime time.Duration) { t.Helper() + serviceTime = serviceTime.Round(time.Millisecond) + startDelta := sl.RequestStart.Sub(start) if startDelta > 5*time.Millisecond { t.Fatalf("Bad start delta %v", startDelta) @@ -442,3 +448,142 @@ func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) { } } } + +func createAccountWithJWT(t *testing.T) (string, nkeys.KeyPair, *jwt.AccountClaims) { + t.Helper() + okp, _ := nkeys.FromSeed(oSeed) + akp, _ := nkeys.CreateAccount() + pub, _ := akp.PublicKey() + nac := jwt.NewAccountClaims(pub) + jwt, _ := nac.Encode(okp) + return jwt, akp, nac +} + +func TestServiceLatencyWithJWT(t *testing.T) { + okp, _ := nkeys.FromSeed(oSeed) + + // Create three accounts, system, service and normal account. + sysJWT, sysKP, _ := createAccountWithJWT(t) + sysPub, _ := sysKP.PublicKey() + + _, svcKP, svcAcc := createAccountWithJWT(t) + svcPub, _ := svcKP.PublicKey() + + // Add in the service export with latency tracking here. + serviceExport := &jwt.Export{Subject: "req.echo", Type: jwt.Service} + serviceExport.Latency = &jwt.ServiceLatency{Sampling: 100, Results: "results"} + svcAcc.Exports.Add(serviceExport) + svcJWT, err := svcAcc.Encode(okp) + if err != nil { + t.Fatalf("Error generating account JWT: %v", err) + } + + _, accKP, accAcc := createAccountWithJWT(t) + accPub, _ := accKP.PublicKey() + + // Add in the import. + serviceImport := &jwt.Import{Account: svcPub, Subject: "request", To: "req.echo", Type: jwt.Service} + accAcc.Imports.Add(serviceImport) + accJWT, err := accAcc.Encode(okp) + if err != nil { + t.Fatalf("Error generating account JWT: %v", err) + } + + cf := ` + listen: 127.0.0.1:-1 + cluster { + listen: 127.0.0.1:-1 + authorization { + timeout: 2.2 + } %s + } + + operator = "./configs/nkeys/op.jwt" + system_account = "%s" + + resolver = MEMORY + resolver_preload = { + %s : "%s" + %s : "%s" + %s : "%s" + } + ` + contents := strings.Replace(fmt.Sprintf(cf, "", sysPub, sysPub, sysJWT, svcPub, svcJWT, accPub, accJWT), "\n\t", "\n", -1) + conf := createConfFile(t, []byte(contents)) + defer os.Remove(conf) + + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + + // Create a new server and route to main one. + routeStr := fmt.Sprintf("\n\t\troutes = [nats-route://%s:%d]", opts.Cluster.Host, opts.Cluster.Port) + contents2 := strings.Replace(fmt.Sprintf(cf, routeStr, sysPub, sysPub, sysJWT, svcPub, svcJWT, accPub, accJWT), "\n\t", "\n", -1) + + conf2 := createConfFile(t, []byte(contents2)) + defer os.Remove(conf2) + + s2, opts2 := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s, s2) + + // Create service provider. + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + nc, err := nats.Connect(url, createUserCreds(t, s, svcKP)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + // The service listener. + serviceTime := 25 * time.Millisecond + nc.Subscribe("req.echo", func(msg *nats.Msg) { + time.Sleep(serviceTime) + msg.Respond(msg.Data) + }) + + // Listen for metrics + rsub, _ := nc.SubscribeSync("results") + + // Create second client and send request from this one. + url2 := fmt.Sprintf("nats://%s:%d/", opts2.Host, opts2.Port) + nc2, err := nats.Connect(url2, createUserCreds(t, s2, accKP)) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + defer nc2.Close() + + // Send the request. + start := time.Now() + _, err = nc2.Request("request", []byte("hello"), time.Second) + if err != nil { + t.Fatalf("Expected a response") + } + + var sl server.ServiceLatency + rmsg, err := rsub.NextMsg(time.Second) + if err != nil || rmsg == nil { + t.Fatalf("Did not receive a latency metric") + } + json.Unmarshal(rmsg.Data, &sl) + checkServiceLatency(t, sl, start, serviceTime) + + // Now we will remove tracking. Do this by simulating a JWT update. + serviceExport.Latency = nil + + svcAccount, err := s.LookupAccount(svcPub) + if err != nil { + t.Fatalf("Could not lookup service account") + } + s.UpdateAccountClaims(svcAccount, svcAcc) + + // Now we should not get any tracking data. + _, err = nc2.Request("request", []byte("hello"), time.Second) + if err != nil { + t.Fatalf("Expected a response") + } + _, err = rsub.NextMsg(100 * time.Millisecond) + if err == nil { + t.Fatalf("Did not expect to receive a latency metric") + } +} diff --git a/vendor/github.com/nats-io/jwt/exports.go b/vendor/github.com/nats-io/jwt/exports.go index 720abf6e..58d7e52e 100644 --- a/vendor/github.com/nats-io/jwt/exports.go +++ b/vendor/github.com/nats-io/jwt/exports.go @@ -1,5 +1,5 @@ /* - * Copyright 2018 The NATS Authors + * Copyright 2018-2019 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -34,14 +34,40 @@ const ( ResponseTypeChunked = "Chunked" ) +// ServiceLatency is used when observing and exported service for +// latency measurements. +// Sampling 1-100, represents sampling rate, defaults to 100. +// Results is the subject where the latency metrics are published. +// A metric will be defined by the nats-server's ServiceLatency. Time durations +// are in nanoseconds. +// see https://github.com/nats-io/nats-server/blob/master/server/accounts.go#L524 +// e.g. +// { +// "app": "dlc22", +// "start": "2019-09-16T21:46:23.636869585-07:00", +// "svc": 219732, +// "nats": { +// "req": 320415, +// "resp": 228268, +// "sys": 0 +// }, +// "total": 768415 +// } +// +type ServiceLatency struct { + Sampling int `json:"sampling,omitempty"` + Results Subject `json:"results"` +} + // Export represents a single export type Export struct { - Name string `json:"name,omitempty"` - Subject Subject `json:"subject,omitempty"` - Type ExportType `json:"type,omitempty"` - TokenReq bool `json:"token_req,omitempty"` - Revocations RevocationList `json:"revocations,omitempty"` - ResponseType ResponseType `json:"response_type,omitempty"` + Name string `json:"name,omitempty"` + Subject Subject `json:"subject,omitempty"` + Type ExportType `json:"type,omitempty"` + TokenReq bool `json:"token_req,omitempty"` + Revocations RevocationList `json:"revocations,omitempty"` + ResponseType ResponseType `json:"response_type,omitempty"` + Latency *ServiceLatency `json:"service_latency,omitempty"` } // IsService returns true if an export is for a service @@ -81,6 +107,18 @@ func (e *Export) Validate(vr *ValidationResults) { if e.IsStream() && e.ResponseType != "" { vr.AddError("invalid response type for stream: %q", e.ResponseType) } + if e.Latency != nil { + if !e.IsService() { + vr.AddError("latency tracking only permitted for services") + } + if e.Latency.Sampling < 1 || e.Latency.Sampling > 100 { + vr.AddError("sampling percentage needs to be between 1-100") + } + if e.Latency.Results.HasWildCards() { + vr.AddError("results subject can not contain wildcards") + } + e.Latency.Results.Validate(vr) + } e.Subject.Validate(vr) } diff --git a/vendor/github.com/nats-io/jwt/header.go b/vendor/github.com/nats-io/jwt/header.go index 2625856b..73d1b055 100644 --- a/vendor/github.com/nats-io/jwt/header.go +++ b/vendor/github.com/nats-io/jwt/header.go @@ -23,7 +23,7 @@ import ( const ( // Version is semantic version. - Version = "0.0.5" + Version = "0.2.16" // TokenTypeJwt is the JWT token type supported JWT tokens // encoded and decoded by this library diff --git a/vendor/modules.txt b/vendor/modules.txt index 09c62e6d..8d2e79cc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,4 @@ -# github.com/nats-io/jwt v0.2.14 +# github.com/nats-io/jwt v0.2.16 github.com/nats-io/jwt # github.com/nats-io/nats.go v1.8.1 github.com/nats-io/nats.go