Merge pull request #1776 from nats-io/jwt-headers

[Added] ability to use jwt latency sampling properties headers/share
This commit is contained in:
Ivan Kozlovic
2020-12-16 12:44:04 -07:00
committed by GitHub
15 changed files with 352 additions and 84 deletions

2
go.mod
View File

@@ -2,7 +2,7 @@ module github.com/nats-io/nats-server/v2
require (
github.com/minio/highwayhash v1.0.0
github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c
github.com/nats-io/jwt/v2 v2.0.0-20201211164018-2e78446f4e6f
github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0
github.com/nats-io/nkeys v0.2.0
github.com/nats-io/nuid v1.0.1

6
go.sum
View File

@@ -14,9 +14,11 @@ github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM=
github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c h1:Hc1D9ChlsCMVwCxJ6QT5xqfk2zJ4XNea+LtdfaYhd20=
github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20201211164018-2e78446f4e6f h1:+F8Vkc9pDf2x6O+csstCXaubPu2njJkbXeTm5wfDzGI=
github.com/nats-io/jwt/v2 v2.0.0-20201211164018-2e78446f4e6f/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw=

View File

@@ -1736,7 +1736,11 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
}
}
}
si := &serviceImport{dest, claim, se, nil, from, to, tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, false, isSysAcc, nil}
share := false
if claim != nil {
share = claim.Share
}
si := &serviceImport{dest, claim, se, nil, from, to, tr, 0, rt, lat, nil, nil, usePub, false, false, share, false, false, isSysAcc, nil}
a.imports.services[from] = si
a.mu.Unlock()
@@ -2540,7 +2544,7 @@ func (a *Account) checkActivation(importAcc *Account, claim *jwt.Import, expTime
clone.Token = fetchActivation(url.String())
}
vr := jwt.CreateValidationResults()
clone.Validate(a.Name, vr)
clone.Validate(importAcc.Name, vr)
if vr.IsBlocking(true) {
return false
}
@@ -2897,8 +2901,12 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
s.Debugf("Error adding service export to account [%s]: %v", a.Name, err)
}
if e.Latency != nil {
if err := a.TrackServiceExportWithSampling(string(e.Subject), string(e.Latency.Results), e.Latency.Sampling); err != nil {
s.Debugf("Error adding latency tracking for service export to account [%s]: %v", a.Name, err)
if err := a.TrackServiceExportWithSampling(string(e.Subject), string(e.Latency.Results), int(e.Latency.Sampling)); err != nil {
hdrNote := ""
if e.Latency.Sampling == jwt.Headers {
hdrNote = " (using headers)"
}
s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, a.Name, err)
}
}
}

View File

@@ -735,6 +735,13 @@ func TestJWTAccountBasicImportExport(t *testing.T) {
if err != nil {
t.Fatalf("Error generating account JWT: %v", err)
}
vr := jwt.ValidationResults{}
barAC.Validate(&vr)
if vr.IsBlocking(true) {
t.Fatalf("Error generating account JWT: %v", vr)
}
addAccountToMemResolver(s, barPub, barJWT)
s.UpdateAccountClaims(acc, barAC)
// Our service import should have succeeded.
@@ -2642,17 +2649,10 @@ func TestJWTAccountImportWrongIssuerAccount(t *testing.T) {
client, clientReader, clientCS := createClient(t, s, clientKP)
defer client.close()
client.parseAsync(clientCS)
expectPong(t, clientReader)
for i := 0; i < 2; i++ {
select {
case e := <-l.errCh:
if !strings.HasPrefix(e, fmt.Sprintf("Invalid issuer account %q in activation claim", clientPK)) {
t.Fatalf("Unexpected error: %v", e)
}
case <-time.After(2 * time.Second):
t.Fatalf("Did not get error regarding issuer account")
}
if l, _, err := clientReader.ReadLine(); err != nil {
t.Fatalf("Expected no Error, got: %v", err)
} else if !strings.Contains(string(l), "-ERR 'Authorization Violation'") {
t.Fatalf("Expected Error, got: %v", l)
}
}
@@ -3282,7 +3282,7 @@ func TestExpiredUserCredentialsRenewal(t *testing.T) {
}
}
func updateJwt(t *testing.T, url string, creds string, pubKey string, jwt string, respCnt int) int {
func updateJwt(t *testing.T, url string, creds string, jwt string, respCnt int) int {
t.Helper()
require_NextMsg := func(sub *nats.Subscription) bool {
t.Helper()
@@ -3566,7 +3566,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
connect(sC.ClientURL(), sysCreds, "")
checkClusterFormed(t, sA, sB, sC)
// upload system account and require a response from each server
passCnt := updateJwt(t, sA.ClientURL(), sysCreds, syspub, sysjwt, 3)
passCnt := updateJwt(t, sA.ClientURL(), sysCreds, sysjwt, 3)
require_True(t, passCnt == 3)
require_JWTPresent(t, dirA, syspub) // was just received
require_JWTPresent(t, dirB, syspub) // was just received
@@ -3584,7 +3584,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
require_1Connection(sA.ClientURL(), v.creds, v.pub, sA, sB, sC)
require_1Connection(sB.ClientURL(), v.creds, v.pub, sA, sB, sC)
require_1Connection(sC.ClientURL(), v.creds, v.pub, sA, sB, sC)
passCnt := updateJwt(t, port, sysCreds, v.pub, v.jwt, 3)
passCnt := updateJwt(t, port, sysCreds, v.jwt, 3)
require_True(t, passCnt == 3)
require_2Connection(sA.ClientURL(), v.creds, v.pub, sA, sB, sC)
require_2Connection(sB.ClientURL(), v.creds, v.pub, sA, sB, sC)
@@ -3631,7 +3631,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
require_2Connection(sC.ClientURL(), aCreds, apub, sA, sB, sC)
// Test exceeding limit. For the exclusive directory resolver, limit is a stop gap measure.
// It is not expected to be hit. When hit the administrator is supposed to take action.
passCnt = updateJwt(t, sA.ClientURL(), sysCreds, dpub, djwt1, 3)
passCnt = updateJwt(t, sA.ClientURL(), sysCreds, djwt1, 3)
require_True(t, passCnt == 1) // Only Server C updated
for _, srv := range []*Server{sA, sB, sC} {
if a, ok := srv.accounts.Load(syspub); ok {
@@ -3823,41 +3823,41 @@ func TestAccountNATSResolverCrossClusterFetch(t *testing.T) {
waitForOutboundGateways(t, sAB, 1, 5*time.Second)
waitForOutboundGateways(t, sBA, 1, 5*time.Second)
waitForOutboundGateways(t, sBB, 1, 5*time.Second)
time.Sleep(500 * time.Millisecond) // wait for the protocol to converge
updateJwt(t, sAA.ClientURL(), sysCreds, syspub, sysjwt, 4) // update system account jwt on all server
require_JWTEqual(t, dirAA, syspub, sysjwt) // assure this update made it to every server
require_JWTEqual(t, dirAB, syspub, sysjwt) // assure this update made it to every server
require_JWTEqual(t, dirBA, syspub, sysjwt) // assure this update made it to every server
require_JWTEqual(t, dirBB, syspub, sysjwt) // assure this update made it to every server
require_JWTAbsent(t, dirAA, bpub) // assure that jwt are not synced across cluster
require_JWTAbsent(t, dirAB, bpub) // assure that jwt are not synced across cluster
require_JWTAbsent(t, dirBA, apub) // assure that jwt are not synced across cluster
require_JWTAbsent(t, dirBB, apub) // assure that jwt are not synced across cluster
connect(sAA.ClientURL(), aCreds) // connect to cluster where jwt was initially stored
connect(sAB.ClientURL(), aCreds) // connect to cluster where jwt was initially stored
connect(sBA.ClientURL(), bCreds) // connect to cluster where jwt was initially stored
connect(sBB.ClientURL(), bCreds) // connect to cluster where jwt was initially stored
time.Sleep(500 * time.Millisecond) // wait for the protocol to (NOT) converge
require_JWTAbsent(t, dirAA, bpub) // assure that jwt are still not synced across cluster
require_JWTAbsent(t, dirAB, bpub) // assure that jwt are still not synced across cluster
require_JWTAbsent(t, dirBA, apub) // assure that jwt are still not synced across cluster
require_JWTAbsent(t, dirBB, apub) // assure that jwt are still not synced across cluster
time.Sleep(500 * time.Millisecond) // wait for the protocol to converge
updateJwt(t, sAA.ClientURL(), sysCreds, sysjwt, 4) // update system account jwt on all server
require_JWTEqual(t, dirAA, syspub, sysjwt) // assure this update made it to every server
require_JWTEqual(t, dirAB, syspub, sysjwt) // assure this update made it to every server
require_JWTEqual(t, dirBA, syspub, sysjwt) // assure this update made it to every server
require_JWTEqual(t, dirBB, syspub, sysjwt) // assure this update made it to every server
require_JWTAbsent(t, dirAA, bpub) // assure that jwt are not synced across cluster
require_JWTAbsent(t, dirAB, bpub) // assure that jwt are not synced across cluster
require_JWTAbsent(t, dirBA, apub) // assure that jwt are not synced across cluster
require_JWTAbsent(t, dirBB, apub) // assure that jwt are not synced across cluster
connect(sAA.ClientURL(), aCreds) // connect to cluster where jwt was initially stored
connect(sAB.ClientURL(), aCreds) // connect to cluster where jwt was initially stored
connect(sBA.ClientURL(), bCreds) // connect to cluster where jwt was initially stored
connect(sBB.ClientURL(), bCreds) // connect to cluster where jwt was initially stored
time.Sleep(500 * time.Millisecond) // wait for the protocol to (NOT) converge
require_JWTAbsent(t, dirAA, bpub) // assure that jwt are still not synced across cluster
require_JWTAbsent(t, dirAB, bpub) // assure that jwt are still not synced across cluster
require_JWTAbsent(t, dirBA, apub) // assure that jwt are still not synced across cluster
require_JWTAbsent(t, dirBB, apub) // assure that jwt are still not synced across cluster
// We have verified that account B does not exist in cluster A, neither does account A in cluster B
// Despite that clients from account B can connect to server A, same for account A in cluster B
connect(sAA.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored
connect(sAB.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored
connect(sBA.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored
connect(sBB.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored
require_JWTEqual(t, dirAA, bpub, bjwt1) // assure that now jwt used in connect is stored
require_JWTEqual(t, dirAB, bpub, bjwt1) // assure that now jwt used in connect is stored
require_JWTEqual(t, dirBA, apub, ajwt1) // assure that now jwt used in connect is stored
require_JWTEqual(t, dirBB, apub, ajwt1) // assure that now jwt used in connect is stored
updateJwt(t, sAA.ClientURL(), sysCreds, bpub, bjwt2, 4) // update bjwt, expect updates from everywhere
updateJwt(t, sBA.ClientURL(), sysCreds, apub, ajwt2, 4) // update ajwt, expect updates from everywhere
require_JWTEqual(t, dirAA, bpub, bjwt2) // assure that jwt got updated accordingly
require_JWTEqual(t, dirAB, bpub, bjwt2) // assure that jwt got updated accordingly
require_JWTEqual(t, dirBA, apub, ajwt2) // assure that jwt got updated accordingly
require_JWTEqual(t, dirBB, apub, ajwt2) // assure that jwt got updated accordingly
connect(sAA.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored
connect(sAB.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored
connect(sBA.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored
connect(sBB.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored
require_JWTEqual(t, dirAA, bpub, bjwt1) // assure that now jwt used in connect is stored
require_JWTEqual(t, dirAB, bpub, bjwt1) // assure that now jwt used in connect is stored
require_JWTEqual(t, dirBA, apub, ajwt1) // assure that now jwt used in connect is stored
require_JWTEqual(t, dirBB, apub, ajwt1) // assure that now jwt used in connect is stored
updateJwt(t, sAA.ClientURL(), sysCreds, bjwt2, 4) // update bjwt, expect updates from everywhere
updateJwt(t, sBA.ClientURL(), sysCreds, ajwt2, 4) // update ajwt, expect updates from everywhere
require_JWTEqual(t, dirAA, bpub, bjwt2) // assure that jwt got updated accordingly
require_JWTEqual(t, dirAB, bpub, bjwt2) // assure that jwt got updated accordingly
require_JWTEqual(t, dirBA, apub, ajwt2) // assure that jwt got updated accordingly
require_JWTEqual(t, dirBB, apub, ajwt2) // assure that jwt got updated accordingly
}
func newTimeRange(start time.Time, dur time.Duration) jwt.TimeRange {
@@ -4428,8 +4428,8 @@ func TestJWTUserRevocation(t *testing.T) {
defer os.Remove(conf)
srv, _ := RunServerWithConfig(conf)
defer srv.Shutdown()
updateJwt(t, srv.ClientURL(), sysCreds, syspub, sysjwt, 1) // update system account jwt
updateJwt(t, srv.ClientURL(), sysCreds, apub, ajwt1, 1) // set account jwt without revocation
updateJwt(t, srv.ClientURL(), sysCreds, sysjwt, 1) // update system account jwt
updateJwt(t, srv.ClientURL(), sysCreds, ajwt1, 1) // set account jwt without revocation
// use credentials that will be revoked ans assure that the connection will be disconnected
nc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds1),
nats.DisconnectErrHandler(func(conn *nats.Conn, err error) {
@@ -4439,7 +4439,7 @@ func TestJWTUserRevocation(t *testing.T) {
}))
defer nc.Close()
// update account jwt to contain revocation
if passCnt := updateJwt(t, srv.ClientURL(), sysCreds, apub, ajwt2, 1); passCnt != 1 {
if passCnt := updateJwt(t, srv.ClientURL(), sysCreds, ajwt2, 1); passCnt != 1 {
t.Fatalf("Expected jwt update to pass")
}
// assure that nc got disconnected due to the revocation
@@ -4515,9 +4515,9 @@ func TestJWTAccountOps(t *testing.T) {
defer os.Remove(conf)
srv, _ := RunServerWithConfig(conf)
defer srv.Shutdown()
updateJwt(t, srv.ClientURL(), sysCreds, syspub, sysjwt, 1) // update system account jwt
updateJwt(t, srv.ClientURL(), sysCreds, sysjwt, 1) // update system account jwt
// push jwt (for full resolver)
updateJwt(t, srv.ClientURL(), sysCreds, apub, ajwt1, 1) // set jwt
updateJwt(t, srv.ClientURL(), sysCreds, ajwt1, 1) // set jwt
nc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(sysCreds))
defer nc.Close()
// simulate nas resolver in case of a lookup request (cache)
@@ -4548,3 +4548,133 @@ func TestJWTAccountOps(t *testing.T) {
})
}
}
func TestJWTHeader(t *testing.T) {
createKey := func() (nkeys.KeyPair, string) {
t.Helper()
kp, _ := nkeys.CreateAccount()
syspub, _ := kp.PublicKey()
return kp, syspub
}
encode := func(claim *jwt.AccountClaims, pub string) string {
t.Helper()
theJWT, err := claim.Encode(oKp)
require_NoError(t, err)
return theJWT
}
newUser := func(accKp nkeys.KeyPair) string {
ukp, _ := nkeys.CreateUser()
seed, _ := ukp.Seed()
upub, _ := ukp.PublicKey()
uclaim := newJWTTestUserClaims()
uclaim.Subject = upub
ujwt, err := uclaim.Encode(accKp)
require_NoError(t, err)
return genCredsFile(t, ujwt, seed)
}
sysKp, syspub := createKey()
sysJwt := encode(jwt.NewAccountClaims(syspub), syspub)
sysCreds := newUser(sysKp)
defer os.Remove(sysCreds)
test := func(share bool) {
aExpKp, aExpPub := createKey()
aExpClaim := jwt.NewAccountClaims(aExpPub)
aExpClaim.Exports.Add(&jwt.Export{
Name: "test",
Subject: "srvc",
Type: jwt.Service,
TokenReq: false,
Latency: &jwt.ServiceLatency{
Sampling: jwt.Headers,
Results: "res",
},
})
aExpJwt := encode(aExpClaim, aExpPub)
aExpCreds := newUser(aExpKp)
defer os.Remove(aExpCreds)
aImpKp, aImpPub := createKey()
aImpClaim := jwt.NewAccountClaims(aImpPub)
aImpClaim.Imports.Add(&jwt.Import{
Name: "test",
Subject: "srvc",
Account: aExpPub,
Type: jwt.Service,
Share: share,
})
aImpJwt := encode(aImpClaim, aImpPub)
aImpCreds := newUser(aImpKp)
defer os.Remove(aImpCreds)
dirSrv := createDir(t, "srv")
defer os.RemoveAll(dirSrv)
conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: -1
operator: %s
system_account: %s
resolver: {
type: full
dir: %s
}
`, ojwt, syspub, dirSrv)))
defer os.Remove(conf)
srv, _ := RunServerWithConfig(conf)
defer srv.Shutdown()
updateJwt(t, srv.ClientURL(), sysCreds, sysJwt, 1) // update system account jwt
updateJwt(t, srv.ClientURL(), sysCreds, aExpJwt, 1)
updateJwt(t, srv.ClientURL(), sysCreds, aImpJwt, 1)
expNc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aExpCreds))
defer expNc.Close()
resChan := make(chan *nats.Msg, 1)
expNc.ChanSubscribe("res", resChan)
sub, err := expNc.Subscribe("srvc", func(msg *nats.Msg) {
msg.Respond(nil)
})
require_NoError(t, err)
defer sub.Unsubscribe()
impNc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aImpCreds))
defer impNc.Close()
// send request w/o header
_, err = impNc.Request("srvc", []byte("msg1"), time.Second)
require_NoError(t, err)
require_True(t, len(resChan) == 0)
_, err = impNc.RequestMsg(&nats.Msg{
Subject: "srvc", Data: []byte("msg2"), Header: http.Header{
"X-B3-Sampled": []string{"1"},
"Share": []string{"Me"}}}, time.Second)
require_NoError(t, err)
select {
case <-time.After(time.Second):
t.Fatalf("should have received a response")
case m := <-resChan:
obj := map[string]interface{}{}
err = json.Unmarshal(m.Data, &obj)
require_NoError(t, err)
// test if shared is honored
reqInfo := obj["requestor"].(map[string]interface{})
// fields always set
require_True(t, reqInfo["acc"] != nil)
require_True(t, reqInfo["rtt"] != nil)
require_True(t, reqInfo["start"] != nil)
// fields only set when shared
_, ok1 := reqInfo["lang"]
_, ok2 := reqInfo["ver"]
_, ok3 := reqInfo["ip"]
if !share {
ok1 = !ok1
ok2 = !ok2
ok3 = !ok3
}
require_True(t, ok1)
require_True(t, ok2)
require_True(t, ok3)
}
require_True(t, len(resChan) == 0)
}
test(true)
test(false)
}

View File

@@ -1933,7 +1933,7 @@ func newExtServiceLatency(l *serviceLatency) *jwt.ServiceLatency {
return nil
}
return &jwt.ServiceLatency{
Sampling: int(l.sampling),
Sampling: jwt.SamplingRate(l.sampling),
Results: jwt.Subject(l.subject),
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 The NATS Authors
* Copyright 2018-2020 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -92,6 +92,7 @@ type Account struct {
SigningKeys StringList `json:"signing_keys,omitempty"`
Revocations RevocationList `json:"revocations,omitempty"`
DefaultPermissions Permissions `json:"default_permissions,omitempty"`
Info
GenericFields
}
@@ -131,6 +132,7 @@ func (a *Account) Validate(acct *AccountClaims, vr *ValidationResults) {
vr.AddError("%s is not an account public key", k)
}
}
a.Info.Validate(vr)
}
// AccountClaims defines the body of an account JWT
@@ -238,11 +240,11 @@ func (a *AccountClaims) Revoke(pubKey string) {
// RevokeAt enters a revocation by public key and timestamp into this account
// This will revoke all jwt issued for pubKey, prior to timestamp
// If there is already a revocation for this public key that is newer, it is kept.
// The value is expected to be a public key or "*" (means all public keys)
func (a *AccountClaims) RevokeAt(pubKey string, timestamp time.Time) {
if a.Revocations == nil {
a.Revocations = RevocationList{}
}
a.Revocations.Revoke(pubKey, timestamp)
}

View File

@@ -105,7 +105,14 @@ func (a *ActivationClaims) Payload() interface{} {
// Validate checks the claims
func (a *ActivationClaims) Validate(vr *ValidationResults) {
a.ClaimsData.Validate(vr)
a.validateWithTimeChecks(vr, true)
}
// Validate checks the claims
func (a *ActivationClaims) validateWithTimeChecks(vr *ValidationResults, timeChecks bool) {
if timeChecks {
a.ClaimsData.Validate(vr)
}
a.Activation.Validate(vr)
if a.IssuerAccount != "" && !nkeys.IsValidPublicAccountKey(a.IssuerAccount) {
vr.AddError("account_id is not an account public key")

View File

@@ -16,6 +16,7 @@
package jwt
import (
"encoding/json"
"fmt"
"strings"
"time"
@@ -56,13 +57,49 @@ const (
// }
//
type ServiceLatency struct {
Sampling int `json:"sampling,omitempty"`
Results Subject `json:"results"`
Sampling SamplingRate `json:"sampling"`
Results Subject `json:"results"`
}
type SamplingRate int
const Headers = SamplingRate(0)
// MarshalJSON marshals the field as "headers" or percentages
func (r *SamplingRate) MarshalJSON() ([]byte, error) {
sr := *r
if sr == 0 {
return []byte(`"headers"`), nil
}
if sr >= 1 && sr <= 100 {
return []byte(fmt.Sprintf("%d", sr)), nil
}
return nil, fmt.Errorf("unknown sampling rate")
}
// UnmarshalJSON unmashals numbers as percentages or "headers"
func (t *SamplingRate) UnmarshalJSON(b []byte) error {
if len(b) == 0 {
return fmt.Errorf("empty sampling rate")
}
if strings.ToLower(string(b)) == `"headers"` {
*t = Headers
return nil
}
var j int
err := json.Unmarshal(b, &j)
if err != nil {
return err
}
*t = SamplingRate(j)
return nil
}
func (sl *ServiceLatency) Validate(vr *ValidationResults) {
if sl.Sampling < 1 || sl.Sampling > 100 {
vr.AddError("sampling percentage needs to be between 1-100")
if sl.Sampling != 0 {
if sl.Sampling < 1 || sl.Sampling > 100 {
vr.AddError("sampling percentage needs to be between 1-100")
}
}
sl.Results.Validate(vr)
if sl.Results.HasWildCards() {
@@ -81,6 +118,7 @@ type Export struct {
ResponseThreshold time.Duration `json:"response_threshold,omitempty"`
Latency *ServiceLatency `json:"service_latency,omitempty"`
AccountTokenPosition uint `json:"account_token_position,omitempty"`
Info
}
// IsService returns true if an export is for a service
@@ -153,6 +191,7 @@ func (e *Export) Validate(vr *ValidationResults) {
}
}
}
e.Info.Validate(vr)
}
// Revoke enters a revocation by publickey using time.Now().

View File

@@ -59,7 +59,10 @@ func DecodeGeneric(token string) (*GenericClaims, error) {
return nil, err
}
var gc GenericClaims
gc := struct {
GenericClaims
GenericFields
}{}
if err := json.Unmarshal(data, &gc); err != nil {
return nil, err
}
@@ -74,12 +77,19 @@ func DecodeGeneric(token string) (*GenericClaims, error) {
if !gc.verify(chunks[1], sig) {
return nil, errors.New("claim failed V1 signature verification")
}
if tp := gc.GenericFields.Type; tp != "" {
gc.GenericClaims.Data["type"] = tp
}
if tp := gc.GenericFields.Tags; len(tp) != 0 {
gc.GenericClaims.Data["tags"] = tp
}
} else {
if !gc.verify(token[:len(chunks[0])+len(chunks[1])+1], sig) {
return nil, errors.New("claim failed V2 signature verification")
}
}
return &gc, nil
return &gc.GenericClaims, nil
}
// Claims returns the standard part of the generic claim
@@ -122,11 +132,14 @@ func (gc *GenericClaims) ClaimType() ClaimType {
}
}
}
ct, ctok := v.(string)
if ctok {
switch ct := v.(type) {
case string:
return ClaimType(ct)
case ClaimType:
return ct
default:
return ""
}
return ""
}
func (gc *GenericClaims) updateVersion() {

View File

@@ -1,10 +1,10 @@
module github.com/nats-io/jwt/v2
require (
github.com/nats-io/jwt v0.3.2
github.com/nats-io/jwt v1.1.0
github.com/nats-io/nkeys v0.2.0
)
replace github.com/nats-io/jwt v0.3.2 => ../
replace github.com/nats-io/jwt v1.1.0 => ../
go 1.14

View File

@@ -23,7 +23,7 @@ import (
const (
// Version is semantic version.
Version = "0.3.2"
Version = "2.0.0"
// TokenTypeJwt is the JWT token type supported JWT tokens
// encoded and decoded by this library

View File

@@ -37,8 +37,9 @@ type Import struct {
// from the perspective of a service, it is the subscription waiting for
// requests (the exporter). If the field is empty, it will default to the
// value in the Subject field.
To Subject `json:"to,omitempty"`
Type ExportType `json:"type,omitempty"`
To Subject `json:"to,omitempty"`
Type ExportType `json:"type,omitempty"`
Share bool `json:"share,omitempty"`
}
// IsService returns true if the import is of type service
@@ -73,7 +74,9 @@ func (i *Import) Validate(actPubKey string, vr *ValidationResults) {
if i.IsStream() && i.To.HasWildCards() {
vr.AddError("streams cannot have wildcard to subject: %q", i.Subject)
}
if i.Share && !i.IsService() {
vr.AddError("sharing information (for latency tracking) is only valid for services: %q", i.Subject)
}
var act *ActivationClaims
if i.Token != "" {
@@ -107,13 +110,14 @@ func (i *Import) Validate(actPubKey string, vr *ValidationResults) {
}
if act != nil {
if act.Issuer != i.Account {
vr.AddWarning("activation token doesn't match account for import %q", i.Subject)
if !(act.Issuer == i.Account || act.IssuerAccount == i.Account) {
vr.AddError("activation token doesn't match account for import %q", i.Subject)
}
if act.ClaimsData.Subject != actPubKey {
vr.AddWarning("activation token doesn't match account it is being included in, %q", i.Subject)
vr.AddError("activation token doesn't match account it is being included in, %q", i.Subject)
}
act.validateWithTimeChecks(vr, false)
} else {
vr.AddWarning("no activation provided for import %s", i.Subject)
}

View File

@@ -19,20 +19,47 @@ import (
"time"
)
const All = "*"
// RevocationList is used to store a mapping of public keys to unix timestamps
type RevocationList map[string]int64
type RevocationEntry struct {
PublicKey string
TimeStamp int64
}
// Revoke enters a revocation by publickey and timestamp into this export
// If there is already a revocation for this public key that is newer, it is kept.
func (r RevocationList) Revoke(pubKey string, timestamp time.Time) {
newTS := timestamp.Unix()
// cannot move a revocation into the future - only into the past
if ts, ok := r[pubKey]; ok && ts > newTS {
return
}
r[pubKey] = newTS
}
// MaybeCompact will compact the revocation list if jwt.All is found. Any
// revocation that is covered by a jwt.All revocation will be deleted, thus
// reducing the size of the JWT. Returns a slice of entries that were removed
// during the process.
func (r RevocationList) MaybeCompact() []RevocationEntry {
var deleted []RevocationEntry
ats, ok := r[All]
if ok {
for k, ts := range r {
if k != All && ats >= ts {
deleted = append(deleted, RevocationEntry{
PublicKey: k,
TimeStamp: ts,
})
delete(r, k)
}
}
}
return deleted
}
// ClearRevocation removes any revocation for the public key
func (r RevocationList) ClearRevocation(pubKey string) {
delete(r, pubKey)
@@ -42,6 +69,16 @@ func (r RevocationList) ClearRevocation(pubKey string) {
// the one passed in. Generally this method is called with an issue time but other time's can
// be used for testing.
func (r RevocationList) IsRevoked(pubKey string, timestamp time.Time) bool {
if r.allRevoked(timestamp) {
return true
}
ts, ok := r[pubKey]
return ok && ts >= timestamp.Unix()
}
// allRevoked returns true if All is set and the timestamp is later or same as the
// one passed. This is called by IsRevoked.
func (r RevocationList) allRevoked(timestamp time.Time) bool {
ts, ok := r[All]
return ok && ts >= timestamp.Unix()
}

View File

@@ -19,10 +19,36 @@ import (
"encoding/json"
"fmt"
"net"
"net/url"
"strings"
"time"
)
const MaxInfoLength = 255
type Info struct {
Description string `json:"description,omitempty"`
InfoURL string `json:"info_url,omitempty"`
}
func (s Info) Validate(vr *ValidationResults) {
if len(s.Description) > MaxInfoLength {
vr.AddError("Description is too long")
}
if s.InfoURL != "" {
if len(s.InfoURL) > MaxInfoLength {
vr.AddError("Info URL is too long")
}
u, err := url.Parse(s.InfoURL)
if err == nil && (u.Hostname() == "" || u.Scheme == "") {
err = fmt.Errorf("no hostname or scheme")
}
if err != nil {
vr.AddError("error parsing info url: %v", err)
}
}
}
// ExportType defines the type of import/export.
type ExportType int

2
vendor/modules.txt vendored
View File

@@ -1,6 +1,6 @@
# github.com/minio/highwayhash v1.0.0
github.com/minio/highwayhash
# github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c
# github.com/nats-io/jwt/v2 v2.0.0-20201211164018-2e78446f4e6f
github.com/nats-io/jwt/v2
# github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0
github.com/nats-io/nats.go