mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Revert "Enables 0s deduplication window duration when the stream has sources" (#4495)
This reverts commit db96238ad9 which was
causing `TestNoRaceJetStreamSuperClusterSources` to fail sometimes with
duplicate messages around shutdown.
This commit is contained in:
@@ -805,32 +805,30 @@ func (mset *stream) rebuildDedupe() {
|
||||
|
||||
mset.ddloaded = true
|
||||
|
||||
if mset.cfg.Duplicates > time.Duration(0) {
|
||||
// We have some messages. Lookup starting sequence by duplicate time window.
|
||||
sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates))
|
||||
if sseq == 0 {
|
||||
return
|
||||
// We have some messages. Lookup starting sequence by duplicate time window.
|
||||
sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates))
|
||||
if sseq == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var smv StoreMsg
|
||||
var state StreamState
|
||||
mset.store.FastState(&state)
|
||||
|
||||
for seq := sseq; seq <= state.LastSeq; seq++ {
|
||||
sm, err := mset.store.LoadMsg(seq, &smv)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var smv StoreMsg
|
||||
var state StreamState
|
||||
mset.store.FastState(&state)
|
||||
|
||||
for seq := sseq; seq <= state.LastSeq; seq++ {
|
||||
sm, err := mset.store.LoadMsg(seq, &smv)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var msgId string
|
||||
if len(sm.hdr) > 0 {
|
||||
if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ {
|
||||
mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts})
|
||||
}
|
||||
}
|
||||
if seq == state.LastSeq {
|
||||
mset.lmsgId = msgId
|
||||
var msgId string
|
||||
if len(sm.hdr) > 0 {
|
||||
if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ {
|
||||
mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts})
|
||||
}
|
||||
}
|
||||
if seq == state.LastSeq {
|
||||
mset.lmsgId = msgId
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1025,7 +1023,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
|
||||
if cfg.MaxConsumers == 0 {
|
||||
cfg.MaxConsumers = -1
|
||||
}
|
||||
if cfg.Duplicates == 0 && cfg.Mirror == nil && len(cfg.Sources) == 0 {
|
||||
if cfg.Duplicates == 0 && cfg.Mirror == nil {
|
||||
maxWindow := StreamDefaultDuplicatesWindow
|
||||
if lim.Duplicates > 0 && maxWindow > lim.Duplicates {
|
||||
maxWindow = lim.Duplicates
|
||||
@@ -3488,15 +3486,13 @@ func (mset *stream) storeMsgId(dde *ddentry) {
|
||||
// storeMsgIdLocked will store the message id for duplicate detection.
|
||||
// Lock should he held.
|
||||
func (mset *stream) storeMsgIdLocked(dde *ddentry) {
|
||||
if mset.cfg.Duplicates > time.Duration(0) {
|
||||
if mset.ddmap == nil {
|
||||
mset.ddmap = make(map[string]*ddentry)
|
||||
}
|
||||
mset.ddmap[dde.id] = dde
|
||||
mset.ddarr = append(mset.ddarr, dde)
|
||||
if mset.ddtmr == nil {
|
||||
mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds)
|
||||
}
|
||||
if mset.ddmap == nil {
|
||||
mset.ddmap = make(map[string]*ddentry)
|
||||
}
|
||||
mset.ddmap[dde.id] = dde
|
||||
mset.ddarr = append(mset.ddarr, dde)
|
||||
if mset.ddtmr == nil {
|
||||
mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user