When leader changes happened to streams that mirrored or sourced other streams we could continue to try to create consumers.

This could get excessive on a stream that has sourced many upstream origin streams and had several leader changes.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-06-10 19:47:46 -07:00
parent ae1202903c
commit 326a377b3e
2 changed files with 139 additions and 10 deletions

View File

@@ -7223,6 +7223,89 @@ func TestJetStreamClusterStatszActiveServers(t *testing.T) {
checkActive(4)
}
func TestJetStreamClusterSourceAndMirrorConsumersLeaderChange(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
c1 := sc.clusterForName("C1")
c2 := sc.clusterForName("C2")
nc, js := jsClientConnect(t, c1.randomServer())
defer nc.Close()
var sources []*nats.StreamSource
numStreams := 10
for i := 1; i <= numStreams; i++ {
name := fmt.Sprintf("O%d", i)
sources = append(sources, &nats.StreamSource{Name: name})
if _, err := js.AddStream(&nats.StreamConfig{Name: name}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// Place our new stream that will source all the others in different cluster.
nc, js = jsClientConnect(t, c2.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "S",
Replicas: 2,
Sources: sources,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Force leader change twice.
nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "S"), nil, time.Second)
c2.waitOnStreamLeader("$G", "S")
nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "S"), nil, time.Second)
c2.waitOnStreamLeader("$G", "S")
// Now make sure we only have a single direct consumer on our origin streams.
// Pick one at random.
name := fmt.Sprintf("O%d", rand.Intn(numStreams))
s := c1.streamLeader("$G", name)
a, err := s.lookupAccount("$G")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
mset, err := a.lookupStream(name)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
if ndc := mset.numDirectConsumers(); ndc != 1 {
return fmt.Errorf("Stream %q wanted 1 direct consumer, got %d", name, ndc)
}
return nil
})
// Now create a mirror of selected from above. Will test same scenario.
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Replicas: 2,
Mirror: &nats.StreamSource{Name: name},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Force leader change twice.
nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "M"), nil, time.Second)
c2.waitOnStreamLeader("$G", "M")
nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "M"), nil, time.Second)
c2.waitOnStreamLeader("$G", "M")
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
if ndc := mset.numDirectConsumers(); ndc != 2 {
return fmt.Errorf("Stream %q wanted 2 direct consumers, got %d", name, ndc)
}
return nil
})
}
// Support functions
// Used to setup superclusters for tests.

View File

@@ -204,6 +204,7 @@ type sourceInfo struct {
err *ApiError
last time.Time
lreq time.Time
qch chan struct{}
grr bool
}
@@ -1191,6 +1192,10 @@ func (mset *stream) processMirrorMsgs() {
mset.mu.Lock()
if mset.mirror != nil {
mset.mirror.grr = false
if mset.mirror.qch != nil {
close(mset.mirror.qch)
mset.mirror.qch = nil
}
}
mset.mu.Unlock()
}()
@@ -1201,7 +1206,7 @@ func (mset *stream) processMirrorMsgs() {
mset.mu.Unlock()
return
}
msgs, mch, qch := mset.mirror.msgs, mset.mirror.msgs.mch, mset.qch
msgs, mch, qch, siqch := mset.mirror.msgs, mset.mirror.msgs.mch, mset.qch, mset.mirror.qch
// Set the last seen as now so that we don't fail at the first check.
mset.mirror.last = time.Now()
mset.mu.Unlock()
@@ -1215,6 +1220,8 @@ func (mset *stream) processMirrorMsgs() {
return
case <-qch:
return
case <-siqch:
return
case <-mch:
for im := mset.pending(msgs); im != nil; im = im.next {
if !mset.processInboundMirrorMsg(im) {
@@ -1223,8 +1230,15 @@ func (mset *stream) processMirrorMsgs() {
}
case <-t.C:
mset.mu.RLock()
isLeader := mset.isLeader()
stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*sourceHealthCheckInterval
mset.mu.RUnlock()
// No longer leader.
if !isLeader {
mset.cancelMirrorConsumer()
return
}
// We are stalled.
if stalled {
mset.retryMirrorConsumer()
}
@@ -1384,6 +1398,11 @@ func (mset *stream) cancelMirrorConsumer() {
mset.mirror.sub = nil
}
mset.removeInternalConsumer(mset.mirror)
// If the go routine is still running close the quit chan.
if mset.mirror.qch != nil {
close(mset.mirror.qch)
mset.mirror.qch = nil
}
}
func (mset *stream) retryMirrorConsumer() error {
@@ -1457,6 +1476,7 @@ func (mset *stream) setupMirrorConsumer() error {
if !mset.mirror.grr {
mset.mirror.grr = true
mset.mirror.qch = make(chan struct{})
mset.srv.startGoRoutine(func() { mset.processMirrorMsgs() })
}
@@ -1479,7 +1499,7 @@ func (mset *stream) setupMirrorConsumer() error {
DeliverPolicy: DeliverByStartSequence,
OptStartSeq: state.LastSeq + 1,
AckPolicy: AckNone,
AckWait: 48 * time.Hour,
AckWait: 22 * time.Hour,
MaxDeliver: 1,
Heartbeat: sourceHealthCheckInterval,
FlowControl: true,
@@ -1601,8 +1621,7 @@ func (mset *stream) retrySourceConsumer(sname string) {
return
}
mset.setStartingSequenceForSource(sname)
seq := si.sseq + 1
mset.retrySourceConsumerAtSeq(sname, seq)
mset.retrySourceConsumerAtSeq(sname, si.sseq+1)
}
// Lock should be held.
@@ -1621,13 +1640,18 @@ func (mset *stream) retrySourceConsumerAtSeq(sname string, seq uint64) {
mset.setSourceConsumer(sname, seq)
}
// Locl should be held.
// Lock should be held.
func (mset *stream) cancelSourceConsumer(sname string) {
if si := mset.sources[sname]; si != nil && si.sub != nil {
mset.unsubscribe(si.sub)
si.sub = nil
si.sseq, si.dseq = 0, 0
mset.removeInternalConsumer(si)
// If the go routine is still running close the quit chan.
if si.qch != nil {
close(si.qch)
si.qch = nil
}
}
}
@@ -1660,6 +1684,7 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64) {
if !si.grr {
si.grr = true
si.qch = make(chan struct{})
mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si) })
}
@@ -1674,7 +1699,7 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64) {
Config: ConsumerConfig{
DeliverSubject: deliverSubject,
AckPolicy: AckNone,
AckWait: 48 * time.Hour,
AckWait: 22 * time.Hour,
MaxDeliver: 1,
Heartbeat: sourceHealthCheckInterval,
FlowControl: true,
@@ -1773,12 +1798,16 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
defer func() {
mset.mu.Lock()
si.grr = false
if si.qch != nil {
close(si.qch)
si.qch = nil
}
mset.mu.Unlock()
}()
// Grab stream quit channel.
mset.mu.Lock()
msgs, mch, qch := si.msgs, si.msgs.mch, mset.qch
msgs, mch, qch, siqch := si.msgs, si.msgs.mch, mset.qch, si.qch
// Set the last seen as now so that we don't fail at the first check.
si.last = time.Now()
mset.mu.Unlock()
@@ -1792,6 +1821,8 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
return
case <-qch:
return
case <-siqch:
return
case <-mch:
for im := mset.pending(msgs); im != nil; im = im.next {
if !mset.processInboundSourceMsg(si, im) {
@@ -1800,9 +1831,15 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
}
case <-t.C:
mset.mu.RLock()
iname, isLeader := si.iname, mset.isLeader()
stalled := time.Since(si.last) > 3*sourceHealthCheckInterval
iname := si.iname
mset.mu.RUnlock()
// No longer leader.
if !isLeader {
mset.cancelSourceConsumer(iname)
return
}
// We are stalled.
if stalled {
mset.retrySourceConsumer(iname)
}
@@ -1833,6 +1870,7 @@ func (mset *stream) handleFlowControl(si *sourceInfo, m *inMsg) {
func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
mset.mu.Lock()
// If we are no longer the leader cancel this subscriber.
if !mset.isLeader() {
mset.mu.Unlock()
mset.cancelSourceConsumer(si.name)
@@ -2098,8 +2136,7 @@ func (mset *stream) setupSourceConsumers() error {
// Reset if needed.
for _, si := range mset.sources {
if si.sub != nil {
mset.unsubscribe(si.sub)
mset.removeInternalConsumer(si)
mset.cancelSourceConsumer(si.name)
}
}
@@ -2150,6 +2187,11 @@ func (mset *stream) stopSourceConsumers() {
}
// Need to delete the old one.
mset.removeInternalConsumer(si)
// If the go routine is still running close the quit chan.
if si.qch != nil {
close(si.qch)
si.qch = nil
}
}
}
@@ -2172,6 +2214,10 @@ func (mset *stream) unsubscribeToStream() error {
mset.unsubscribe(mset.mirror.sub)
}
mset.removeInternalConsumer(mset.mirror)
// If the go routine is still running close the quit chan.
if mset.mirror.qch != nil {
close(mset.mirror.qch)
}
mset.mirror = nil
}