From e1574eca3e79e3f8a80b7ca3cf4cac4a2f619e7e Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 6 Sep 2023 11:51:38 -0700 Subject: [PATCH] Revert "Enables 0s deduplication window duration when the stream has sources (#4476)" This reverts commit db96238ad9b18c9930e63c404d1ae288f8855374. --- server/stream.go | 62 ++++++++++++++++++++++-------------------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/server/stream.go b/server/stream.go index bfe06b23..fb792845 100644 --- a/server/stream.go +++ b/server/stream.go @@ -805,32 +805,30 @@ func (mset *stream) rebuildDedupe() { mset.ddloaded = true - if mset.cfg.Duplicates > time.Duration(0) { - // We have some messages. Lookup starting sequence by duplicate time window. - sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates)) - if sseq == 0 { - return + // We have some messages. Lookup starting sequence by duplicate time window. + sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates)) + if sseq == 0 { + return + } + + var smv StoreMsg + var state StreamState + mset.store.FastState(&state) + + for seq := sseq; seq <= state.LastSeq; seq++ { + sm, err := mset.store.LoadMsg(seq, &smv) + if err != nil { + continue } - - var smv StoreMsg - var state StreamState - mset.store.FastState(&state) - - for seq := sseq; seq <= state.LastSeq; seq++ { - sm, err := mset.store.LoadMsg(seq, &smv) - if err != nil { - continue - } - var msgId string - if len(sm.hdr) > 0 { - if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ { - mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts}) - } - } - if seq == state.LastSeq { - mset.lmsgId = msgId + var msgId string + if len(sm.hdr) > 0 { + if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ { + mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts}) } } + if seq == state.LastSeq { + mset.lmsgId = msgId + } } } @@ -1025,7 +1023,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi if cfg.MaxConsumers == 0 { cfg.MaxConsumers = -1 } - if cfg.Duplicates == 0 && cfg.Mirror == nil && len(cfg.Sources) == 0 { + if cfg.Duplicates == 0 && cfg.Mirror == nil { maxWindow := StreamDefaultDuplicatesWindow if lim.Duplicates > 0 && maxWindow > lim.Duplicates { maxWindow = lim.Duplicates @@ -3488,15 +3486,13 @@ func (mset *stream) storeMsgId(dde *ddentry) { // storeMsgIdLocked will store the message id for duplicate detection. // Lock should he held. func (mset *stream) storeMsgIdLocked(dde *ddentry) { - if mset.cfg.Duplicates > time.Duration(0) { - if mset.ddmap == nil { - mset.ddmap = make(map[string]*ddentry) - } - mset.ddmap[dde.id] = dde - mset.ddarr = append(mset.ddarr, dde) - if mset.ddtmr == nil { - mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds) - } + if mset.ddmap == nil { + mset.ddmap = make(map[string]*ddentry) + } + mset.ddmap[dde.id] = dde + mset.ddarr = append(mset.ddarr, dde) + if mset.ddtmr == nil { + mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds) } }