mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fix for #2551
When a mirror would be processed before the origin stream we would not recover the consumers due to failure on looking up source's subjects. This change processes all streams first then does all consumers. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1034,6 +1034,13 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Collect consumers, do after all streams.
|
||||
type ce struct {
|
||||
mset *stream
|
||||
odir string
|
||||
}
|
||||
var consumers []*ce
|
||||
|
||||
// Now recover the streams.
|
||||
fis, _ := ioutil.ReadDir(sdir)
|
||||
for _, fi := range fis {
|
||||
@@ -1109,13 +1116,17 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
|
||||
// Now do the consumers.
|
||||
odir := path.Join(sdir, fi.Name(), consumerDir)
|
||||
ofis, _ := ioutil.ReadDir(odir)
|
||||
consumers = append(consumers, &ce{mset, odir})
|
||||
}
|
||||
|
||||
for _, e := range consumers {
|
||||
ofis, _ := ioutil.ReadDir(e.odir)
|
||||
if len(ofis) > 0 {
|
||||
s.Noticef(" Recovering %d consumers for stream - %q", len(ofis), fi.Name())
|
||||
s.Noticef(" Recovering %d consumers for stream - %q", len(ofis), e.mset.name())
|
||||
}
|
||||
for _, ofi := range ofis {
|
||||
metafile := path.Join(odir, ofi.Name(), JetStreamMetaFile)
|
||||
metasum := path.Join(odir, ofi.Name(), JetStreamMetaFileSum)
|
||||
metafile := path.Join(e.odir, ofi.Name(), JetStreamMetaFile)
|
||||
metasum := path.Join(e.odir, ofi.Name(), JetStreamMetaFileSum)
|
||||
if _, err := os.Stat(metafile); os.IsNotExist(err) {
|
||||
s.Warnf(" Missing consumer metafile %q", metafile)
|
||||
continue
|
||||
@@ -1131,10 +1142,10 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
}
|
||||
|
||||
// Check if we are encrypted.
|
||||
if key, err := ioutil.ReadFile(path.Join(odir, ofi.Name(), JetStreamMetaFileKey)); err == nil {
|
||||
if key, err := ioutil.ReadFile(path.Join(e.odir, ofi.Name(), JetStreamMetaFileKey)); err == nil {
|
||||
s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile")
|
||||
// Decode the buffer before proceeding.
|
||||
if buf, err = s.decryptMeta(key, buf, a.Name, fi.Name()+tsep+ofi.Name()); err != nil {
|
||||
if buf, err = s.decryptMeta(key, buf, a.Name, e.mset.name()+tsep+ofi.Name()); err != nil {
|
||||
s.Warnf(" Error decrypting our consumer metafile: %v", err)
|
||||
continue
|
||||
}
|
||||
@@ -1151,7 +1162,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
// the consumer can reconnect. We will create it as a durable and switch it.
|
||||
cfg.ConsumerConfig.Durable = ofi.Name()
|
||||
}
|
||||
obs, err := mset.addConsumer(&cfg.ConsumerConfig)
|
||||
obs, err := e.mset.addConsumer(&cfg.ConsumerConfig)
|
||||
if err != nil {
|
||||
s.Warnf(" Error adding consumer: %v", err)
|
||||
continue
|
||||
|
||||
@@ -13101,7 +13101,7 @@ func TestJetStreamPullLargeBatchExpired(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestNegativeDupeWindow(t *testing.T) {
|
||||
func TestJetStreamNegativeDupeWindow(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
@@ -13134,6 +13134,69 @@ func TestNegativeDupeWindow(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Issue #2551
|
||||
func TestJetStreamMirroredConsumerFailAfterRestart(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
config := s.JetStreamConfig()
|
||||
if config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "S1",
|
||||
Storage: nats.FileStorage,
|
||||
Subjects: []string{"foo", "bar", "baz"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create failed: %s", err)
|
||||
}
|
||||
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "M1",
|
||||
Storage: nats.FileStorage,
|
||||
Mirror: &nats.StreamSource{Name: "S1"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create failed: %s", err)
|
||||
}
|
||||
|
||||
_, err = js.AddConsumer("M1", &nats.ConsumerConfig{
|
||||
Durable: "C1",
|
||||
FilterSubject: ">",
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("consumer create failed: %s", err)
|
||||
}
|
||||
|
||||
// Stop current
|
||||
sd := s.JetStreamConfig().StoreDir
|
||||
s.Shutdown()
|
||||
s.WaitForShutdown()
|
||||
|
||||
// Restart.
|
||||
s = RunJetStreamServerOnPort(-1, sd)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, js = jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err = js.StreamInfo("M1")
|
||||
if err != nil {
|
||||
t.Fatalf("%s did not exist after start: %s", "M1", err)
|
||||
}
|
||||
|
||||
_, err = js.ConsumerInfo("M1", "C1")
|
||||
if err != nil {
|
||||
t.Fatalf("C1 did not exist after start: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Simple JetStream Benchmarks
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Reference in New Issue
Block a user