mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
[FIXED] JetStream: some stream SOURCE issues
- Possibly missing some early messages from the sourced stream - In some cancel situations, the processing of sourced messages would not longer work Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1772,7 +1772,10 @@ func TestNoRaceJetStreamClusterSuperClusterSources(t *testing.T) {
|
||||
for i := 0; i < 50; i++ {
|
||||
msg := fmt.Sprintf("R-MSG-%d", i+1)
|
||||
for _, sname := range []string{"foo", "bar", "baz"} {
|
||||
if _, err := js.Publish(sname, []byte(msg)); err != nil {
|
||||
m := nats.NewMsg(sname)
|
||||
m.Header.Set(nats.MsgIdHdr, sname+"-"+msg)
|
||||
m.Data = []byte(msg)
|
||||
if _, err := js.PublishMsg(m); err != nil {
|
||||
t.Errorf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -2399,7 +2402,7 @@ func TestNoRaceJetStreamFileStoreBufferReuse(t *testing.T) {
|
||||
}
|
||||
|
||||
start = time.Now()
|
||||
sub, err := js.Subscribe("*", cb, nats.EnableFlowControl(), nats.AckNone())
|
||||
sub, err := js.Subscribe("*", cb, nats.EnableFlowControl(), nats.IdleHeartbeat(time.Second), nats.AckNone())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
@@ -2065,15 +2065,23 @@ func (mset *stream) retrySourceConsumerAtSeq(sname string, seq uint64) {
|
||||
|
||||
// Lock should be held.
|
||||
func (mset *stream) cancelSourceConsumer(sname string) {
|
||||
if si := mset.sources[sname]; si != nil && si.sub != nil {
|
||||
mset.unsubscribe(si.sub)
|
||||
si.sub = nil
|
||||
if si := mset.sources[sname]; si != nil {
|
||||
if si.sub != nil {
|
||||
mset.unsubscribe(si.sub)
|
||||
si.sub = nil
|
||||
}
|
||||
si.sseq, si.dseq = 0, 0
|
||||
si.msgs.drain()
|
||||
mset.removeInternalConsumer(si)
|
||||
// If the go routine is still running close the quit chan.
|
||||
if si.qch != nil {
|
||||
close(si.qch)
|
||||
si.qch = nil
|
||||
// Need to set this here, not rely on processSourceMsgs()
|
||||
// to do it, because if we call setSourceConsumer() before the
|
||||
// go routine has returned (but was signaled to stop), then
|
||||
// we would not restart it.
|
||||
si.grr = false
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2091,7 +2099,8 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64) {
|
||||
// Need to delete the old one.
|
||||
mset.removeInternalConsumer(si)
|
||||
|
||||
si.sseq, si.dseq = seq, 0
|
||||
// Do not set si.sseq to seq here. si.sseq will be set in processInboundSourceMsg
|
||||
si.dseq = 0
|
||||
si.last = time.Now()
|
||||
ssi := mset.streamSource(iname)
|
||||
if ssi == nil {
|
||||
@@ -2221,23 +2230,24 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
mset.mu.Lock()
|
||||
si.grr = false
|
||||
if si.qch != nil {
|
||||
close(si.qch)
|
||||
si.qch = nil
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
}()
|
||||
|
||||
// Grab stream quit channel.
|
||||
// Grab stream messages queue, quit channel and this go routine quit channel.
|
||||
mset.mu.Lock()
|
||||
msgs, qch, siqch := si.msgs, mset.qch, si.qch
|
||||
// Set the last seen as now so that we don't fail at the first check.
|
||||
si.last = time.Now()
|
||||
mset.mu.Unlock()
|
||||
|
||||
defer func() {
|
||||
mset.mu.Lock()
|
||||
si.grr = false
|
||||
// Close ONLY if the quit channel is the same than when we were started
|
||||
if si.qch != nil && si.qch == siqch {
|
||||
close(si.qch)
|
||||
si.qch = nil
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
}()
|
||||
|
||||
t := time.NewTicker(sourceHealthCheckInterval)
|
||||
defer t.Stop()
|
||||
|
||||
@@ -2309,8 +2319,8 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
|
||||
|
||||
// If we are no longer the leader cancel this subscriber.
|
||||
if !mset.isLeader() {
|
||||
mset.mu.Unlock()
|
||||
mset.cancelSourceConsumer(si.name)
|
||||
mset.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -2397,7 +2407,9 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
|
||||
if err != nil {
|
||||
s := mset.srv
|
||||
if err == errLastSeqMismatch {
|
||||
mset.mu.Lock()
|
||||
mset.cancelSourceConsumer(si.iname)
|
||||
mset.mu.Unlock()
|
||||
mset.retrySourceConsumer(si.iname)
|
||||
} else {
|
||||
s.RateLimitWarnf("JetStream got an error processing inbound source msg: %v", err)
|
||||
|
||||
Reference in New Issue
Block a user