diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 55779d2c..cea38022 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -7223,6 +7223,89 @@ func TestJetStreamClusterStatszActiveServers(t *testing.T) { checkActive(4) } +func TestJetStreamClusterSourceAndMirrorConsumersLeaderChange(t *testing.T) { + sc := createJetStreamSuperCluster(t, 3, 2) + defer sc.shutdown() + + c1 := sc.clusterForName("C1") + c2 := sc.clusterForName("C2") + + nc, js := jsClientConnect(t, c1.randomServer()) + defer nc.Close() + + var sources []*nats.StreamSource + numStreams := 10 + + for i := 1; i <= numStreams; i++ { + name := fmt.Sprintf("O%d", i) + sources = append(sources, &nats.StreamSource{Name: name}) + if _, err := js.AddStream(&nats.StreamConfig{Name: name}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + // Place our new stream that will source all the others in different cluster. + nc, js = jsClientConnect(t, c2.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "S", + Replicas: 2, + Sources: sources, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Force leader change twice. + nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "S"), nil, time.Second) + c2.waitOnStreamLeader("$G", "S") + nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "S"), nil, time.Second) + c2.waitOnStreamLeader("$G", "S") + + // Now make sure we only have a single direct consumer on our origin streams. + // Pick one at random. + name := fmt.Sprintf("O%d", rand.Intn(numStreams)) + s := c1.streamLeader("$G", name) + a, err := s.lookupAccount("$G") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + mset, err := a.lookupStream(name) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { + if ndc := mset.numDirectConsumers(); ndc != 1 { + return fmt.Errorf("Stream %q wanted 1 direct consumer, got %d", name, ndc) + } + return nil + }) + + // Now create a mirror of selected from above. Will test same scenario. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "M", + Replicas: 2, + Mirror: &nats.StreamSource{Name: name}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Force leader change twice. + nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "M"), nil, time.Second) + c2.waitOnStreamLeader("$G", "M") + nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "M"), nil, time.Second) + c2.waitOnStreamLeader("$G", "M") + + checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { + if ndc := mset.numDirectConsumers(); ndc != 2 { + return fmt.Errorf("Stream %q wanted 2 direct consumers, got %d", name, ndc) + } + return nil + }) + +} + // Support functions // Used to setup superclusters for tests. diff --git a/server/stream.go b/server/stream.go index 467864e2..ae6a0d47 100644 --- a/server/stream.go +++ b/server/stream.go @@ -204,6 +204,7 @@ type sourceInfo struct { err *ApiError last time.Time lreq time.Time + qch chan struct{} grr bool } @@ -1191,6 +1192,10 @@ func (mset *stream) processMirrorMsgs() { mset.mu.Lock() if mset.mirror != nil { mset.mirror.grr = false + if mset.mirror.qch != nil { + close(mset.mirror.qch) + mset.mirror.qch = nil + } } mset.mu.Unlock() }() @@ -1201,7 +1206,7 @@ func (mset *stream) processMirrorMsgs() { mset.mu.Unlock() return } - msgs, mch, qch := mset.mirror.msgs, mset.mirror.msgs.mch, mset.qch + msgs, mch, qch, siqch := mset.mirror.msgs, mset.mirror.msgs.mch, mset.qch, mset.mirror.qch // Set the last seen as now so that we don't fail at the first check. mset.mirror.last = time.Now() mset.mu.Unlock() @@ -1215,6 +1220,8 @@ func (mset *stream) processMirrorMsgs() { return case <-qch: return + case <-siqch: + return case <-mch: for im := mset.pending(msgs); im != nil; im = im.next { if !mset.processInboundMirrorMsg(im) { @@ -1223,8 +1230,15 @@ func (mset *stream) processMirrorMsgs() { } case <-t.C: mset.mu.RLock() + isLeader := mset.isLeader() stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*sourceHealthCheckInterval mset.mu.RUnlock() + // No longer leader. + if !isLeader { + mset.cancelMirrorConsumer() + return + } + // We are stalled. if stalled { mset.retryMirrorConsumer() } @@ -1384,6 +1398,11 @@ func (mset *stream) cancelMirrorConsumer() { mset.mirror.sub = nil } mset.removeInternalConsumer(mset.mirror) + // If the go routine is still running close the quit chan. + if mset.mirror.qch != nil { + close(mset.mirror.qch) + mset.mirror.qch = nil + } } func (mset *stream) retryMirrorConsumer() error { @@ -1457,6 +1476,7 @@ func (mset *stream) setupMirrorConsumer() error { if !mset.mirror.grr { mset.mirror.grr = true + mset.mirror.qch = make(chan struct{}) mset.srv.startGoRoutine(func() { mset.processMirrorMsgs() }) } @@ -1479,7 +1499,7 @@ func (mset *stream) setupMirrorConsumer() error { DeliverPolicy: DeliverByStartSequence, OptStartSeq: state.LastSeq + 1, AckPolicy: AckNone, - AckWait: 48 * time.Hour, + AckWait: 22 * time.Hour, MaxDeliver: 1, Heartbeat: sourceHealthCheckInterval, FlowControl: true, @@ -1601,8 +1621,7 @@ func (mset *stream) retrySourceConsumer(sname string) { return } mset.setStartingSequenceForSource(sname) - seq := si.sseq + 1 - mset.retrySourceConsumerAtSeq(sname, seq) + mset.retrySourceConsumerAtSeq(sname, si.sseq+1) } // Lock should be held. @@ -1621,13 +1640,18 @@ func (mset *stream) retrySourceConsumerAtSeq(sname string, seq uint64) { mset.setSourceConsumer(sname, seq) } -// Locl should be held. +// Lock should be held. func (mset *stream) cancelSourceConsumer(sname string) { if si := mset.sources[sname]; si != nil && si.sub != nil { mset.unsubscribe(si.sub) si.sub = nil si.sseq, si.dseq = 0, 0 mset.removeInternalConsumer(si) + // If the go routine is still running close the quit chan. + if si.qch != nil { + close(si.qch) + si.qch = nil + } } } @@ -1660,6 +1684,7 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64) { if !si.grr { si.grr = true + si.qch = make(chan struct{}) mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si) }) } @@ -1674,7 +1699,7 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64) { Config: ConsumerConfig{ DeliverSubject: deliverSubject, AckPolicy: AckNone, - AckWait: 48 * time.Hour, + AckWait: 22 * time.Hour, MaxDeliver: 1, Heartbeat: sourceHealthCheckInterval, FlowControl: true, @@ -1773,12 +1798,16 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) { defer func() { mset.mu.Lock() si.grr = false + if si.qch != nil { + close(si.qch) + si.qch = nil + } mset.mu.Unlock() }() // Grab stream quit channel. mset.mu.Lock() - msgs, mch, qch := si.msgs, si.msgs.mch, mset.qch + msgs, mch, qch, siqch := si.msgs, si.msgs.mch, mset.qch, si.qch // Set the last seen as now so that we don't fail at the first check. si.last = time.Now() mset.mu.Unlock() @@ -1792,6 +1821,8 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) { return case <-qch: return + case <-siqch: + return case <-mch: for im := mset.pending(msgs); im != nil; im = im.next { if !mset.processInboundSourceMsg(si, im) { @@ -1800,9 +1831,15 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) { } case <-t.C: mset.mu.RLock() + iname, isLeader := si.iname, mset.isLeader() stalled := time.Since(si.last) > 3*sourceHealthCheckInterval - iname := si.iname mset.mu.RUnlock() + // No longer leader. + if !isLeader { + mset.cancelSourceConsumer(iname) + return + } + // We are stalled. if stalled { mset.retrySourceConsumer(iname) } @@ -1833,6 +1870,7 @@ func (mset *stream) handleFlowControl(si *sourceInfo, m *inMsg) { func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { mset.mu.Lock() + // If we are no longer the leader cancel this subscriber. if !mset.isLeader() { mset.mu.Unlock() mset.cancelSourceConsumer(si.name) @@ -2098,8 +2136,7 @@ func (mset *stream) setupSourceConsumers() error { // Reset if needed. for _, si := range mset.sources { if si.sub != nil { - mset.unsubscribe(si.sub) - mset.removeInternalConsumer(si) + mset.cancelSourceConsumer(si.name) } } @@ -2150,6 +2187,11 @@ func (mset *stream) stopSourceConsumers() { } // Need to delete the old one. mset.removeInternalConsumer(si) + // If the go routine is still running close the quit chan. + if si.qch != nil { + close(si.qch) + si.qch = nil + } } } @@ -2172,6 +2214,10 @@ func (mset *stream) unsubscribeToStream() error { mset.unsubscribe(mset.mirror.sub) } mset.removeInternalConsumer(mset.mirror) + // If the go routine is still running close the quit chan. + if mset.mirror.qch != nil { + close(mset.mirror.qch) + } mset.mirror = nil }