Merge pull request #2017 from nats-io/mirror_restart

Mirror consumers were not able to restart after initial failure.
This commit is contained in:
Derek Collison
2021-03-18 06:28:04 -07:00
committed by GitHub
2 changed files with 77 additions and 14 deletions

View File

@@ -4297,6 +4297,76 @@ func TestJetStreamClusterLeaderStepdown(t *testing.T) {
}
}
func TestJetStreamClusterMirrorAndSourcesClusterRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSR", 5)
defer c.shutdown()
// Client for API requests.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
// Origin
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar", "baz.*"},
Replicas: 2,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Create Mirror now.
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Mirror: &nats.StreamSource{Name: "TEST"},
Replicas: 2,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sendBatch := func(subject string, n int) {
t.Helper()
// Send a batch to a given subject.
for i := 0; i < n; i++ {
if _, err := js.Publish(subject, []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
}
checkSync := func() {
checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
tsi, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msi, err := js.StreamInfo("M")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if tsi.State.Msgs != msi.State.Msgs {
return fmt.Errorf("Total messages not the same: TEST %d vs M %d", tsi.State.Msgs, msi.State.Msgs)
}
return nil
})
}
// Send 100 msgs.
sendBatch("foo", 100)
checkSync()
c.stopAll()
c.restartAll()
c.waitOnStreamLeader("$G", "TEST")
c.waitOnStreamLeader("$G", "M")
nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()
sendBatch("bar", 100)
checkSync()
}
func TestJetStreamClusterMirrorAndSourcesFilteredConsumers(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, jsClusterMirrorSourceImportsTempl, "MS5", 5)
defer c.shutdown()

View File

@@ -1126,7 +1126,7 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) {
return
} else {
if mset.mirror.cname != _EMPTY_ && mset.mirror.cname == tokenAt(m.rply, 4) {
mset.resetMirrorConsumer()
mset.retryMirrorConsumer()
}
}
} else {
@@ -1158,18 +1158,10 @@ func (mset *stream) cancelMirrorConsumer() {
mset.removeInternalConsumer(mset.mirror)
}
func (mset *stream) retryMirrorConsumer() {
func (mset *stream) retryMirrorConsumer() error {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.srv.Debugf("Retrying mirror consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name)
if mset.mirror != nil && mset.mirror.sub == nil {
mset.setupMirrorConsumer()
}
}
func (mset *stream) resetMirrorConsumer() error {
mset.mu.Lock()
defer mset.mu.Unlock()
return mset.setupMirrorConsumer()
}
@@ -1263,6 +1255,7 @@ func (mset *stream) setupMirrorConsumer() error {
mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
mset.unsubscribe(sub)
_, msg := c.msgParts(rmsg)
var ccr JSApiConsumerCreateResponse
if err := json.Unmarshal(msg, &ccr); err != nil {
c.Warnf("JetStream bad mirror consumer create response: %q", msg)
@@ -1285,7 +1278,7 @@ func (mset *stream) setupMirrorConsumer() error {
go func() {
select {
case ccr := <-respCh:
if ccr.Error != nil {
if ccr.Error != nil || ccr.ConsumerInfo == nil {
mset.cancelMirrorConsumer()
} else {
// Capture consumer name.
@@ -1296,7 +1289,7 @@ func (mset *stream) setupMirrorConsumer() error {
mset.mu.Unlock()
}
mset.setMirrorErr(ccr.Error)
case <-time.After(10 * time.Second):
case <-time.After(5 * time.Second):
return
}
}()
@@ -1440,7 +1433,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
mset.mu.Lock()
if si := mset.sources[sname]; si != nil {
si.err = nil
if ccr.Error != nil {
if ccr.Error != nil || ccr.ConsumerInfo == nil {
mset.srv.Warnf("JetStream error response for create source consumer: %+v", ccr.Error)
si.err = ccr.Error
// We will retry every 10 seconds or so
@@ -1451,7 +1444,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
}
}
mset.mu.Unlock()
case <-time.After(10 * time.Second):
case <-time.After(5 * time.Second):
return
}
}()