diff --git a/server/jetstream.go b/server/jetstream.go index 5181be3f..f394beb8 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1115,6 +1115,33 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { s.Warnf(" Error adding stream %q to template %q: %v", cfg.Name, cfg.Template, err) } } + + // We had a bug that could allow subjects in that had prefix or suffix spaces. We check for that here + // and will patch them on the fly for now. We will warn about them. + var hadSubjErr bool + for i, subj := range cfg.StreamConfig.Subjects { + if !IsValidSubject(subj) { + s.Warnf(" Detected bad subject %q while adding stream %q, will attempt to repair", subj, cfg.Name) + if nsubj := strings.TrimSpace(subj); IsValidSubject(nsubj) { + s.Warnf(" Bad subject %q repaired to %q", subj, nsubj) + cfg.StreamConfig.Subjects[i] = nsubj + } else { + s.Warnf(" Error recreating stream %q: %v", cfg.Name, "invalid subject") + hadSubjErr = true + break + } + } + } + if hadSubjErr { + continue + } + + // The other possible bug is assigning subjects to mirrors, so check for that and patch as well. + if cfg.StreamConfig.Mirror != nil && len(cfg.StreamConfig.Subjects) > 0 { + cfg.StreamConfig.Subjects = nil + } + + // Add in the stream. mset, err := a.addStream(&cfg.StreamConfig) if err != nil { s.Warnf(" Error recreating stream %q: %v", cfg.Name, err) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index fa5e191a..8d157caf 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -13700,6 +13700,85 @@ func TestJetStreamMemoryCorruption(t *testing.T) { } } +func TestJetStreamRecoverBadStreamSubjects(t *testing.T) { + s := RunBasicJetStreamServer() + config := s.JetStreamConfig() + if config != nil { + defer removeDir(t, config.StoreDir) + } + sd := config.StoreDir + s.Shutdown() + + f := path.Join(sd, "$G", "streams", "TEST") + fs, err := newFileStore(FileStoreConfig{StoreDir: f}, StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", " baz "}, // baz has spaces + Storage: FileStorage, + }) + require_NoError(t, err) + fs.Stop() + + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + + if len(si.Config.Subjects) != 3 { + t.Fatalf("Expected to recover all subjects") + } +} + +func TestJetStreamRecoverBadMirrorConfigWithSubjects(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + config := s.JetStreamConfig() + if config != nil { + defer removeDir(t, config.StoreDir) + } + sd := config.StoreDir + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // Origin + _, err := js.AddStream(&nats.StreamConfig{ + Name: "S", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + s.Shutdown() + + f := path.Join(sd, "$G", "streams", "M") + fs, err := newFileStore(FileStoreConfig{StoreDir: f}, StreamConfig{ + Name: "M", + Subjects: []string{"foo", "bar", "baz"}, // Mirrors should not have spaces. + Mirror: &StreamSource{Name: "S"}, + Storage: FileStorage, + }) + require_NoError(t, err) + fs.Stop() + + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js = jsClientConnect(t, s) + defer nc.Close() + + si, err := js.StreamInfo("M") + require_NoError(t, err) + + if len(si.Config.Subjects) != 0 { + t.Fatalf("Expected to have NO subjects on mirror") + } +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/stream.go b/server/stream.go index fd2ce0b2..a36c5078 100644 --- a/server/stream.go +++ b/server/stream.go @@ -886,11 +886,11 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) { if subjectIsSubsetMatch(subj, "$JS.API.>") { return StreamConfig{}, fmt.Errorf("subjects overlap with jetstream api") } - + // Make sure the subject is valid. if !IsValidSubject(subj) { return StreamConfig{}, fmt.Errorf("invalid subject") } - + // Mark for duplicate check. dset[subj] = struct{}{} } }