General improvements around handling interest retention.

1. During ackMsg processing hold write lock to block concurrent access.
2. Check for the presence of preAcks first and force removal if one is present.
3. Rework check for orphan msgs on startup to use checkStateForInterestStream().

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-03-28 21:25:10 -07:00
parent e516c47a4b
commit 5cabc365df

View File

@@ -210,6 +210,7 @@ type stream struct {
qch chan struct{}
active bool
ddloaded bool
closed bool
// Mirror
mirror *sourceInfo
@@ -3306,6 +3307,8 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
return err
}
mset.store = fs
// Register our server.
fs.registerServer(s)
}
mset.mu.Unlock()
@@ -3676,7 +3679,7 @@ var (
func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error {
mset.mu.Lock()
c, s, store := mset.client, mset.srv, mset.store
if c == nil {
if mset.closed || c == nil {
mset.mu.Unlock()
return nil
}
@@ -4449,6 +4452,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
// Clean up consumers.
mset.mu.Lock()
mset.closed = true
var obs []*consumer
for _, o := range mset.consumers {
obs = append(obs, o)
@@ -4628,6 +4632,12 @@ func (mset *stream) getPublicConsumers() []*consumer {
return obs
}
// isInterestRetention reports whether the stream's configured retention
// policy is anything other than LimitsPolicy.
// The stream's read lock is taken while the configuration is read.
func (mset *stream) isInterestRetention() bool {
	mset.mu.RLock()
	retention := mset.cfg.Retention
	mset.mu.RUnlock()
	return retention != LimitsPolicy
}
// numConsumers reports the number of active consumers for this stream.
func (mset *stream) numConsumers() int {
mset.mu.RLock()
@@ -4817,6 +4827,7 @@ func (mset *stream) potentialFilteredConsumers() bool {
return false
}
// Write lock should be held here for the stream to avoid race conditions on state.
func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
var subj string
if mset.potentialFilteredConsumers() {
@@ -4838,14 +4849,23 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
// ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy.
func (mset *stream) ackMsg(o *consumer, seq uint64) {
if mset.cfg.Retention == LimitsPolicy {
if seq == 0 || mset.cfg.Retention == LimitsPolicy {
return
}
// Don't make this RLock(). We need to have only 1 running at a time to gauge interest across all consumers.
mset.mu.Lock()
if mset.closed || mset.store == nil {
mset.mu.Unlock()
return
}
// Make sure this sequence is not below our first sequence.
var state StreamState
mset.store.FastState(&state)
// Make sure this sequence is not below our first sequence.
if seq < state.FirstSeq {
mset.mu.Unlock()
return
}
@@ -4854,31 +4874,45 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
case WorkQueuePolicy:
// Normally we just remove a message when its ack'd here but if we have direct consumers
// from sources and/or mirrors we need to make sure they have delivered the msg.
mset.mu.RLock()
shouldRemove = mset.directs <= 0 || !mset.checkInterest(seq, o)
mset.mu.RUnlock()
case InterestPolicy:
mset.mu.RLock()
shouldRemove = !mset.checkInterest(seq, o)
mset.mu.RUnlock()
}
if shouldRemove {
if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF {
// This should be rare but I have seen it.
// The ack reached us before the actual msg.
var state StreamState
mset.store.FastState(&state)
if seq >= state.LastSeq {
mset.mu.Lock()
if mset.preAcks == nil {
mset.preAcks = make(map[uint64]struct{})
}
mset.preAcks[seq] = struct{}{}
mset.mu.Unlock()
}
// Check for existing preAcks. These will override the concept of shouldRemove.
if len(mset.preAcks) > 0 {
if _, hasAck := mset.preAcks[seq]; hasAck {
delete(mset.preAcks, seq)
shouldRemove = true
}
}
// If we should remove but we know this is beyond our last we can add to preAcks here.
// The ack reached us before the actual msg.
ackBeforeMsg := shouldRemove && seq > state.LastSeq
if ackBeforeMsg {
if mset.preAcks == nil {
mset.preAcks = make(map[uint64]struct{})
}
mset.preAcks[seq] = struct{}{}
}
mset.mu.Unlock()
// If nothing else to do.
if !shouldRemove || ackBeforeMsg {
return
}
// If we are here we should attempt to remove.
if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF {
// The ack reached us before the actual msg.
mset.mu.Lock()
if mset.preAcks == nil {
mset.preAcks = make(map[uint64]struct{})
}
mset.preAcks[seq] = struct{}{}
mset.mu.Unlock()
}
}
// Snapshot creates a snapshot for the stream and possibly consumers.
@@ -5076,37 +5110,17 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
return mset, nil
}
// This is to check for dangling messages.
// This is to check for dangling messages on interest retention streams.
// Issue https://github.com/nats-io/nats-server/issues/3612
func (mset *stream) checkForOrphanMsgs() {
// We need to grab the low water mark for all consumers.
var ackFloor uint64
mset.mu.RLock()
var consumers []*consumer
for _, o := range mset.consumers {
o.mu.RLock()
if o.store != nil {
if state, err := o.store.BorrowState(); err == nil {
if ackFloor == 0 || state.AckFloor.Stream < ackFloor {
ackFloor = state.AckFloor.Stream
}
}
}
o.mu.RUnlock()
consumers = append(consumers, o)
}
// Grabs stream state.
var state StreamState
mset.store.FastState(&state)
s, acc := mset.srv, mset.acc
mset.mu.RUnlock()
if ackFloor > state.FirstSeq {
req := &JSApiStreamPurgeRequest{Sequence: ackFloor + 1}
purged, err := mset.purge(req)
if err != nil {
s.Warnf("stream '%s > %s' could not auto purge orphaned messages: %v", acc, mset.name(), err)
} else {
s.Debugf("stream '%s > %s' auto purged %d messages", acc, mset.name(), purged)
}
for _, o := range consumers {
o.checkStateForInterestStream()
}
}