mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Make sure we do not drift on accounting.
Three issues were found and resolved. 1. Purge replays after recovery could execute full purge. 2. Callback was registered without lock, which could lead to skew. 3. Cluster reset could stop stream store and recreate it, which could lead to double accounting. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2808,12 +2808,15 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
|
||||
}
|
||||
panic(err.Error())
|
||||
}
|
||||
// Ignore if we are recovering and we have already processed.
|
||||
if isRecovering && (sp.Request == nil || sp.Request.Sequence == 0) {
|
||||
// If no explicit request, fill in with leader stamped last sequence to protect ourselves on replay during server start.
|
||||
if sp.Request == nil || sp.Request.Sequence == 0 {
|
||||
purgeSeq := sp.LastSeq + 1
|
||||
if sp.Request == nil {
|
||||
sp.Request = &JSApiStreamPurgeRequest{Sequence: sp.LastSeq}
|
||||
} else {
|
||||
sp.Request.Sequence = sp.LastSeq
|
||||
sp.Request = &JSApiStreamPurgeRequest{Sequence: purgeSeq}
|
||||
} else if sp.Request.Keep == 0 {
|
||||
sp.Request.Sequence = purgeSeq
|
||||
} else if isRecovering {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user