diff --git a/server/jetstream.go b/server/jetstream.go index 8dd92dee..a44aeba0 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1034,6 +1034,13 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } } + // Collect consumers, do after all streams. + type ce struct { + mset *stream + odir string + } + var consumers []*ce + // Now recover the streams. fis, _ := ioutil.ReadDir(sdir) for _, fi := range fis { @@ -1109,13 +1116,17 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { // Now do the consumers. odir := path.Join(sdir, fi.Name(), consumerDir) - ofis, _ := ioutil.ReadDir(odir) + consumers = append(consumers, &ce{mset, odir}) + } + + for _, e := range consumers { + ofis, _ := ioutil.ReadDir(e.odir) if len(ofis) > 0 { - s.Noticef(" Recovering %d consumers for stream - %q", len(ofis), fi.Name()) + s.Noticef(" Recovering %d consumers for stream - %q", len(ofis), e.mset.name()) } for _, ofi := range ofis { - metafile := path.Join(odir, ofi.Name(), JetStreamMetaFile) - metasum := path.Join(odir, ofi.Name(), JetStreamMetaFileSum) + metafile := path.Join(e.odir, ofi.Name(), JetStreamMetaFile) + metasum := path.Join(e.odir, ofi.Name(), JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { s.Warnf(" Missing consumer metafile %q", metafile) continue @@ -1131,10 +1142,10 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } // Check if we are encrypted. - if key, err := ioutil.ReadFile(path.Join(odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { + if key, err := ioutil.ReadFile(path.Join(e.odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") // Decode the buffer before proceeding. - if buf, err = s.decryptMeta(key, buf, a.Name, fi.Name()+tsep+ofi.Name()); err != nil { + if buf, err = s.decryptMeta(key, buf, a.Name, e.mset.name()+tsep+ofi.Name()); err != nil { s.Warnf(" Error decrypting our consumer metafile: %v", err) continue } @@ -1151,7 +1162,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { // the consumer can reconnect. We will create it as a durable and switch it. cfg.ConsumerConfig.Durable = ofi.Name() } - obs, err := mset.addConsumer(&cfg.ConsumerConfig) + obs, err := e.mset.addConsumer(&cfg.ConsumerConfig) if err != nil { s.Warnf(" Error adding consumer: %v", err) continue diff --git a/server/jetstream_test.go b/server/jetstream_test.go index d8cfa0d5..17fec898 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -13101,7 +13101,7 @@ func TestJetStreamPullLargeBatchExpired(t *testing.T) { } } -func TestNegativeDupeWindow(t *testing.T) { +func TestJetStreamNegativeDupeWindow(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -13134,6 +13134,69 @@ func TestNegativeDupeWindow(t *testing.T) { } } +// Issue #2551 +func TestJetStreamMirroredConsumerFailAfterRestart(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + config := s.JetStreamConfig() + if config != nil { + defer removeDir(t, config.StoreDir) + } + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "S1", + Storage: nats.FileStorage, + Subjects: []string{"foo", "bar", "baz"}, + }) + if err != nil { + t.Fatalf("create failed: %s", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "M1", + Storage: nats.FileStorage, + Mirror: &nats.StreamSource{Name: "S1"}, + }) + if err != nil { + t.Fatalf("create failed: %s", err) + } + + _, err = js.AddConsumer("M1", &nats.ConsumerConfig{ + Durable: "C1", + FilterSubject: ">", + AckPolicy: nats.AckExplicitPolicy, + }) + if err != nil { + t.Fatalf("consumer create failed: %s", err) + } + + // Stop current + sd := s.JetStreamConfig().StoreDir + s.Shutdown() + s.WaitForShutdown() + + // Restart. + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js = jsClientConnect(t, s) + defer nc.Close() + + _, err = js.StreamInfo("M1") + if err != nil { + t.Fatalf("%s did not exist after start: %s", "M1", err) + } + + _, err = js.ConsumerInfo("M1", "C1") + if err != nil { + t.Fatalf("C1 did not exist after start: %s", err) + } +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks ///////////////////////////////////////////////////////////////////////////