From 2d94caee87a26e131bfe2b96b94fab4a4c2f1b1a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 9 Apr 2020 14:22:26 -0700 Subject: [PATCH] Fix for https://github.com/nats-io/jetstream/issues/144 Signed-off-by: Derek Collison --- server/stream.go | 6 ++++-- test/jetstream_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/server/stream.go b/server/stream.go index 90526d1d..b3b3de63 100644 --- a/server/stream.go +++ b/server/stream.go @@ -125,7 +125,7 @@ func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreCo } fsCfg.StoreDir = storeDir if err := mset.setupStore(fsCfg); err != nil { - mset.delete() + mset.Delete() return nil, err } // Setup our internal send go routine. @@ -133,7 +133,7 @@ func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreCo // Setup subscriptions if err := mset.subscribeToStream(); err != nil { - mset.delete() + mset.Delete() return nil, err } @@ -380,6 +380,8 @@ func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscript sub, err := c.processSub([]byte(subject+" "+strconv.Itoa(mset.sid)), false) if err != nil { return nil, err + } else if sub == nil { + return nil, fmt.Errorf("malformed subject") } c.mu.Lock() sub.icb = cb diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 3e532756..cf4f256e 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -468,6 +468,7 @@ func TestJetStreamAddStreamCanonicalNames(t *testing.T) { } expectErr(acc.AddStream(&server.StreamConfig{Name: "foo.bar"})) + expectErr(acc.AddStream(&server.StreamConfig{Name: "foo.bar."})) expectErr(acc.AddStream(&server.StreamConfig{Name: "foo.*"})) expectErr(acc.AddStream(&server.StreamConfig{Name: "foo.>"})) expectErr(acc.AddStream(&server.StreamConfig{Name: "*"})) @@ -475,6 +476,32 @@ func TestJetStreamAddStreamCanonicalNames(t *testing.T) { expectErr(acc.AddStream(&server.StreamConfig{Name: "*>"})) } +func TestJetStreamAddStreamBadSubjects(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + // Client for API requests. + nc := clientConnectToServer(t, s) + defer nc.Close() + + expectAPIErr := func(cfg server.StreamConfig) { + t.Helper() + req, err := json.Marshal(cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + resp, _ := nc.Request(fmt.Sprintf(server.JetStreamCreateStreamT, cfg.Name), req, time.Second) + if string(resp.Data) != "-ERR 'malformed subject'" { + t.Fatalf("Did not get proper err response: %q", resp.Data) + } + } + + expectAPIErr(server.StreamConfig{Name: "S", Subjects: []string{"foo.bar."}}) + expectAPIErr(server.StreamConfig{Name: "S", Subjects: []string{".."}}) + expectAPIErr(server.StreamConfig{Name: "S", Subjects: []string{".*"}}) + expectAPIErr(server.StreamConfig{Name: "S", Subjects: []string{".>"}}) +} + func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) { mconfig := &server.StreamConfig{ Name: "ok",