Fix for panic on #1159, do not allow wildcards in stream import prefix

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-16 10:06:39 -07:00
parent 116ae2a9c2
commit 9192a1f43c
3 changed files with 67 additions and 4 deletions

View File

@@ -1078,16 +1078,25 @@ func (a *Account) AddStreamImportWithClaim(account *Account, from, prefix string
return ErrStreamImportAuthorization
}
// Check prefix if it exists and make sure its a literal.
// Append token separator if not already present.
if prefix != "" {
// Make sure there are no wildcards here, this prefix needs to be a literal
// since it will be prepended to a publish subject.
if !subjectIsLiteral(prefix) {
return ErrStreamImportBadPrefix
}
if prefix[len(prefix)-1] != btsep {
prefix = prefix + string(btsep)
}
}
a.mu.Lock()
defer a.mu.Unlock()
if a.imports.streams == nil {
a.imports.streams = make(map[string]*streamImport)
}
if prefix != "" && prefix[len(prefix)-1] != btsep {
prefix = prefix + string(btsep)
}
// TODO(dlc) - collisions, etc.
a.imports.streams[from] = &streamImport{account, from, prefix, imClaim, false}
a.mu.Unlock()
return nil
}

View File

@@ -698,6 +698,57 @@ func TestSimpleMapping(t *testing.T) {
}
}
// https://github.com/nats-io/nats-server/issues/1159
func TestStreamImportLengthBug(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()
cfoo, _, _ := newClientForServer(s)
defer cfoo.nc.Close()
if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}
cbar, _, _ := newClientForServer(s)
defer cbar.nc.Close()
if err := cbar.registerWithAccount(barAcc); err != nil {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}
if err := cfoo.acc.AddStreamExport("client.>", nil); err != nil {
t.Fatalf("Error adding account export to client foo: %v", err)
}
if err := cbar.acc.AddStreamImport(fooAcc, "client.>", "events.>"); err == nil {
t.Fatalf("Expected an error when using a stream import prefix with a wildcard")
}
if err := cbar.acc.AddStreamImport(fooAcc, "client.>", "events"); err != nil {
t.Fatalf("Error adding account import to client bar: %v", err)
}
if err := cbar.parse([]byte("SUB events.> 1\r\n")); err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
// Also make sure that we will get an error from a config version.
// JWT will be updated separately.
cf := createConfFile(t, []byte(`
accounts {
foo {
exports = [{stream: "client.>"}]
}
bar {
imports = [{stream: {account: "foo", subject:"client.>"}, prefix:"events.>"}]
}
}
`))
defer os.Remove(cf)
if _, err := ProcessConfigFile(cf); err == nil {
t.Fatalf("Expected an error with import with wildcard prefix")
}
}
func TestShadowSubsCleanupOnClientClose(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()

View File

@@ -106,6 +106,9 @@ var (
// ErrStreamImportAuthorization is returned when a stream import is not authorized.
ErrStreamImportAuthorization = errors.New("stream import not authorized")
// ErrStreamImportBadPrefix is returned when a stream import prefix contains wildcards.
ErrStreamImportBadPrefix = errors.New("stream import prefix can not contain wildcard tokens")
// ErrServiceImportAuthorization is returned when a service import is not authorized.
ErrServiceImportAuthorization = errors.New("service import not authorized")