Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-04-09 14:22:26 -07:00
parent 4a30f593bd
commit 2d94caee87
2 changed files with 31 additions and 2 deletions

View File

@@ -125,7 +125,7 @@ func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreCo
}
fsCfg.StoreDir = storeDir
if err := mset.setupStore(fsCfg); err != nil {
mset.delete()
mset.Delete()
return nil, err
}
// Setup our internal send go routine.
@@ -133,7 +133,7 @@ func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreCo
// Setup subscriptions
if err := mset.subscribeToStream(); err != nil {
mset.delete()
mset.Delete()
return nil, err
}
@@ -380,6 +380,8 @@ func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscript
sub, err := c.processSub([]byte(subject+" "+strconv.Itoa(mset.sid)), false)
if err != nil {
return nil, err
} else if sub == nil {
return nil, fmt.Errorf("malformed subject")
}
c.mu.Lock()
sub.icb = cb

View File

@@ -468,6 +468,7 @@ func TestJetStreamAddStreamCanonicalNames(t *testing.T) {
}
expectErr(acc.AddStream(&server.StreamConfig{Name: "foo.bar"}))
expectErr(acc.AddStream(&server.StreamConfig{Name: "foo.bar."}))
expectErr(acc.AddStream(&server.StreamConfig{Name: "foo.*"}))
expectErr(acc.AddStream(&server.StreamConfig{Name: "foo.>"}))
expectErr(acc.AddStream(&server.StreamConfig{Name: "*"}))
@@ -475,6 +476,32 @@ func TestJetStreamAddStreamCanonicalNames(t *testing.T) {
expectErr(acc.AddStream(&server.StreamConfig{Name: "*>"}))
}
func TestJetStreamAddStreamBadSubjects(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Client for API requests.
nc := clientConnectToServer(t, s)
defer nc.Close()
expectAPIErr := func(cfg server.StreamConfig) {
t.Helper()
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, _ := nc.Request(fmt.Sprintf(server.JetStreamCreateStreamT, cfg.Name), req, time.Second)
if string(resp.Data) != "-ERR 'malformed subject'" {
t.Fatalf("Did not get proper err response: %q", resp.Data)
}
}
expectAPIErr(server.StreamConfig{Name: "S", Subjects: []string{"foo.bar."}})
expectAPIErr(server.StreamConfig{Name: "S", Subjects: []string{".."}})
expectAPIErr(server.StreamConfig{Name: "S", Subjects: []string{".*"}})
expectAPIErr(server.StreamConfig{Name: "S", Subjects: []string{".>"}})
}
func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) {
mconfig := &server.StreamConfig{
Name: "ok",