Fix ephemeral consumer delete in single cluster

Also remove retry of sources/mirror in the setSourceConsumer() itself
when not getting a response.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-03-10 15:16:31 -07:00
parent 709335f1c2
commit 27f51d4028
2 changed files with 8 additions and 11 deletions

View File

@@ -2018,6 +2018,12 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) {
}
}
e.RUnlock()
// Since callers may just check if the sublist result is nil or not,
// make sure that if what is returned by sl.Match() is the emptyResult, then
// we return nil to the caller.
if r == emptyResult {
r = nil
}
}
return psi, r
}

View File

@@ -1278,7 +1278,7 @@ func (mset *stream) setupMirrorConsumer() error {
}
mset.setMirrorErr(ccr.Error)
case <-time.After(10 * time.Second):
mset.resetMirrorConsumer()
return
}
}()
@@ -1434,12 +1434,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
}
mset.mu.Unlock()
case <-time.After(10 * time.Second):
// Make sure things have not changed.
mset.mu.Lock()
if si := mset.sources[sname]; si != nil && si.cname == _EMPTY_ {
mset.setSourceConsumer(sname, seq)
}
mset.mu.Unlock()
return
}
}()
}
@@ -2547,10 +2542,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
}
}
if deleteFlag {
mset.stopSourceConsumers()
}
// Send stream delete advisory after the consumers.
if deleteFlag && advisory {
mset.sendDeleteAdvisoryLocked()