Merge pull request #3052 from nats-io/js_sources_issues

[FIXED] JetStream: some stream SOURCE issues
This commit is contained in:
Ivan Kozlovic
2022-04-18 13:15:52 -06:00
committed by GitHub
2 changed files with 33 additions and 18 deletions

View File

@@ -1772,7 +1772,10 @@ func TestNoRaceJetStreamClusterSuperClusterSources(t *testing.T) {
for i := 0; i < 50; i++ {
msg := fmt.Sprintf("R-MSG-%d", i+1)
for _, sname := range []string{"foo", "bar", "baz"} {
if _, err := js.Publish(sname, []byte(msg)); err != nil {
m := nats.NewMsg(sname)
m.Header.Set(nats.MsgIdHdr, sname+"-"+msg)
m.Data = []byte(msg)
if _, err := js.PublishMsg(m); err != nil {
t.Errorf("Unexpected publish error: %v", err)
}
}
@@ -2399,7 +2402,7 @@ func TestNoRaceJetStreamFileStoreBufferReuse(t *testing.T) {
}
start = time.Now()
sub, err := js.Subscribe("*", cb, nats.EnableFlowControl(), nats.AckNone())
sub, err := js.Subscribe("*", cb, nats.EnableFlowControl(), nats.IdleHeartbeat(time.Second), nats.AckNone())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

View File

@@ -2065,15 +2065,23 @@ func (mset *stream) retrySourceConsumerAtSeq(sname string, seq uint64) {
// Lock should be held.
func (mset *stream) cancelSourceConsumer(sname string) {
if si := mset.sources[sname]; si != nil && si.sub != nil {
mset.unsubscribe(si.sub)
si.sub = nil
if si := mset.sources[sname]; si != nil {
if si.sub != nil {
mset.unsubscribe(si.sub)
si.sub = nil
}
si.sseq, si.dseq = 0, 0
si.msgs.drain()
mset.removeInternalConsumer(si)
// If the go routine is still running close the quit chan.
if si.qch != nil {
close(si.qch)
si.qch = nil
// Need to set this here, not rely on processSourceMsgs()
// to do it, because if we call setSourceConsumer() before the
// go routine has returned (but was signaled to stop), then
// we would not restart it.
si.grr = false
}
}
}
@@ -2091,7 +2099,8 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64) {
// Need to delete the old one.
mset.removeInternalConsumer(si)
si.sseq, si.dseq = seq, 0
// Do not set si.sseq to seq here. si.sseq will be set in processInboundSourceMsg
si.dseq = 0
si.last = time.Now()
ssi := mset.streamSource(iname)
if ssi == nil {
@@ -2221,23 +2230,24 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
return
}
defer func() {
mset.mu.Lock()
si.grr = false
if si.qch != nil {
close(si.qch)
si.qch = nil
}
mset.mu.Unlock()
}()
// Grab stream quit channel.
// Grab stream messages queue, quit channel and this go routine quit channel.
mset.mu.Lock()
msgs, qch, siqch := si.msgs, mset.qch, si.qch
// Set the last seen as now so that we don't fail at the first check.
si.last = time.Now()
mset.mu.Unlock()
defer func() {
mset.mu.Lock()
si.grr = false
// Close ONLY if the quit channel is the same than when we were started
if si.qch != nil && si.qch == siqch {
close(si.qch)
si.qch = nil
}
mset.mu.Unlock()
}()
t := time.NewTicker(sourceHealthCheckInterval)
defer t.Stop()
@@ -2309,8 +2319,8 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
// If we are no longer the leader cancel this subscriber.
if !mset.isLeader() {
mset.mu.Unlock()
mset.cancelSourceConsumer(si.name)
mset.mu.Unlock()
return false
}
@@ -2397,7 +2407,9 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
if err != nil {
s := mset.srv
if err == errLastSeqMismatch {
mset.mu.Lock()
mset.cancelSourceConsumer(si.iname)
mset.mu.Unlock()
mset.retrySourceConsumer(si.iname)
} else {
s.RateLimitWarnf("JetStream got an error processing inbound source msg: %v", err)