From c6daffbfcc0e69e07b94360f465da14d1afbeed5 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Wed, 16 Dec 2020 13:01:49 -0500 Subject: [PATCH] [Added] ability to use jwt latency sampling properties headers/share Signed-off-by: Matthias Hanel --- go.mod | 2 +- go.sum | 6 +- server/accounts.go | 16 +- server/jwt_test.go | 236 ++++++++++++++---- server/monitor.go | 2 +- .../nats-io/jwt/v2/account_claims.go | 6 +- .../nats-io/jwt/v2/activation_claims.go | 9 +- vendor/github.com/nats-io/jwt/v2/exports.go | 47 +++- .../github.com/nats-io/jwt/v2/genericlaims.go | 23 +- vendor/github.com/nats-io/jwt/v2/go.mod | 4 +- vendor/github.com/nats-io/jwt/v2/header.go | 2 +- vendor/github.com/nats-io/jwt/v2/imports.go | 16 +- .../nats-io/jwt/v2/revocation_list.go | 39 ++- vendor/github.com/nats-io/jwt/v2/types.go | 26 ++ vendor/modules.txt | 2 +- 15 files changed, 352 insertions(+), 84 deletions(-) diff --git a/go.mod b/go.mod index 85acbd73..89ff76b3 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/nats-io/nats-server/v2 require ( github.com/minio/highwayhash v1.0.0 - github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c + github.com/nats-io/jwt/v2 v2.0.0-20201211164018-2e78446f4e6f github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0 github.com/nats-io/nkeys v0.2.0 github.com/nats-io/nuid v1.0.1 diff --git a/go.sum b/go.sum index d1539e60..ba696ff7 100644 --- a/go.sum +++ b/go.sum @@ -14,9 +14,11 @@ github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5 github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= +github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM= +github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= -github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c h1:Hc1D9ChlsCMVwCxJ6QT5xqfk2zJ4XNea+LtdfaYhd20= -github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= +github.com/nats-io/jwt/v2 v2.0.0-20201211164018-2e78446f4e6f h1:+F8Vkc9pDf2x6O+csstCXaubPu2njJkbXeTm5wfDzGI= +github.com/nats-io/jwt/v2 v2.0.0-20201211164018-2e78446f4e6f/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ= github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU= github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4= github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw= diff --git a/server/accounts.go b/server/accounts.go index 497d437d..2802fb97 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1736,7 +1736,11 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im } } } - si := &serviceImport{dest, claim, se, nil, from, to, tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, false, isSysAcc, nil} + share := false + if claim != nil { + share = claim.Share + } + si := &serviceImport{dest, claim, se, nil, from, to, tr, 0, rt, lat, nil, nil, usePub, false, false, share, false, false, isSysAcc, nil} a.imports.services[from] = si a.mu.Unlock() @@ -2540,7 +2544,7 @@ func (a *Account) checkActivation(importAcc *Account, claim *jwt.Import, expTime clone.Token = fetchActivation(url.String()) } vr := jwt.CreateValidationResults() - clone.Validate(a.Name, vr) + clone.Validate(importAcc.Name, vr) if vr.IsBlocking(true) { return false } @@ -2897,8 +2901,12 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim s.Debugf("Error adding service export to account [%s]: %v", a.Name, err) } if e.Latency != nil { - if err := a.TrackServiceExportWithSampling(string(e.Subject), string(e.Latency.Results), e.Latency.Sampling); err != nil { - s.Debugf("Error adding latency tracking for service export to account [%s]: %v", a.Name, err) + if err := a.TrackServiceExportWithSampling(string(e.Subject), string(e.Latency.Results), int(e.Latency.Sampling)); err != nil { + hdrNote := "" + if e.Latency.Sampling == jwt.Headers { + hdrNote = " (using headers)" + } + s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, a.Name, err) } } } diff --git a/server/jwt_test.go b/server/jwt_test.go index 5cfcbccd..c4a1f663 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -735,6 +735,13 @@ func TestJWTAccountBasicImportExport(t *testing.T) { if err != nil { t.Fatalf("Error generating account JWT: %v", err) } + + vr := jwt.ValidationResults{} + barAC.Validate(&vr) + if vr.IsBlocking(true) { + t.Fatalf("Error generating account JWT: %v", vr) + } + addAccountToMemResolver(s, barPub, barJWT) s.UpdateAccountClaims(acc, barAC) // Our service import should have succeeded. @@ -2642,17 +2649,10 @@ func TestJWTAccountImportWrongIssuerAccount(t *testing.T) { client, clientReader, clientCS := createClient(t, s, clientKP) defer client.close() client.parseAsync(clientCS) - expectPong(t, clientReader) - - for i := 0; i < 2; i++ { - select { - case e := <-l.errCh: - if !strings.HasPrefix(e, fmt.Sprintf("Invalid issuer account %q in activation claim", clientPK)) { - t.Fatalf("Unexpected error: %v", e) - } - case <-time.After(2 * time.Second): - t.Fatalf("Did not get error regarding issuer account") - } + if l, _, err := clientReader.ReadLine(); err != nil { + t.Fatalf("Expected no Error, got: %v", err) + } else if !strings.Contains(string(l), "-ERR 'Authorization Violation'") { + t.Fatalf("Expected Error, got: %v", l) } } @@ -3282,7 +3282,7 @@ func TestExpiredUserCredentialsRenewal(t *testing.T) { } } -func updateJwt(t *testing.T, url string, creds string, pubKey string, jwt string, respCnt int) int { +func updateJwt(t *testing.T, url string, creds string, jwt string, respCnt int) int { t.Helper() require_NextMsg := func(sub *nats.Subscription) bool { t.Helper() @@ -3566,7 +3566,7 @@ func TestAccountNATSResolverFetch(t *testing.T) { connect(sC.ClientURL(), sysCreds, "") checkClusterFormed(t, sA, sB, sC) // upload system account and require a response from each server - passCnt := updateJwt(t, sA.ClientURL(), sysCreds, syspub, sysjwt, 3) + passCnt := updateJwt(t, sA.ClientURL(), sysCreds, sysjwt, 3) require_True(t, passCnt == 3) require_JWTPresent(t, dirA, syspub) // was just received require_JWTPresent(t, dirB, syspub) // was just received @@ -3584,7 +3584,7 @@ func TestAccountNATSResolverFetch(t *testing.T) { require_1Connection(sA.ClientURL(), v.creds, v.pub, sA, sB, sC) require_1Connection(sB.ClientURL(), v.creds, v.pub, sA, sB, sC) require_1Connection(sC.ClientURL(), v.creds, v.pub, sA, sB, sC) - passCnt := updateJwt(t, port, sysCreds, v.pub, v.jwt, 3) + passCnt := updateJwt(t, port, sysCreds, v.jwt, 3) require_True(t, passCnt == 3) require_2Connection(sA.ClientURL(), v.creds, v.pub, sA, sB, sC) require_2Connection(sB.ClientURL(), v.creds, v.pub, sA, sB, sC) @@ -3631,7 +3631,7 @@ func TestAccountNATSResolverFetch(t *testing.T) { require_2Connection(sC.ClientURL(), aCreds, apub, sA, sB, sC) // Test exceeding limit. For the exclusive directory resolver, limit is a stop gap measure. // It is not expected to be hit. When hit the administrator is supposed to take action. - passCnt = updateJwt(t, sA.ClientURL(), sysCreds, dpub, djwt1, 3) + passCnt = updateJwt(t, sA.ClientURL(), sysCreds, djwt1, 3) require_True(t, passCnt == 1) // Only Server C updated for _, srv := range []*Server{sA, sB, sC} { if a, ok := srv.accounts.Load(syspub); ok { @@ -3823,41 +3823,41 @@ func TestAccountNATSResolverCrossClusterFetch(t *testing.T) { waitForOutboundGateways(t, sAB, 1, 5*time.Second) waitForOutboundGateways(t, sBA, 1, 5*time.Second) waitForOutboundGateways(t, sBB, 1, 5*time.Second) - time.Sleep(500 * time.Millisecond) // wait for the protocol to converge - updateJwt(t, sAA.ClientURL(), sysCreds, syspub, sysjwt, 4) // update system account jwt on all server - require_JWTEqual(t, dirAA, syspub, sysjwt) // assure this update made it to every server - require_JWTEqual(t, dirAB, syspub, sysjwt) // assure this update made it to every server - require_JWTEqual(t, dirBA, syspub, sysjwt) // assure this update made it to every server - require_JWTEqual(t, dirBB, syspub, sysjwt) // assure this update made it to every server - require_JWTAbsent(t, dirAA, bpub) // assure that jwt are not synced across cluster - require_JWTAbsent(t, dirAB, bpub) // assure that jwt are not synced across cluster - require_JWTAbsent(t, dirBA, apub) // assure that jwt are not synced across cluster - require_JWTAbsent(t, dirBB, apub) // assure that jwt are not synced across cluster - connect(sAA.ClientURL(), aCreds) // connect to cluster where jwt was initially stored - connect(sAB.ClientURL(), aCreds) // connect to cluster where jwt was initially stored - connect(sBA.ClientURL(), bCreds) // connect to cluster where jwt was initially stored - connect(sBB.ClientURL(), bCreds) // connect to cluster where jwt was initially stored - time.Sleep(500 * time.Millisecond) // wait for the protocol to (NOT) converge - require_JWTAbsent(t, dirAA, bpub) // assure that jwt are still not synced across cluster - require_JWTAbsent(t, dirAB, bpub) // assure that jwt are still not synced across cluster - require_JWTAbsent(t, dirBA, apub) // assure that jwt are still not synced across cluster - require_JWTAbsent(t, dirBB, apub) // assure that jwt are still not synced across cluster + time.Sleep(500 * time.Millisecond) // wait for the protocol to converge + updateJwt(t, sAA.ClientURL(), sysCreds, sysjwt, 4) // update system account jwt on all server + require_JWTEqual(t, dirAA, syspub, sysjwt) // assure this update made it to every server + require_JWTEqual(t, dirAB, syspub, sysjwt) // assure this update made it to every server + require_JWTEqual(t, dirBA, syspub, sysjwt) // assure this update made it to every server + require_JWTEqual(t, dirBB, syspub, sysjwt) // assure this update made it to every server + require_JWTAbsent(t, dirAA, bpub) // assure that jwt are not synced across cluster + require_JWTAbsent(t, dirAB, bpub) // assure that jwt are not synced across cluster + require_JWTAbsent(t, dirBA, apub) // assure that jwt are not synced across cluster + require_JWTAbsent(t, dirBB, apub) // assure that jwt are not synced across cluster + connect(sAA.ClientURL(), aCreds) // connect to cluster where jwt was initially stored + connect(sAB.ClientURL(), aCreds) // connect to cluster where jwt was initially stored + connect(sBA.ClientURL(), bCreds) // connect to cluster where jwt was initially stored + connect(sBB.ClientURL(), bCreds) // connect to cluster where jwt was initially stored + time.Sleep(500 * time.Millisecond) // wait for the protocol to (NOT) converge + require_JWTAbsent(t, dirAA, bpub) // assure that jwt are still not synced across cluster + require_JWTAbsent(t, dirAB, bpub) // assure that jwt are still not synced across cluster + require_JWTAbsent(t, dirBA, apub) // assure that jwt are still not synced across cluster + require_JWTAbsent(t, dirBB, apub) // assure that jwt are still not synced across cluster // We have verified that account B does not exist in cluster A, neither does account A in cluster B // Despite that clients from account B can connect to server A, same for account A in cluster B - connect(sAA.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored - connect(sAB.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored - connect(sBA.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored - connect(sBB.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored - require_JWTEqual(t, dirAA, bpub, bjwt1) // assure that now jwt used in connect is stored - require_JWTEqual(t, dirAB, bpub, bjwt1) // assure that now jwt used in connect is stored - require_JWTEqual(t, dirBA, apub, ajwt1) // assure that now jwt used in connect is stored - require_JWTEqual(t, dirBB, apub, ajwt1) // assure that now jwt used in connect is stored - updateJwt(t, sAA.ClientURL(), sysCreds, bpub, bjwt2, 4) // update bjwt, expect updates from everywhere - updateJwt(t, sBA.ClientURL(), sysCreds, apub, ajwt2, 4) // update ajwt, expect updates from everywhere - require_JWTEqual(t, dirAA, bpub, bjwt2) // assure that jwt got updated accordingly - require_JWTEqual(t, dirAB, bpub, bjwt2) // assure that jwt got updated accordingly - require_JWTEqual(t, dirBA, apub, ajwt2) // assure that jwt got updated accordingly - require_JWTEqual(t, dirBB, apub, ajwt2) // assure that jwt got updated accordingly + connect(sAA.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored + connect(sAB.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored + connect(sBA.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored + connect(sBB.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored + require_JWTEqual(t, dirAA, bpub, bjwt1) // assure that now jwt used in connect is stored + require_JWTEqual(t, dirAB, bpub, bjwt1) // assure that now jwt used in connect is stored + require_JWTEqual(t, dirBA, apub, ajwt1) // assure that now jwt used in connect is stored + require_JWTEqual(t, dirBB, apub, ajwt1) // assure that now jwt used in connect is stored + updateJwt(t, sAA.ClientURL(), sysCreds, bjwt2, 4) // update bjwt, expect updates from everywhere + updateJwt(t, sBA.ClientURL(), sysCreds, ajwt2, 4) // update ajwt, expect updates from everywhere + require_JWTEqual(t, dirAA, bpub, bjwt2) // assure that jwt got updated accordingly + require_JWTEqual(t, dirAB, bpub, bjwt2) // assure that jwt got updated accordingly + require_JWTEqual(t, dirBA, apub, ajwt2) // assure that jwt got updated accordingly + require_JWTEqual(t, dirBB, apub, ajwt2) // assure that jwt got updated accordingly } func newTimeRange(start time.Time, dur time.Duration) jwt.TimeRange { @@ -4428,8 +4428,8 @@ func TestJWTUserRevocation(t *testing.T) { defer os.Remove(conf) srv, _ := RunServerWithConfig(conf) defer srv.Shutdown() - updateJwt(t, srv.ClientURL(), sysCreds, syspub, sysjwt, 1) // update system account jwt - updateJwt(t, srv.ClientURL(), sysCreds, apub, ajwt1, 1) // set account jwt without revocation + updateJwt(t, srv.ClientURL(), sysCreds, sysjwt, 1) // update system account jwt + updateJwt(t, srv.ClientURL(), sysCreds, ajwt1, 1) // set account jwt without revocation // use credentials that will be revoked ans assure that the connection will be disconnected nc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds1), nats.DisconnectErrHandler(func(conn *nats.Conn, err error) { @@ -4439,7 +4439,7 @@ func TestJWTUserRevocation(t *testing.T) { })) defer nc.Close() // update account jwt to contain revocation - if passCnt := updateJwt(t, srv.ClientURL(), sysCreds, apub, ajwt2, 1); passCnt != 1 { + if passCnt := updateJwt(t, srv.ClientURL(), sysCreds, ajwt2, 1); passCnt != 1 { t.Fatalf("Expected jwt update to pass") } // assure that nc got disconnected due to the revocation @@ -4515,9 +4515,9 @@ func TestJWTAccountOps(t *testing.T) { defer os.Remove(conf) srv, _ := RunServerWithConfig(conf) defer srv.Shutdown() - updateJwt(t, srv.ClientURL(), sysCreds, syspub, sysjwt, 1) // update system account jwt + updateJwt(t, srv.ClientURL(), sysCreds, sysjwt, 1) // update system account jwt // push jwt (for full resolver) - updateJwt(t, srv.ClientURL(), sysCreds, apub, ajwt1, 1) // set jwt + updateJwt(t, srv.ClientURL(), sysCreds, ajwt1, 1) // set jwt nc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(sysCreds)) defer nc.Close() // simulate nas resolver in case of a lookup request (cache) @@ -4548,3 +4548,133 @@ func TestJWTAccountOps(t *testing.T) { }) } } + +func TestJWTHeader(t *testing.T) { + createKey := func() (nkeys.KeyPair, string) { + t.Helper() + kp, _ := nkeys.CreateAccount() + syspub, _ := kp.PublicKey() + return kp, syspub + } + encode := func(claim *jwt.AccountClaims, pub string) string { + t.Helper() + theJWT, err := claim.Encode(oKp) + require_NoError(t, err) + return theJWT + } + newUser := func(accKp nkeys.KeyPair) string { + ukp, _ := nkeys.CreateUser() + seed, _ := ukp.Seed() + upub, _ := ukp.PublicKey() + uclaim := newJWTTestUserClaims() + uclaim.Subject = upub + ujwt, err := uclaim.Encode(accKp) + require_NoError(t, err) + return genCredsFile(t, ujwt, seed) + } + sysKp, syspub := createKey() + sysJwt := encode(jwt.NewAccountClaims(syspub), syspub) + sysCreds := newUser(sysKp) + defer os.Remove(sysCreds) + + test := func(share bool) { + aExpKp, aExpPub := createKey() + aExpClaim := jwt.NewAccountClaims(aExpPub) + aExpClaim.Exports.Add(&jwt.Export{ + Name: "test", + Subject: "srvc", + Type: jwt.Service, + TokenReq: false, + Latency: &jwt.ServiceLatency{ + Sampling: jwt.Headers, + Results: "res", + }, + }) + aExpJwt := encode(aExpClaim, aExpPub) + aExpCreds := newUser(aExpKp) + defer os.Remove(aExpCreds) + + aImpKp, aImpPub := createKey() + aImpClaim := jwt.NewAccountClaims(aImpPub) + aImpClaim.Imports.Add(&jwt.Import{ + Name: "test", + Subject: "srvc", + Account: aExpPub, + Type: jwt.Service, + Share: share, + }) + aImpJwt := encode(aImpClaim, aImpPub) + aImpCreds := newUser(aImpKp) + defer os.Remove(aImpCreds) + + dirSrv := createDir(t, "srv") + defer os.RemoveAll(dirSrv) + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: -1 + operator: %s + system_account: %s + resolver: { + type: full + dir: %s + } + `, ojwt, syspub, dirSrv))) + defer os.Remove(conf) + srv, _ := RunServerWithConfig(conf) + defer srv.Shutdown() + updateJwt(t, srv.ClientURL(), sysCreds, sysJwt, 1) // update system account jwt + updateJwt(t, srv.ClientURL(), sysCreds, aExpJwt, 1) + updateJwt(t, srv.ClientURL(), sysCreds, aImpJwt, 1) + + expNc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aExpCreds)) + defer expNc.Close() + resChan := make(chan *nats.Msg, 1) + expNc.ChanSubscribe("res", resChan) + sub, err := expNc.Subscribe("srvc", func(msg *nats.Msg) { + msg.Respond(nil) + }) + require_NoError(t, err) + defer sub.Unsubscribe() + + impNc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aImpCreds)) + defer impNc.Close() + // send request w/o header + _, err = impNc.Request("srvc", []byte("msg1"), time.Second) + require_NoError(t, err) + require_True(t, len(resChan) == 0) + + _, err = impNc.RequestMsg(&nats.Msg{ + Subject: "srvc", Data: []byte("msg2"), Header: http.Header{ + "X-B3-Sampled": []string{"1"}, + "Share": []string{"Me"}}}, time.Second) + require_NoError(t, err) + select { + case <-time.After(time.Second): + t.Fatalf("should have received a response") + case m := <-resChan: + obj := map[string]interface{}{} + err = json.Unmarshal(m.Data, &obj) + require_NoError(t, err) + // test if shared is honored + reqInfo := obj["requestor"].(map[string]interface{}) + // fields always set + require_True(t, reqInfo["acc"] != nil) + require_True(t, reqInfo["rtt"] != nil) + require_True(t, reqInfo["start"] != nil) + // fields only set when shared + _, ok1 := reqInfo["lang"] + _, ok2 := reqInfo["ver"] + _, ok3 := reqInfo["ip"] + if !share { + ok1 = !ok1 + ok2 = !ok2 + ok3 = !ok3 + } + require_True(t, ok1) + require_True(t, ok2) + require_True(t, ok3) + } + require_True(t, len(resChan) == 0) + } + test(true) + test(false) +} diff --git a/server/monitor.go b/server/monitor.go index 6c941f2a..1c79816e 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1933,7 +1933,7 @@ func newExtServiceLatency(l *serviceLatency) *jwt.ServiceLatency { return nil } return &jwt.ServiceLatency{ - Sampling: int(l.sampling), + Sampling: jwt.SamplingRate(l.sampling), Results: jwt.Subject(l.subject), } } diff --git a/vendor/github.com/nats-io/jwt/v2/account_claims.go b/vendor/github.com/nats-io/jwt/v2/account_claims.go index 4105dd7f..5db453d4 100644 --- a/vendor/github.com/nats-io/jwt/v2/account_claims.go +++ b/vendor/github.com/nats-io/jwt/v2/account_claims.go @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 The NATS Authors + * Copyright 2018-2020 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -92,6 +92,7 @@ type Account struct { SigningKeys StringList `json:"signing_keys,omitempty"` Revocations RevocationList `json:"revocations,omitempty"` DefaultPermissions Permissions `json:"default_permissions,omitempty"` + Info GenericFields } @@ -131,6 +132,7 @@ func (a *Account) Validate(acct *AccountClaims, vr *ValidationResults) { vr.AddError("%s is not an account public key", k) } } + a.Info.Validate(vr) } // AccountClaims defines the body of an account JWT @@ -238,11 +240,11 @@ func (a *AccountClaims) Revoke(pubKey string) { // RevokeAt enters a revocation by public key and timestamp into this account // This will revoke all jwt issued for pubKey, prior to timestamp // If there is already a revocation for this public key that is newer, it is kept. +// The value is expected to be a public key or "*" (means all public keys) func (a *AccountClaims) RevokeAt(pubKey string, timestamp time.Time) { if a.Revocations == nil { a.Revocations = RevocationList{} } - a.Revocations.Revoke(pubKey, timestamp) } diff --git a/vendor/github.com/nats-io/jwt/v2/activation_claims.go b/vendor/github.com/nats-io/jwt/v2/activation_claims.go index 3ba8c4ce..eb0c3224 100644 --- a/vendor/github.com/nats-io/jwt/v2/activation_claims.go +++ b/vendor/github.com/nats-io/jwt/v2/activation_claims.go @@ -105,7 +105,14 @@ func (a *ActivationClaims) Payload() interface{} { // Validate checks the claims func (a *ActivationClaims) Validate(vr *ValidationResults) { - a.ClaimsData.Validate(vr) + a.validateWithTimeChecks(vr, true) +} + +// Validate checks the claims +func (a *ActivationClaims) validateWithTimeChecks(vr *ValidationResults, timeChecks bool) { + if timeChecks { + a.ClaimsData.Validate(vr) + } a.Activation.Validate(vr) if a.IssuerAccount != "" && !nkeys.IsValidPublicAccountKey(a.IssuerAccount) { vr.AddError("account_id is not an account public key") diff --git a/vendor/github.com/nats-io/jwt/v2/exports.go b/vendor/github.com/nats-io/jwt/v2/exports.go index c43b9a30..23065fbc 100644 --- a/vendor/github.com/nats-io/jwt/v2/exports.go +++ b/vendor/github.com/nats-io/jwt/v2/exports.go @@ -16,6 +16,7 @@ package jwt import ( + "encoding/json" "fmt" "strings" "time" @@ -56,13 +57,49 @@ const ( // } // type ServiceLatency struct { - Sampling int `json:"sampling,omitempty"` - Results Subject `json:"results"` + Sampling SamplingRate `json:"sampling"` + Results Subject `json:"results"` +} + +type SamplingRate int + +const Headers = SamplingRate(0) + +// MarshalJSON marshals the field as "headers" or percentages +func (r *SamplingRate) MarshalJSON() ([]byte, error) { + sr := *r + if sr == 0 { + return []byte(`"headers"`), nil + } + if sr >= 1 && sr <= 100 { + return []byte(fmt.Sprintf("%d", sr)), nil + } + return nil, fmt.Errorf("unknown sampling rate") +} + +// UnmarshalJSON unmashals numbers as percentages or "headers" +func (t *SamplingRate) UnmarshalJSON(b []byte) error { + if len(b) == 0 { + return fmt.Errorf("empty sampling rate") + } + if strings.ToLower(string(b)) == `"headers"` { + *t = Headers + return nil + } + var j int + err := json.Unmarshal(b, &j) + if err != nil { + return err + } + *t = SamplingRate(j) + return nil } func (sl *ServiceLatency) Validate(vr *ValidationResults) { - if sl.Sampling < 1 || sl.Sampling > 100 { - vr.AddError("sampling percentage needs to be between 1-100") + if sl.Sampling != 0 { + if sl.Sampling < 1 || sl.Sampling > 100 { + vr.AddError("sampling percentage needs to be between 1-100") + } } sl.Results.Validate(vr) if sl.Results.HasWildCards() { @@ -81,6 +118,7 @@ type Export struct { ResponseThreshold time.Duration `json:"response_threshold,omitempty"` Latency *ServiceLatency `json:"service_latency,omitempty"` AccountTokenPosition uint `json:"account_token_position,omitempty"` + Info } // IsService returns true if an export is for a service @@ -153,6 +191,7 @@ func (e *Export) Validate(vr *ValidationResults) { } } } + e.Info.Validate(vr) } // Revoke enters a revocation by publickey using time.Now(). diff --git a/vendor/github.com/nats-io/jwt/v2/genericlaims.go b/vendor/github.com/nats-io/jwt/v2/genericlaims.go index 9a657282..1caf5209 100644 --- a/vendor/github.com/nats-io/jwt/v2/genericlaims.go +++ b/vendor/github.com/nats-io/jwt/v2/genericlaims.go @@ -59,7 +59,10 @@ func DecodeGeneric(token string) (*GenericClaims, error) { return nil, err } - var gc GenericClaims + gc := struct { + GenericClaims + GenericFields + }{} if err := json.Unmarshal(data, &gc); err != nil { return nil, err } @@ -74,12 +77,19 @@ func DecodeGeneric(token string) (*GenericClaims, error) { if !gc.verify(chunks[1], sig) { return nil, errors.New("claim failed V1 signature verification") } + if tp := gc.GenericFields.Type; tp != "" { + gc.GenericClaims.Data["type"] = tp + } + if tp := gc.GenericFields.Tags; len(tp) != 0 { + gc.GenericClaims.Data["tags"] = tp + } + } else { if !gc.verify(token[:len(chunks[0])+len(chunks[1])+1], sig) { return nil, errors.New("claim failed V2 signature verification") } } - return &gc, nil + return &gc.GenericClaims, nil } // Claims returns the standard part of the generic claim @@ -122,11 +132,14 @@ func (gc *GenericClaims) ClaimType() ClaimType { } } } - ct, ctok := v.(string) - if ctok { + switch ct := v.(type) { + case string: return ClaimType(ct) + case ClaimType: + return ct + default: + return "" } - return "" } func (gc *GenericClaims) updateVersion() { diff --git a/vendor/github.com/nats-io/jwt/v2/go.mod b/vendor/github.com/nats-io/jwt/v2/go.mod index 420392a3..2b664648 100644 --- a/vendor/github.com/nats-io/jwt/v2/go.mod +++ b/vendor/github.com/nats-io/jwt/v2/go.mod @@ -1,10 +1,10 @@ module github.com/nats-io/jwt/v2 require ( - github.com/nats-io/jwt v0.3.2 + github.com/nats-io/jwt v1.1.0 github.com/nats-io/nkeys v0.2.0 ) -replace github.com/nats-io/jwt v0.3.2 => ../ +replace github.com/nats-io/jwt v1.1.0 => ../ go 1.14 diff --git a/vendor/github.com/nats-io/jwt/v2/header.go b/vendor/github.com/nats-io/jwt/v2/header.go index 4fe7920a..c96880ab 100644 --- a/vendor/github.com/nats-io/jwt/v2/header.go +++ b/vendor/github.com/nats-io/jwt/v2/header.go @@ -23,7 +23,7 @@ import ( const ( // Version is semantic version. - Version = "0.3.2" + Version = "2.0.0" // TokenTypeJwt is the JWT token type supported JWT tokens // encoded and decoded by this library diff --git a/vendor/github.com/nats-io/jwt/v2/imports.go b/vendor/github.com/nats-io/jwt/v2/imports.go index 04a9e361..478b6def 100644 --- a/vendor/github.com/nats-io/jwt/v2/imports.go +++ b/vendor/github.com/nats-io/jwt/v2/imports.go @@ -37,8 +37,9 @@ type Import struct { // from the perspective of a service, it is the subscription waiting for // requests (the exporter). If the field is empty, it will default to the // value in the Subject field. - To Subject `json:"to,omitempty"` - Type ExportType `json:"type,omitempty"` + To Subject `json:"to,omitempty"` + Type ExportType `json:"type,omitempty"` + Share bool `json:"share,omitempty"` } // IsService returns true if the import is of type service @@ -73,7 +74,9 @@ func (i *Import) Validate(actPubKey string, vr *ValidationResults) { if i.IsStream() && i.To.HasWildCards() { vr.AddError("streams cannot have wildcard to subject: %q", i.Subject) } - + if i.Share && !i.IsService() { + vr.AddError("sharing information (for latency tracking) is only valid for services: %q", i.Subject) + } var act *ActivationClaims if i.Token != "" { @@ -107,13 +110,14 @@ func (i *Import) Validate(actPubKey string, vr *ValidationResults) { } if act != nil { - if act.Issuer != i.Account { - vr.AddWarning("activation token doesn't match account for import %q", i.Subject) + if !(act.Issuer == i.Account || act.IssuerAccount == i.Account) { + vr.AddError("activation token doesn't match account for import %q", i.Subject) } if act.ClaimsData.Subject != actPubKey { - vr.AddWarning("activation token doesn't match account it is being included in, %q", i.Subject) + vr.AddError("activation token doesn't match account it is being included in, %q", i.Subject) } + act.validateWithTimeChecks(vr, false) } else { vr.AddWarning("no activation provided for import %s", i.Subject) } diff --git a/vendor/github.com/nats-io/jwt/v2/revocation_list.go b/vendor/github.com/nats-io/jwt/v2/revocation_list.go index 9de30b11..c582896b 100644 --- a/vendor/github.com/nats-io/jwt/v2/revocation_list.go +++ b/vendor/github.com/nats-io/jwt/v2/revocation_list.go @@ -19,20 +19,47 @@ import ( "time" ) +const All = "*" + // RevocationList is used to store a mapping of public keys to unix timestamps type RevocationList map[string]int64 +type RevocationEntry struct { + PublicKey string + TimeStamp int64 +} // Revoke enters a revocation by publickey and timestamp into this export // If there is already a revocation for this public key that is newer, it is kept. func (r RevocationList) Revoke(pubKey string, timestamp time.Time) { newTS := timestamp.Unix() + // cannot move a revocation into the future - only into the past if ts, ok := r[pubKey]; ok && ts > newTS { return } - r[pubKey] = newTS } +// MaybeCompact will compact the revocation list if jwt.All is found. Any +// revocation that is covered by a jwt.All revocation will be deleted, thus +// reducing the size of the JWT. Returns a slice of entries that were removed +// during the process. +func (r RevocationList) MaybeCompact() []RevocationEntry { + var deleted []RevocationEntry + ats, ok := r[All] + if ok { + for k, ts := range r { + if k != All && ats >= ts { + deleted = append(deleted, RevocationEntry{ + PublicKey: k, + TimeStamp: ts, + }) + delete(r, k) + } + } + } + return deleted +} + // ClearRevocation removes any revocation for the public key func (r RevocationList) ClearRevocation(pubKey string) { delete(r, pubKey) @@ -42,6 +69,16 @@ func (r RevocationList) ClearRevocation(pubKey string) { // the one passed in. Generally this method is called with an issue time but other time's can // be used for testing. func (r RevocationList) IsRevoked(pubKey string, timestamp time.Time) bool { + if r.allRevoked(timestamp) { + return true + } ts, ok := r[pubKey] return ok && ts >= timestamp.Unix() } + +// allRevoked returns true if All is set and the timestamp is later or same as the +// one passed. This is called by IsRevoked. +func (r RevocationList) allRevoked(timestamp time.Time) bool { + ts, ok := r[All] + return ok && ts >= timestamp.Unix() +} diff --git a/vendor/github.com/nats-io/jwt/v2/types.go b/vendor/github.com/nats-io/jwt/v2/types.go index 34a3209d..78dc9cf9 100644 --- a/vendor/github.com/nats-io/jwt/v2/types.go +++ b/vendor/github.com/nats-io/jwt/v2/types.go @@ -19,10 +19,36 @@ import ( "encoding/json" "fmt" "net" + "net/url" "strings" "time" ) +const MaxInfoLength = 255 + +type Info struct { + Description string `json:"description,omitempty"` + InfoURL string `json:"info_url,omitempty"` +} + +func (s Info) Validate(vr *ValidationResults) { + if len(s.Description) > MaxInfoLength { + vr.AddError("Description is too long") + } + if s.InfoURL != "" { + if len(s.InfoURL) > MaxInfoLength { + vr.AddError("Info URL is too long") + } + u, err := url.Parse(s.InfoURL) + if err == nil && (u.Hostname() == "" || u.Scheme == "") { + err = fmt.Errorf("no hostname or scheme") + } + if err != nil { + vr.AddError("error parsing info url: %v", err) + } + } +} + // ExportType defines the type of import/export. type ExportType int diff --git a/vendor/modules.txt b/vendor/modules.txt index 0eb4b056..c2c76320 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,6 +1,6 @@ # github.com/minio/highwayhash v1.0.0 github.com/minio/highwayhash -# github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c +# github.com/nats-io/jwt/v2 v2.0.0-20201211164018-2e78446f4e6f github.com/nats-io/jwt/v2 # github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0 github.com/nats-io/nats.go