diff --git a/.github/workflows/go-test.yaml b/.github/workflows/go-test.yaml index def55c5f..045a9795 100644 --- a/.github/workflows/go-test.yaml +++ b/.github/workflows/go-test.yaml @@ -6,7 +6,6 @@ jobs: strategy: matrix: go: ["1.21"] - env: GOPATH: /home/runner/work/nats-server GO111MODULE: "on" diff --git a/server/stream.go b/server/stream.go index 310b9e7a..015b9b2d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -949,32 +949,30 @@ func (mset *stream) rebuildDedupe() { mset.ddloaded = true - if mset.cfg.Duplicates > time.Duration(0) { - // We have some messages. Lookup starting sequence by duplicate time window. - sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates)) - if sseq == 0 { - return + // We have some messages. Lookup starting sequence by duplicate time window. + sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates)) + if sseq == 0 { + return + } + + var smv StoreMsg + var state StreamState + mset.store.FastState(&state) + + for seq := sseq; seq <= state.LastSeq; seq++ { + sm, err := mset.store.LoadMsg(seq, &smv) + if err != nil { + continue } - - var smv StoreMsg - var state StreamState - mset.store.FastState(&state) - - for seq := sseq; seq <= state.LastSeq; seq++ { - sm, err := mset.store.LoadMsg(seq, &smv) - if err != nil { - continue - } - var msgId string - if len(sm.hdr) > 0 { - if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ { - mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts}) - } - } - if seq == state.LastSeq { - mset.lmsgId = msgId + var msgId string + if len(sm.hdr) > 0 { + if msgId = getMsgId(sm.hdr); msgId != _EMPTY_ { + mset.storeMsgIdLocked(&ddentry{msgId, sm.seq, sm.ts}) } } + if seq == state.LastSeq { + mset.lmsgId = msgId + } } } @@ -1177,7 +1175,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi if cfg.MaxConsumers == 0 { cfg.MaxConsumers = -1 } - if cfg.Duplicates == 0 && cfg.Mirror == nil && len(cfg.Sources) == 0 { + if cfg.Duplicates == 0 && cfg.Mirror == nil { maxWindow := StreamDefaultDuplicatesWindow if lim.Duplicates > 0 && maxWindow > lim.Duplicates { maxWindow = lim.Duplicates @@ -3793,15 +3791,13 @@ func (mset *stream) storeMsgId(dde *ddentry) { // storeMsgIdLocked will store the message id for duplicate detection. // Lock should he held. func (mset *stream) storeMsgIdLocked(dde *ddentry) { - if mset.cfg.Duplicates > time.Duration(0) { - if mset.ddmap == nil { - mset.ddmap = make(map[string]*ddentry) - } - mset.ddmap[dde.id] = dde - mset.ddarr = append(mset.ddarr, dde) - if mset.ddtmr == nil { - mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds) - } + if mset.ddmap == nil { + mset.ddmap = make(map[string]*ddentry) + } + mset.ddmap[dde.id] = dde + mset.ddarr = append(mset.ddarr, dde) + if mset.ddtmr == nil { + mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds) } }