Should allow multiple stream imports on same subject

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-12-14 17:06:14 -08:00
parent 547dbe7400
commit a2ebf08593
4 changed files with 176 additions and 22 deletions

View File

@@ -179,7 +179,7 @@ type exportMap struct {
// importMap tracks the imported streams and services.
type importMap struct {
streams map[string]*streamImport
streams []*streamImport
services map[string]*serviceImport // TODO(dlc) sync.Map may be better.
}
@@ -642,8 +642,7 @@ func (a *Account) IsExportServiceTracking(service string) bool {
a.mu.RUnlock()
return true
}
// FIXME(dlc) - Might want to cache this is in the hot path checking for
// latency tracking.
// FIXME(dlc) - Might want to cache this is in the hot path checking for latency tracking.
tokens := strings.Split(service, tsep)
for subj, ea := range a.exports.services {
if isSubsetMatch(tokens, subj) && ea != nil && ea.latency != nil {
@@ -1200,15 +1199,26 @@ func (a *Account) AddStreamImportWithClaim(account *Account, from, prefix string
}
}
a.mu.Lock()
if a.imports.streams == nil {
a.imports.streams = make(map[string]*streamImport)
if a.isStreamImportDuplicate(account, from) {
a.mu.Unlock()
return ErrStreamImportDuplicate
}
// TODO(dlc) - collisions, etc.
a.imports.streams[from] = &streamImport{account, from, prefix, imClaim, false}
a.imports.streams = append(a.imports.streams, &streamImport{account, from, prefix, imClaim, false})
a.mu.Unlock()
return nil
}
// isStreamImportDuplicate checks for duplicate.
// Lock should be held.
func (a *Account) isStreamImportDuplicate(acc *Account, from string) bool {
for _, si := range a.imports.streams {
if si.acc == acc && si.from == from {
return true
}
}
return false
}
// AddStreamImport will add in the stream import from a specific account.
func (a *Account) AddStreamImport(account *Account, from, prefix string) error {
return a.AddStreamImportWithClaim(account, from, prefix, nil)
@@ -1382,13 +1392,19 @@ func fetchActivation(url string) string {
}
// These are import stream specific versions for when an activation expires.
func (a *Account) streamActivationExpired(subject string) {
func (a *Account) streamActivationExpired(exportAcc *Account, subject string) {
a.mu.RLock()
if a.expired || a.imports.streams == nil {
a.mu.RUnlock()
return
}
si := a.imports.streams[subject]
var si *streamImport
for _, si = range a.imports.streams {
if si.acc == exportAcc && si.from == subject {
break
}
}
if si == nil || si.invalid {
a.mu.RUnlock()
return
@@ -1414,7 +1430,7 @@ func (a *Account) streamActivationExpired(subject string) {
}
// These are import service specific versions for when an activation expires.
func (a *Account) serviceActivationExpired(subject string) {
func (a *Account) serviceActivationExpired(exportAcc *Account, subject string) {
a.mu.RLock()
if a.expired || a.imports.services == nil {
a.mu.RUnlock()
@@ -1439,17 +1455,17 @@ func (a *Account) serviceActivationExpired(subject string) {
// Fires for expired activation tokens. We could track this with timers etc.
// Instead we just re-analyze where we are and if we need to act.
func (a *Account) activationExpired(subject string, kind jwt.ExportType) {
func (a *Account) activationExpired(exportAcc *Account, subject string, kind jwt.ExportType) {
switch kind {
case jwt.Stream:
a.streamActivationExpired(subject)
a.streamActivationExpired(exportAcc, subject)
case jwt.Service:
a.serviceActivationExpired(subject)
a.serviceActivationExpired(exportAcc, subject)
}
}
// checkActivation will check the activation token for validity.
func (a *Account) checkActivation(acc *Account, claim *jwt.Import, expTimer bool) bool {
func (a *Account) checkActivation(importAcc *Account, claim *jwt.Import, expTimer bool) bool {
if claim == nil || claim.Token == "" {
return false
}
@@ -1485,7 +1501,7 @@ func (a *Account) checkActivation(acc *Account, claim *jwt.Import, expTimer bool
if expTimer {
expiresAt := time.Duration(act.Expires - tn)
time.AfterFunc(expiresAt*time.Second, func() {
acc.activationExpired(string(act.ImportSubject), claim.Type)
importAcc.activationExpired(a, string(act.ImportSubject), claim.Type)
})
}
}
@@ -1744,15 +1760,12 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
// Imports are checked unlocked in processInbound, so we can't change out the struct here. Need to process inline.
if a.imports.streams != nil {
old.imports.streams = make(map[string]*streamImport, len(a.imports.streams))
old.imports.streams = a.imports.streams
a.imports.streams = nil
}
if a.imports.services != nil {
old.imports.services = make(map[string]*serviceImport, len(a.imports.services))
}
for k, v := range a.imports.streams {
old.imports.streams[k] = v
delete(a.imports.streams, k)
}
for k, v := range a.imports.services {
old.imports.services[k] = v
delete(a.imports.services, k)

View File

@@ -2180,6 +2180,144 @@ func TestAccountDuplicateServiceImportSubject(t *testing.T) {
}
}
func TestMultipleStreamImportsWithSameSubjectDifferentPrefix(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
fooAcc, _ := s.RegisterAccount("foo")
fooAcc.AddStreamExport("test", nil)
barAcc, _ := s.RegisterAccount("bar")
barAcc.AddStreamExport("test", nil)
importAcc, _ := s.RegisterAccount("import")
if err := importAcc.AddStreamImport(fooAcc, "test", "foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err := importAcc.AddStreamImport(barAcc, "test", "bar"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now make sure we can see messages from both.
cimport, crImport, _ := newClientForServer(s)
defer cimport.close()
if err := cimport.registerWithAccount(importAcc); err != nil {
t.Fatalf("Error registering client with 'import' account: %v", err)
}
if err := cimport.parse([]byte("SUB *.test 1\r\n")); err != nil {
t.Fatalf("Error for client 'import' from server: %v", err)
}
cfoo, _, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, _, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
readMsg := func() {
t.Helper()
l, err := crImport.ReadString('\n')
if err != nil {
t.Fatalf("Error reading msg header from client 'import': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
// Consume msg body too.
if _, err = crImport.ReadString('\n'); err != nil {
t.Fatalf("Error reading msg body from client 'import': %v", err)
}
}
cbar.parseAsync("PUB test 9\r\nhello-bar\r\n")
readMsg()
cfoo.parseAsync("PUB test 9\r\nhello-foo\r\n")
readMsg()
}
// This should work with prefixes that are different but we also want it to just work with same subject
// being imported from multiple accounts.
func TestMultipleStreamImportsWithSameSubject(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
fooAcc, _ := s.RegisterAccount("foo")
fooAcc.AddStreamExport("test", nil)
barAcc, _ := s.RegisterAccount("bar")
barAcc.AddStreamExport("test", nil)
importAcc, _ := s.RegisterAccount("import")
if err := importAcc.AddStreamImport(fooAcc, "test", ""); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Since we allow this now, make sure we do detect a duplicate import from same account etc.
// That should be not allowed.
if err := importAcc.AddStreamImport(fooAcc, "test", ""); err != ErrStreamImportDuplicate {
t.Fatalf("Expected ErrStreamImportDuplicate but got %v", err)
}
if err := importAcc.AddStreamImport(barAcc, "test", ""); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now make sure we can see messages from both.
cimport, crImport, _ := newClientForServer(s)
defer cimport.close()
if err := cimport.registerWithAccount(importAcc); err != nil {
t.Fatalf("Error registering client with 'import' account: %v", err)
}
if err := cimport.parse([]byte("SUB test 1\r\n")); err != nil {
t.Fatalf("Error for client 'import' from server: %v", err)
}
cfoo, _, _ := newClientForServer(s)
defer cfoo.close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, _, _ := newClientForServer(s)
defer cbar.close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
readMsg := func() {
t.Helper()
l, err := crImport.ReadString('\n')
if err != nil {
t.Fatalf("Error reading msg header from client 'import': %v", err)
}
mraw := msgPat.FindAllStringSubmatch(l, -1)
if len(mraw) == 0 {
t.Fatalf("No message received")
}
// Consume msg body too.
if _, err = crImport.ReadString('\n'); err != nil {
t.Fatalf("Error reading msg body from client 'import': %v", err)
}
}
cbar.parseAsync("PUB test 9\r\nhello-bar\r\n")
readMsg()
cfoo.parseAsync("PUB test 9\r\nhello-foo\r\n")
readMsg()
}
func BenchmarkNewRouteReply(b *testing.B) {
opts := defaultServerOptions
s := New(&opts)

View File

@@ -1891,8 +1891,8 @@ func (c *client) processSub(argo []byte, noForward bool) (*subscription, error)
}
// If the client's account has stream imports and there are matches for
// this subscription's subject, then add shadow subscriptions in
// other accounts that can export this subject.
// this subscription's subject, then add shadow subscriptions in the
// other accounts that export this subject.
func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error {
if acc == nil {
return ErrMissingAccount

View File

@@ -109,6 +109,9 @@ var (
// ErrStreamImportBadPrefix is returned when a stream import prefix contains wildcards.
ErrStreamImportBadPrefix = errors.New("stream import prefix can not contain wildcard tokens")
// ErrStreamImportDuplicate is returned when a stream import is a duplicate of one that already exists.
ErrStreamImportDuplicate = errors.New("stream import already exists")
// ErrServiceImportAuthorization is returned when a service import is not authorized.
ErrServiceImportAuthorization = errors.New("service import not authorized")