From b00b723168ba4c5f39caf4562779c1e11330d009 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 18 Mar 2021 06:13:26 -0700 Subject: [PATCH] Mirrors were not properly retrying after failures to create their internal consumer. Signed-off-by: Derek Collison --- server/jetstream_cluster_test.go | 70 ++++++++++++++++++++++++++++++++ server/stream.go | 21 ++++------ 2 files changed, 77 insertions(+), 14 deletions(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index e8fa4229..bd701cd8 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -4297,6 +4297,76 @@ func TestJetStreamClusterLeaderStepdown(t *testing.T) { } } +func TestJetStreamClusterMirrorAndSourcesClusterRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "MSR", 5) + defer c.shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Origin + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", "baz.*"}, + Replicas: 2, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Create Mirror now. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "M", + Mirror: &nats.StreamSource{Name: "TEST"}, + Replicas: 2, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sendBatch := func(subject string, n int) { + t.Helper() + // Send a batch to a given subject. + for i := 0; i < n; i++ { + if _, err := js.Publish(subject, []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + } + + checkSync := func() { + checkFor(t, 10*time.Second, 500*time.Millisecond, func() error { + tsi, err := js.StreamInfo("TEST") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + msi, err := js.StreamInfo("M") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if tsi.State.Msgs != msi.State.Msgs { + return fmt.Errorf("Total messages not the same: TEST %d vs M %d", tsi.State.Msgs, msi.State.Msgs) + } + return nil + }) + } + + // Send 100 msgs. + sendBatch("foo", 100) + checkSync() + + c.stopAll() + c.restartAll() + c.waitOnStreamLeader("$G", "TEST") + c.waitOnStreamLeader("$G", "M") + + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + sendBatch("bar", 100) + checkSync() +} + func TestJetStreamClusterMirrorAndSourcesFilteredConsumers(t *testing.T) { c := createJetStreamClusterWithTemplate(t, jsClusterMirrorSourceImportsTempl, "MS5", 5) defer c.shutdown() diff --git a/server/stream.go b/server/stream.go index c0f29533..148582f6 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1126,7 +1126,7 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) { return } else { if mset.mirror.cname != _EMPTY_ && mset.mirror.cname == tokenAt(m.rply, 4) { - mset.resetMirrorConsumer() + mset.retryMirrorConsumer() } } } else { @@ -1158,18 +1158,10 @@ func (mset *stream) cancelMirrorConsumer() { mset.removeInternalConsumer(mset.mirror) } -func (mset *stream) retryMirrorConsumer() { +func (mset *stream) retryMirrorConsumer() error { mset.mu.Lock() defer mset.mu.Unlock() mset.srv.Debugf("Retrying mirror consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name) - if mset.mirror != nil && mset.mirror.sub == nil { - mset.setupMirrorConsumer() - } -} - -func (mset *stream) resetMirrorConsumer() error { - mset.mu.Lock() - defer mset.mu.Unlock() return mset.setupMirrorConsumer() } @@ -1263,6 +1255,7 @@ func (mset *stream) setupMirrorConsumer() error { mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { mset.unsubscribe(sub) _, msg := c.msgParts(rmsg) + var ccr JSApiConsumerCreateResponse if err := json.Unmarshal(msg, &ccr); err != nil { c.Warnf("JetStream bad mirror consumer create response: %q", msg) @@ -1285,7 +1278,7 @@ func (mset *stream) setupMirrorConsumer() error { go func() { select { case ccr := <-respCh: - if ccr.Error != nil { + if ccr.Error != nil || ccr.ConsumerInfo == nil { mset.cancelMirrorConsumer() } else { // Capture consumer name. @@ -1296,7 +1289,7 @@ func (mset *stream) setupMirrorConsumer() error { mset.mu.Unlock() } mset.setMirrorErr(ccr.Error) - case <-time.After(10 * time.Second): + case <-time.After(5 * time.Second): return } }() @@ -1440,7 +1433,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { mset.mu.Lock() if si := mset.sources[sname]; si != nil { si.err = nil - if ccr.Error != nil { + if ccr.Error != nil || ccr.ConsumerInfo == nil { mset.srv.Warnf("JetStream error response for create source consumer: %+v", ccr.Error) si.err = ccr.Error // We will retry every 10 seconds or so @@ -1451,7 +1444,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { } } mset.mu.Unlock() - case <-time.After(10 * time.Second): + case <-time.After(5 * time.Second): return } }()