There were situations where invalid subjects could be assigned to streams.

This will patch them on the fly during recovery. Specifically subjects with leading or trailing spaces and mirror streams with any subjects at all.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-12-01 12:21:30 -08:00
committed by Ivan Kozlovic
parent 1cf8b40304
commit ca12a11be3
3 changed files with 108 additions and 2 deletions

View File

@@ -1115,6 +1115,33 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
s.Warnf(" Error adding stream %q to template %q: %v", cfg.Name, cfg.Template, err)
}
}
// We had a bug that could allow subjects in that had prefix or suffix spaces. We check for that here
// and will patch them on the fly for now. We will warn about them.
var hadSubjErr bool
for i, subj := range cfg.StreamConfig.Subjects {
if !IsValidSubject(subj) {
s.Warnf(" Detected bad subject %q while adding stream %q, will attempt to repair", subj, cfg.Name)
if nsubj := strings.TrimSpace(subj); IsValidSubject(nsubj) {
s.Warnf(" Bad subject %q repaired to %q", subj, nsubj)
cfg.StreamConfig.Subjects[i] = nsubj
} else {
s.Warnf(" Error recreating stream %q: %v", cfg.Name, "invalid subject")
hadSubjErr = true
break
}
}
}
if hadSubjErr {
continue
}
// The other possible bug is assigning subjects to mirrors, so check for that and patch as well.
if cfg.StreamConfig.Mirror != nil && len(cfg.StreamConfig.Subjects) > 0 {
cfg.StreamConfig.Subjects = nil
}
// Add in the stream.
mset, err := a.addStream(&cfg.StreamConfig)
if err != nil {
s.Warnf(" Error recreating stream %q: %v", cfg.Name, err)

View File

@@ -13700,6 +13700,85 @@ func TestJetStreamMemoryCorruption(t *testing.T) {
}
}
func TestJetStreamRecoverBadStreamSubjects(t *testing.T) {
s := RunBasicJetStreamServer()
config := s.JetStreamConfig()
if config != nil {
defer removeDir(t, config.StoreDir)
}
sd := config.StoreDir
s.Shutdown()
f := path.Join(sd, "$G", "streams", "TEST")
fs, err := newFileStore(FileStoreConfig{StoreDir: f}, StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar", " baz "}, // baz has spaces
Storage: FileStorage,
})
require_NoError(t, err)
fs.Stop()
s = RunJetStreamServerOnPort(-1, sd)
defer s.Shutdown()
nc, js := jsClientConnect(t, s)
defer nc.Close()
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
if len(si.Config.Subjects) != 3 {
t.Fatalf("Expected to recover all subjects")
}
}
func TestJetStreamRecoverBadMirrorConfigWithSubjects(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
config := s.JetStreamConfig()
if config != nil {
defer removeDir(t, config.StoreDir)
}
sd := config.StoreDir
// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Origin
_, err := js.AddStream(&nats.StreamConfig{
Name: "S",
Subjects: []string{"foo"},
})
require_NoError(t, err)
s.Shutdown()
f := path.Join(sd, "$G", "streams", "M")
fs, err := newFileStore(FileStoreConfig{StoreDir: f}, StreamConfig{
Name: "M",
Subjects: []string{"foo", "bar", "baz"}, // Mirrors should not have spaces.
Mirror: &StreamSource{Name: "S"},
Storage: FileStorage,
})
require_NoError(t, err)
fs.Stop()
s = RunJetStreamServerOnPort(-1, sd)
defer s.Shutdown()
nc, js = jsClientConnect(t, s)
defer nc.Close()
si, err := js.StreamInfo("M")
require_NoError(t, err)
if len(si.Config.Subjects) != 0 {
t.Fatalf("Expected to have NO subjects on mirror")
}
}
///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////

View File

@@ -886,11 +886,11 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
if subjectIsSubsetMatch(subj, "$JS.API.>") {
return StreamConfig{}, fmt.Errorf("subjects overlap with jetstream api")
}
// Make sure the subject is valid.
if !IsValidSubject(subj) {
return StreamConfig{}, fmt.Errorf("invalid subject")
}
// Mark for duplicate check.
dset[subj] = struct{}{}
}
}