From 9534372113f5e6cda0d39f3c54f74ccbbfb316ed Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 21 Sep 2021 08:53:12 -0700 Subject: [PATCH] Fix for #2551 When a mirror would be processed before the origin stream we would not recover the consumers due to failure on looking up source's subjects. This change processes all streams first then does all consumers. Signed-off-by: Derek Collison --- server/jetstream.go | 25 +++++++++++----- server/jetstream_test.go | 65 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 82 insertions(+), 8 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 8dd92dee..a44aeba0 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1034,6 +1034,13 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } } + // Collect consumers, do after all streams. + type ce struct { + mset *stream + odir string + } + var consumers []*ce + // Now recover the streams. fis, _ := ioutil.ReadDir(sdir) for _, fi := range fis { @@ -1109,13 +1116,17 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { // Now do the consumers. odir := path.Join(sdir, fi.Name(), consumerDir) - ofis, _ := ioutil.ReadDir(odir) + consumers = append(consumers, &ce{mset, odir}) + } + + for _, e := range consumers { + ofis, _ := ioutil.ReadDir(e.odir) if len(ofis) > 0 { - s.Noticef(" Recovering %d consumers for stream - %q", len(ofis), fi.Name()) + s.Noticef(" Recovering %d consumers for stream - %q", len(ofis), e.mset.name()) } for _, ofi := range ofis { - metafile := path.Join(odir, ofi.Name(), JetStreamMetaFile) - metasum := path.Join(odir, ofi.Name(), JetStreamMetaFileSum) + metafile := path.Join(e.odir, ofi.Name(), JetStreamMetaFile) + metasum := path.Join(e.odir, ofi.Name(), JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { s.Warnf(" Missing consumer metafile %q", metafile) continue @@ -1131,10 +1142,10 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { } // Check if we are encrypted. - if key, err := ioutil.ReadFile(path.Join(odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { + if key, err := ioutil.ReadFile(path.Join(e.odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") // Decode the buffer before proceeding. - if buf, err = s.decryptMeta(key, buf, a.Name, fi.Name()+tsep+ofi.Name()); err != nil { + if buf, err = s.decryptMeta(key, buf, a.Name, e.mset.name()+tsep+ofi.Name()); err != nil { s.Warnf(" Error decrypting our consumer metafile: %v", err) continue } @@ -1151,7 +1162,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { // the consumer can reconnect. We will create it as a durable and switch it. cfg.ConsumerConfig.Durable = ofi.Name() } - obs, err := mset.addConsumer(&cfg.ConsumerConfig) + obs, err := e.mset.addConsumer(&cfg.ConsumerConfig) if err != nil { s.Warnf(" Error adding consumer: %v", err) continue diff --git a/server/jetstream_test.go b/server/jetstream_test.go index d8cfa0d5..17fec898 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -13101,7 +13101,7 @@ func TestJetStreamPullLargeBatchExpired(t *testing.T) { } } -func TestNegativeDupeWindow(t *testing.T) { +func TestJetStreamNegativeDupeWindow(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -13134,6 +13134,69 @@ func TestNegativeDupeWindow(t *testing.T) { } } +// Issue #2551 +func TestJetStreamMirroredConsumerFailAfterRestart(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + config := s.JetStreamConfig() + if config != nil { + defer removeDir(t, config.StoreDir) + } + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "S1", + Storage: nats.FileStorage, + Subjects: []string{"foo", "bar", "baz"}, + }) + if err != nil { + t.Fatalf("create failed: %s", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "M1", + Storage: nats.FileStorage, + Mirror: &nats.StreamSource{Name: "S1"}, + }) + if err != nil { + t.Fatalf("create failed: %s", err) + } + + _, err = js.AddConsumer("M1", &nats.ConsumerConfig{ + Durable: "C1", + FilterSubject: ">", + AckPolicy: nats.AckExplicitPolicy, + }) + if err != nil { + t.Fatalf("consumer create failed: %s", err) + } + + // Stop current + sd := s.JetStreamConfig().StoreDir + s.Shutdown() + s.WaitForShutdown() + + // Restart. + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js = jsClientConnect(t, s) + defer nc.Close() + + _, err = js.StreamInfo("M1") + if err != nil { + t.Fatalf("%s did not exist after start: %s", "M1", err) + } + + _, err = js.ConsumerInfo("M1", "C1") + if err != nil { + t.Fatalf("C1 did not exist after start: %s", err) + } +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks ///////////////////////////////////////////////////////////////////////////