Merge branch 'main' into dev

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-06 13:33:47 -07:00
2 changed files with 29 additions and 34 deletions

View File

@@ -949,32 +949,30 @@ func (mset *stream) rebuildDedupe() {
mset.ddloaded = true
if mset.cfg.Duplicates > time.Duration(0) {
// We have some messages. Lookup starting sequence by duplicate time window.
sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates))
if sseq == 0 {
return
// We have some messages. Lookup starting sequence by duplicate time window.
sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates))
if sseq == 0 {
return
}
var smv StoreMsg
var state StreamState
mset.store.FastState(&state)
for seq := sseq; seq <= state.LastSeq; seq++ {
sm, err := mset.store.LoadMsg(seq, &smv)
if err != nil {
continue
}
var smv StoreMsg
var state StreamState
mset.store.FastState(&state)
for seq := sseq; seq <= state.LastSeq; seq++ {
sm, err := mset.store.LoadMsg(seq, &smv)
if err != nil {
continue
}
var msgId string
if len(sm.hdr) > 0 {
if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ {
mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts})
}
}
if seq == state.LastSeq {
mset.lmsgId = msgId
var msgId string
if len(sm.hdr) > 0 {
if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ {
mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts})
}
}
if seq == state.LastSeq {
mset.lmsgId = msgId
}
}
}
@@ -1177,7 +1175,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if cfg.MaxConsumers == 0 {
cfg.MaxConsumers = -1
}
if cfg.Duplicates == 0 && cfg.Mirror == nil && len(cfg.Sources) == 0 {
if cfg.Duplicates == 0 && cfg.Mirror == nil {
maxWindow := StreamDefaultDuplicatesWindow
if lim.Duplicates > 0 && maxWindow > lim.Duplicates {
maxWindow = lim.Duplicates
@@ -3793,15 +3791,13 @@ func (mset *stream) storeMsgId(dde *ddentry) {
// storeMsgIdLocked will store the message id for duplicate detection.
// Lock should he held.
func (mset *stream) storeMsgIdLocked(dde *ddentry) {
if mset.cfg.Duplicates > time.Duration(0) {
if mset.ddmap == nil {
mset.ddmap = make(map[string]*ddentry)
}
mset.ddmap[dde.id] = dde
mset.ddarr = append(mset.ddarr, dde)
if mset.ddtmr == nil {
mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds)
}
if mset.ddmap == nil {
mset.ddmap = make(map[string]*ddentry)
}
mset.ddmap[dde.id] = dde
mset.ddarr = append(mset.ddarr, dde)
if mset.ddtmr == nil {
mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds)
}
}