mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 10:40:41 -07:00
Merge pull request #1793 from nats-io/jwt-resp-threshold
[Added] support for jwt export response threshold
This commit is contained in:
@@ -2903,9 +2903,11 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
}
|
||||
if err := a.AddServiceExportWithResponse(string(e.Subject), rt, authAccounts(e.TokenReq)); err != nil {
|
||||
s.Debugf("Error adding service export to account [%s]: %v", a.Name, err)
|
||||
continue
|
||||
}
|
||||
sub := string(e.Subject)
|
||||
if e.Latency != nil {
|
||||
if err := a.TrackServiceExportWithSampling(string(e.Subject), string(e.Latency.Results), int(e.Latency.Sampling)); err != nil {
|
||||
if err := a.TrackServiceExportWithSampling(sub, string(e.Latency.Results), int(e.Latency.Sampling)); err != nil {
|
||||
hdrNote := ""
|
||||
if e.Latency.Sampling == jwt.Headers {
|
||||
hdrNote = " (using headers)"
|
||||
@@ -2913,6 +2915,12 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, a.Name, err)
|
||||
}
|
||||
}
|
||||
if e.ResponseThreshold != 0 {
|
||||
// Response threshold was set in options.
|
||||
if err := a.SetServiceExportResponseThreshold(sub, e.ResponseThreshold); err != nil {
|
||||
s.Debugf("Error adding service export response threshold for [%s]: %v", a.Name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
// We will track these at the account level. Should not have any collisions.
|
||||
if e.Revocations != nil {
|
||||
@@ -2945,7 +2953,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
to := i.GetTo()
|
||||
switch i.Type {
|
||||
case jwt.Stream:
|
||||
if i.LocalSubject != "" {
|
||||
if i.LocalSubject != _EMPTY_ {
|
||||
// set local subject implies to is empty
|
||||
to = string(i.LocalSubject)
|
||||
s.Debugf("Adding stream import %s:%q for %s:%q", acc.Name, from, a.Name, to)
|
||||
@@ -2959,7 +2967,6 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
incompleteImports = append(incompleteImports, i)
|
||||
}
|
||||
case jwt.Service:
|
||||
// FIXME(dlc) - need to add in respThresh here eventually.
|
||||
if i.LocalSubject != _EMPTY_ {
|
||||
from = string(i.LocalSubject)
|
||||
to = string(i.Subject)
|
||||
|
||||
@@ -4818,3 +4818,75 @@ func TestJWTAccountImportsWithWildcardSupport(t *testing.T) {
|
||||
"my.request.1.2.bar", "my.events.2.1.bar")
|
||||
})
|
||||
}
|
||||
|
||||
func TestJWTResponseThreshold(t *testing.T) {
|
||||
respThresh := 20 * time.Millisecond
|
||||
aExpKp, aExpPub := createKey(t)
|
||||
aExpClaim := jwt.NewAccountClaims(aExpPub)
|
||||
aExpClaim.Name = "Export"
|
||||
aExpClaim.Exports.Add(&jwt.Export{
|
||||
Subject: "srvc",
|
||||
Type: jwt.Service,
|
||||
ResponseThreshold: respThresh,
|
||||
})
|
||||
aExpJwt := encodeClaim(t, aExpClaim, aExpPub)
|
||||
aExpCreds := newUser(t, aExpKp)
|
||||
|
||||
defer os.Remove(aExpCreds)
|
||||
aImpKp, aImpPub := createKey(t)
|
||||
aImpClaim := jwt.NewAccountClaims(aImpPub)
|
||||
aImpClaim.Name = "Import"
|
||||
aImpClaim.Imports.Add(&jwt.Import{
|
||||
Subject: "srvc",
|
||||
Type: jwt.Service,
|
||||
Account: aExpPub,
|
||||
})
|
||||
aImpJwt := encodeClaim(t, aImpClaim, aImpPub)
|
||||
aImpCreds := newUser(t, aImpKp)
|
||||
defer os.Remove(aImpCreds)
|
||||
|
||||
cf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
port: -1
|
||||
operator = %s
|
||||
resolver = MEMORY
|
||||
resolver_preload = {
|
||||
%s : "%s"
|
||||
%s : "%s"
|
||||
}
|
||||
`, ojwt, aExpPub, aExpJwt, aImpPub, aImpJwt)))
|
||||
defer os.Remove(cf)
|
||||
|
||||
s, opts := RunServerWithConfig(cf)
|
||||
defer s.Shutdown()
|
||||
|
||||
ncExp := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.UserCredentials(aExpCreds))
|
||||
defer ncExp.Close()
|
||||
|
||||
ncImp := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.UserCredentials(aImpCreds))
|
||||
defer ncImp.Close()
|
||||
|
||||
delayChan := make(chan time.Duration, 1)
|
||||
|
||||
// Create subscriber for the service endpoint in foo.
|
||||
_, err := ncExp.Subscribe("srvc", func(m *nats.Msg) {
|
||||
time.Sleep(<-delayChan)
|
||||
m.Respond([]byte("yes!"))
|
||||
})
|
||||
require_NoError(t, err)
|
||||
ncExp.Flush()
|
||||
|
||||
t.Run("No-Timeout", func(t *testing.T) {
|
||||
delayChan <- respThresh / 2
|
||||
if resp, err := ncImp.Request("srvc", []byte("yes?"), 4*respThresh); err != nil {
|
||||
t.Fatalf("Expected a response to request srvc got: %v", err)
|
||||
} else if string(resp.Data) != "yes!" {
|
||||
t.Fatalf("Expected a response of %q, got %q", "yes!", resp.Data)
|
||||
}
|
||||
})
|
||||
t.Run("Timeout", func(t *testing.T) {
|
||||
delayChan <- 2 * respThresh
|
||||
if _, err := ncImp.Request("srvc", []byte("yes?"), 4*respThresh); err == nil || err != nats.ErrTimeout {
|
||||
t.Fatalf("Expected a timeout")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user