mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
[added] support for jwt based account mappings (#1897)
support for jwt based account mappings Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
2
go.mod
2
go.mod
@@ -5,7 +5,7 @@ go 1.15
|
||||
require (
|
||||
github.com/klauspost/compress v1.11.7
|
||||
github.com/minio/highwayhash v1.0.0
|
||||
github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc
|
||||
github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813
|
||||
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a
|
||||
github.com/nats-io/nkeys v0.2.0
|
||||
github.com/nats-io/nuid v1.0.1
|
||||
|
||||
4
go.sum
4
go.sum
@@ -19,8 +19,8 @@ github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM=
|
||||
github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
|
||||
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
|
||||
github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
|
||||
github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc h1:pu+s4XC+bYnI0iD2vDtOl83zjCYUau/q6c83pEvsGZc=
|
||||
github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
|
||||
github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813 h1:km4lLzT86NyJRhO++VqfP/vn5cbfm+E05i2bGdqDbrY=
|
||||
github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
|
||||
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
|
||||
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
|
||||
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw=
|
||||
|
||||
@@ -506,9 +506,9 @@ func (a *Account) TotalSubs() int {
|
||||
|
||||
// MapDest is for mapping published subjects for clients.
|
||||
type MapDest struct {
|
||||
Subject string `json:"subject"`
|
||||
Weight uint8 `json:"weight"`
|
||||
OptCluster string `json:"cluster,omitempty"`
|
||||
Subject string `json:"subject"`
|
||||
Weight uint8 `json:"weight"`
|
||||
Cluster string `json:"cluster,omitempty"`
|
||||
}
|
||||
|
||||
func NewMapDest(subject string, weight uint8) *MapDest {
|
||||
@@ -573,16 +573,16 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if d.OptCluster == "" {
|
||||
if d.Cluster == "" {
|
||||
m.dests = append(m.dests, &destination{tr, d.Weight})
|
||||
} else {
|
||||
// We have a cluster scoped filter.
|
||||
if m.cdests == nil {
|
||||
m.cdests = make(map[string][]*destination)
|
||||
}
|
||||
ad := m.cdests[d.OptCluster]
|
||||
ad := m.cdests[d.Cluster]
|
||||
ad = append(ad, &destination{tr, d.Weight})
|
||||
m.cdests[d.OptCluster] = ad
|
||||
m.cdests[d.Cluster] = ad
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2871,8 +2871,32 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
alteredScope[k] = struct{}{}
|
||||
}
|
||||
}
|
||||
// collect mappings that need to be removed
|
||||
removeList := []string{}
|
||||
for _, m := range a.mappings {
|
||||
if _, ok := ac.Mappings[jwt.Subject(m.src)]; !ok {
|
||||
removeList = append(removeList, m.src)
|
||||
}
|
||||
}
|
||||
a.mu.Unlock()
|
||||
|
||||
for sub, wm := range ac.Mappings {
|
||||
mappings := make([]*MapDest, len(wm))
|
||||
for i, m := range wm {
|
||||
mappings[i] = &MapDest{
|
||||
Subject: string(m.Subject),
|
||||
Weight: m.GetWeight(),
|
||||
Cluster: m.Cluster,
|
||||
}
|
||||
}
|
||||
// This will overwrite existing entries
|
||||
a.AddWeightedMappings(string(sub), mappings...)
|
||||
}
|
||||
// remove mappings
|
||||
for _, rmMapping := range removeList {
|
||||
a.RemoveMapping(rmMapping)
|
||||
}
|
||||
|
||||
gatherClients := func() []*client {
|
||||
a.mu.RLock()
|
||||
clients := make([]*client, 0, len(a.clients))
|
||||
|
||||
@@ -5429,3 +5429,66 @@ func TestJWTAccountProtectedImport(t *testing.T) {
|
||||
require_True(t, len(msgChan) == 0)
|
||||
})
|
||||
}
|
||||
|
||||
func TestJWTMappings(t *testing.T) {
|
||||
sysKp, syspub := createKey(t)
|
||||
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
|
||||
sysCreds := newUser(t, sysKp)
|
||||
defer os.Remove(sysCreds)
|
||||
|
||||
// create two jwt, one with and one without mapping
|
||||
aKp, aPub := createKey(t)
|
||||
aClaim := jwt.NewAccountClaims(aPub)
|
||||
aJwtNoM := encodeClaim(t, aClaim, aPub)
|
||||
aClaim.AddMapping("foo1", jwt.WeightedMapping{Subject: "bar1"})
|
||||
aJwtMap1 := encodeClaim(t, aClaim, aPub)
|
||||
|
||||
aClaim.Mappings = map[jwt.Subject][]jwt.WeightedMapping{}
|
||||
aClaim.AddMapping("foo2", jwt.WeightedMapping{Subject: "bar2"})
|
||||
aJwtMap2 := encodeClaim(t, aClaim, aPub)
|
||||
|
||||
dirSrv := createDir(t, "srv")
|
||||
defer os.RemoveAll(dirSrv)
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
}
|
||||
`, ojwt, syspub, dirSrv)))
|
||||
defer os.Remove(conf)
|
||||
srv, _ := RunServerWithConfig(conf)
|
||||
defer srv.Shutdown()
|
||||
updateJwt(t, srv.ClientURL(), sysCreds, sysJwt, 1) // update system account jwt
|
||||
|
||||
test := func(pub, sub string, fail bool) {
|
||||
t.Helper()
|
||||
nc := natsConnect(t, srv.ClientURL(), createUserCreds(t, srv, aKp))
|
||||
defer nc.Close()
|
||||
s, err := nc.SubscribeSync(sub)
|
||||
require_NoError(t, err)
|
||||
nc.Flush()
|
||||
err = nc.Publish(pub, nil)
|
||||
require_NoError(t, err)
|
||||
_, err = s.NextMsg(500 * time.Millisecond)
|
||||
switch {
|
||||
case fail && err == nil:
|
||||
t.Fatal("expected error, got none")
|
||||
case !fail && err != nil:
|
||||
t.Fatalf("expected no error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// turn mappings on
|
||||
require_Len(t, 1, updateJwt(t, srv.ClientURL(), sysCreds, aJwtMap1, 1))
|
||||
test("foo1", "bar1", false)
|
||||
// alter mappings
|
||||
require_Len(t, 1, updateJwt(t, srv.ClientURL(), sysCreds, aJwtMap2, 1))
|
||||
test("foo1", "bar1", true)
|
||||
test("foo2", "bar2", false)
|
||||
// turn mappings off
|
||||
require_Len(t, 1, updateJwt(t, srv.ClientURL(), sysCreds, aJwtNoM, 1))
|
||||
test("foo2", "bar2", true)
|
||||
}
|
||||
|
||||
@@ -2018,7 +2018,7 @@ func parseAccountMapDest(v interface{}, tk token, errors *[]error, warnings *[]e
|
||||
return nil, err
|
||||
}
|
||||
case "cluster":
|
||||
mdest.OptCluster = dmv.(string)
|
||||
mdest.Cluster = dmv.(string)
|
||||
default:
|
||||
err := &configErr{tk, fmt.Sprintf("Unknown field %q for mapping destination", k)}
|
||||
*errors = append(*errors, err)
|
||||
|
||||
41
vendor/github.com/nats-io/jwt/v2/account_claims.go
generated
vendored
41
vendor/github.com/nats-io/jwt/v2/account_claims.go
generated
vendored
@@ -84,6 +84,44 @@ func (o *OperatorLimits) Validate(_ *ValidationResults) {
|
||||
// negative values mean unlimited, so all numbers are valid
|
||||
}
|
||||
|
||||
// Mapping for publishes
|
||||
type WeightedMapping struct {
|
||||
Subject Subject `json:"subject"`
|
||||
Weight uint8 `json:"weight,omitempty"`
|
||||
Cluster string `json:"cluster,omitempty"`
|
||||
}
|
||||
|
||||
func (m *WeightedMapping) GetWeight() uint8 {
|
||||
if m.Weight == 0 {
|
||||
return 100
|
||||
}
|
||||
return m.Weight
|
||||
}
|
||||
|
||||
type Mapping map[Subject][]WeightedMapping
|
||||
|
||||
func (m *Mapping) Validate(vr *ValidationResults) {
|
||||
for ubFrom, wm := range (map[Subject][]WeightedMapping)(*m) {
|
||||
ubFrom.Validate(vr)
|
||||
total := uint8(0)
|
||||
for _, wm := range wm {
|
||||
wm.Subject.Validate(vr)
|
||||
if wm.Subject.HasWildCards() {
|
||||
vr.AddError("Subject %q in weighted mapping %q is not allowed to contains wildcard",
|
||||
string(wm.Subject), ubFrom)
|
||||
}
|
||||
total += wm.GetWeight()
|
||||
}
|
||||
if total > 100 {
|
||||
vr.AddError("Mapping %q exceeds 100%% among all of it's weighted to mappings", ubFrom)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Account) AddMapping(sub Subject, to ...WeightedMapping) {
|
||||
a.Mappings[sub] = to
|
||||
}
|
||||
|
||||
// Account holds account specific claims data
|
||||
type Account struct {
|
||||
Imports Imports `json:"imports,omitempty"`
|
||||
@@ -92,6 +130,7 @@ type Account struct {
|
||||
SigningKeys SigningKeys `json:"signing_keys,omitempty"`
|
||||
Revocations RevocationList `json:"revocations,omitempty"`
|
||||
DefaultPermissions Permissions `json:"default_permissions,omitempty"`
|
||||
Mappings Mapping `json:"mappings,omitempty"`
|
||||
Info
|
||||
GenericFields
|
||||
}
|
||||
@@ -102,6 +141,7 @@ func (a *Account) Validate(acct *AccountClaims, vr *ValidationResults) {
|
||||
a.Exports.Validate(vr)
|
||||
a.Limits.Validate(vr)
|
||||
a.DefaultPermissions.Validate(vr)
|
||||
a.Mappings.Validate(vr)
|
||||
|
||||
if !a.Limits.IsEmpty() && a.Limits.Imports >= 0 && int64(len(a.Imports)) > a.Limits.Imports {
|
||||
vr.AddError("the account contains more imports than allowed by the operator")
|
||||
@@ -150,6 +190,7 @@ func NewAccountClaims(subject string) *AccountClaims {
|
||||
AccountLimits{NoLimit, NoLimit, true, NoLimit, NoLimit},
|
||||
JetStreamLimits{0, 0, 0, 0}}
|
||||
c.Subject = subject
|
||||
c.Mappings = Mapping{}
|
||||
return c
|
||||
}
|
||||
|
||||
|
||||
1
vendor/github.com/nats-io/jwt/v2/exports.go
generated
vendored
1
vendor/github.com/nats-io/jwt/v2/exports.go
generated
vendored
@@ -118,6 +118,7 @@ type Export struct {
|
||||
ResponseThreshold time.Duration `json:"response_threshold,omitempty"`
|
||||
Latency *ServiceLatency `json:"service_latency,omitempty"`
|
||||
AccountTokenPosition uint `json:"account_token_position,omitempty"`
|
||||
Advertise bool `json:"advertise,omitempty"`
|
||||
Info
|
||||
}
|
||||
|
||||
|
||||
37
vendor/github.com/nats-io/jwt/v2/imports.go
generated
vendored
37
vendor/github.com/nats-io/jwt/v2/imports.go
generated
vendored
@@ -15,13 +15,6 @@
|
||||
|
||||
package jwt
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Import describes a mapping from another account into this one
|
||||
type Import struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
@@ -92,32 +85,10 @@ func (i *Import) Validate(actPubKey string, vr *ValidationResults) {
|
||||
var act *ActivationClaims
|
||||
|
||||
if i.Token != "" {
|
||||
// Check to see if its an embedded JWT or a URL.
|
||||
if u, err := url.Parse(i.Token); err == nil && u.Scheme != "" {
|
||||
c := &http.Client{Timeout: 5 * time.Second}
|
||||
resp, err := c.Get(u.String())
|
||||
if err != nil {
|
||||
vr.AddWarning("import %s contains an unreachable token URL %q", i.Subject, i.Token)
|
||||
}
|
||||
|
||||
if resp != nil {
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
vr.AddWarning("import %s contains an unreadable token URL %q", i.Subject, i.Token)
|
||||
} else {
|
||||
act, err = DecodeActivationClaims(string(body))
|
||||
if err != nil {
|
||||
vr.AddWarning("import %s contains a URL %q with an invalid activation token", i.Subject, i.Token)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
var err error
|
||||
act, err = DecodeActivationClaims(i.Token)
|
||||
if err != nil {
|
||||
vr.AddWarning("import %q contains an invalid activation token", i.Subject)
|
||||
}
|
||||
var err error
|
||||
act, err = DecodeActivationClaims(i.Token)
|
||||
if err != nil {
|
||||
vr.AddWarning("import %q contains an invalid activation token", i.Subject)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
6
vendor/github.com/nats-io/jwt/v2/operator_claims.go
generated
vendored
6
vendor/github.com/nats-io/jwt/v2/operator_claims.go
generated
vendored
@@ -166,6 +166,7 @@ func NewOperatorClaims(subject string) *OperatorClaims {
|
||||
}
|
||||
c := &OperatorClaims{}
|
||||
c.Subject = subject
|
||||
c.Issuer = subject
|
||||
return c
|
||||
}
|
||||
|
||||
@@ -176,7 +177,10 @@ func (oc *OperatorClaims) DidSign(op Claims) bool {
|
||||
}
|
||||
issuer := op.Claims().Issuer
|
||||
if issuer == oc.Subject {
|
||||
return !oc.StrictSigningKeyUsage
|
||||
if !oc.StrictSigningKeyUsage {
|
||||
return true
|
||||
}
|
||||
return op.Claims().Subject == oc.Subject
|
||||
}
|
||||
return oc.SigningKeys.Contains(issuer)
|
||||
}
|
||||
|
||||
2
vendor/github.com/nats-io/jwt/v2/types.go
generated
vendored
2
vendor/github.com/nats-io/jwt/v2/types.go
generated
vendored
@@ -26,7 +26,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const MaxInfoLength = 255
|
||||
const MaxInfoLength = 8 * 1024
|
||||
|
||||
type Info struct {
|
||||
Description string `json:"description,omitempty"`
|
||||
|
||||
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@@ -4,7 +4,7 @@ github.com/klauspost/compress/s2
|
||||
# github.com/minio/highwayhash v1.0.0
|
||||
## explicit
|
||||
github.com/minio/highwayhash
|
||||
# github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc
|
||||
# github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813
|
||||
## explicit
|
||||
github.com/nats-io/jwt/v2
|
||||
# github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a
|
||||
|
||||
Reference in New Issue
Block a user