From ca12a11be36717e8f90fc174b0a7cc9fc04e52fc Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 1 Dec 2021 12:21:30 -0800 Subject: [PATCH] There were situations where invalid subjects could be assigned to streams. This will patch them on the fly during recovery. Specifically subjects with leading or trailing spaces and mirror streams with any subjects at all. Signed-off-by: Derek Collison --- server/jetstream.go | 27 ++++++++++++++ server/jetstream_test.go | 79 ++++++++++++++++++++++++++++++++++++++++ server/stream.go | 4 +- 3 files changed, 108 insertions(+), 2 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 5181be3f..f394beb8 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1115,6 +1115,33 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { s.Warnf(" Error adding stream %q to template %q: %v", cfg.Name, cfg.Template, err) } } + + // We had a bug that could allow subjects in that had prefix or suffix spaces. We check for that here + // and will patch them on the fly for now. We will warn about them. + var hadSubjErr bool + for i, subj := range cfg.StreamConfig.Subjects { + if !IsValidSubject(subj) { + s.Warnf(" Detected bad subject %q while adding stream %q, will attempt to repair", subj, cfg.Name) + if nsubj := strings.TrimSpace(subj); IsValidSubject(nsubj) { + s.Warnf(" Bad subject %q repaired to %q", subj, nsubj) + cfg.StreamConfig.Subjects[i] = nsubj + } else { + s.Warnf(" Error recreating stream %q: %v", cfg.Name, "invalid subject") + hadSubjErr = true + break + } + } + } + if hadSubjErr { + continue + } + + // The other possible bug is assigning subjects to mirrors, so check for that and patch as well. + if cfg.StreamConfig.Mirror != nil && len(cfg.StreamConfig.Subjects) > 0 { + cfg.StreamConfig.Subjects = nil + } + + // Add in the stream. mset, err := a.addStream(&cfg.StreamConfig) if err != nil { s.Warnf(" Error recreating stream %q: %v", cfg.Name, err) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index fa5e191a..8d157caf 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -13700,6 +13700,85 @@ func TestJetStreamMemoryCorruption(t *testing.T) { } } +func TestJetStreamRecoverBadStreamSubjects(t *testing.T) { + s := RunBasicJetStreamServer() + config := s.JetStreamConfig() + if config != nil { + defer removeDir(t, config.StoreDir) + } + sd := config.StoreDir + s.Shutdown() + + f := path.Join(sd, "$G", "streams", "TEST") + fs, err := newFileStore(FileStoreConfig{StoreDir: f}, StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", " baz "}, // baz has spaces + Storage: FileStorage, + }) + require_NoError(t, err) + fs.Stop() + + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + + if len(si.Config.Subjects) != 3 { + t.Fatalf("Expected to recover all subjects") + } +} + +func TestJetStreamRecoverBadMirrorConfigWithSubjects(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + config := s.JetStreamConfig() + if config != nil { + defer removeDir(t, config.StoreDir) + } + sd := config.StoreDir + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // Origin + _, err := js.AddStream(&nats.StreamConfig{ + Name: "S", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + s.Shutdown() + + f := path.Join(sd, "$G", "streams", "M") + fs, err := newFileStore(FileStoreConfig{StoreDir: f}, StreamConfig{ + Name: "M", + Subjects: []string{"foo", "bar", "baz"}, // Mirrors should not have spaces. + Mirror: &StreamSource{Name: "S"}, + Storage: FileStorage, + }) + require_NoError(t, err) + fs.Stop() + + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js = jsClientConnect(t, s) + defer nc.Close() + + si, err := js.StreamInfo("M") + require_NoError(t, err) + + if len(si.Config.Subjects) != 0 { + t.Fatalf("Expected to have NO subjects on mirror") + } +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/stream.go b/server/stream.go index fd2ce0b2..a36c5078 100644 --- a/server/stream.go +++ b/server/stream.go @@ -886,11 +886,11 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) { if subjectIsSubsetMatch(subj, "$JS.API.>") { return StreamConfig{}, fmt.Errorf("subjects overlap with jetstream api") } - + // Make sure the subject is valid. if !IsValidSubject(subj) { return StreamConfig{}, fmt.Errorf("invalid subject") } - + // Mark for duplicate check. dset[subj] = struct{}{} } }