Fix leaking timers in stream sources

Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
Neil Twigg
2023-09-14 10:30:24 +01:00
parent ea93e77b7f
commit 6f3f544841

View File

@@ -248,7 +248,8 @@ type stream struct {
mirror *sourceInfo
// Sources
sources map[string]*sourceInfo
sources map[string]*sourceInfo
sourceRetries map[string]*time.Timer
// Indicates we have direct consumers.
directs int
@@ -2765,12 +2766,24 @@ func (mset *stream) scheduleSetSourceConsumerRetryAsap(si *sourceInfo, seq uint6
// Simply schedules setSourceConsumer at the given delay.
//
// Does not require lock
// Lock held on entry
func (mset *stream) scheduleSetSourceConsumerRetry(iname string, seq uint64, delay time.Duration, startTime time.Time) {
time.AfterFunc(delay, func() {
if mset.sourceRetries == nil {
mset.sourceRetries = map[string]*time.Timer{}
}
if t, ok := mset.sourceRetries[iname]; ok && !t.Stop() {
// It looks like the goroutine has started running but hasn't taken the
// stream lock yet (otherwise the map entry would be deleted). We had
// might as well let the running goroutine complete and schedule another
// timer only if it needs to.
return
}
mset.sourceRetries[iname] = time.AfterFunc(delay, func() {
mset.mu.Lock()
defer mset.mu.Unlock()
delete(mset.sourceRetries, iname)
mset.setSourceConsumer(iname, seq, startTime)
mset.mu.Unlock()
})
}