Update observables on purge

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-01-03 14:00:08 -05:00
parent cd8cd3a9eb
commit 54e2de912f
3 changed files with 124 additions and 1 deletions

View File

@@ -195,7 +195,22 @@ func (mset *MsgSet) Delete() error {
// Purge will remove all messages from the message set and underlying store.
func (mset *MsgSet) Purge() uint64 {
return mset.store.Purge()
mset.mu.Lock()
if mset.client == nil {
mset.mu.Unlock()
return 0
}
purged := mset.store.Purge()
stats := mset.store.Stats()
var obs []*Observable
for _, o := range mset.obs {
obs = append(obs, o)
}
mset.mu.Unlock()
for _, o := range obs {
o.purge(stats.FirstSeq)
}
return purged
}
// RemoveMsg will remove a message from a message set.

View File

@@ -1108,6 +1108,7 @@ func (o *Observable) onRedeliverQueue(seq uint64) bool {
return false
}
// Checks the pending messages.
func (o *Observable) checkPending() {
now := time.Now().UnixNano()
shouldSignal := false
@@ -1284,6 +1285,22 @@ func (o *Observable) Active() bool {
return active
}
// This is when the underlying message set has been purged.
func (o *Observable) purge(sseq uint64) {
o.mu.Lock()
o.sseq = sseq
o.asflr = sseq - 1
o.adflr = o.dseq - 1
if len(o.pending) > 0 {
o.pending = nil
if o.ptmr != nil {
o.ptmr.Stop()
o.ptmr = nil
}
}
o.mu.Unlock()
}
func stopAndClearTimer(tp **time.Timer) {
if *tp == nil {
return