mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
[FIXED #2706] - Only utilize full state with deleted details when really needed. Otherwise fast state will suffice.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -41,7 +41,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.6.5"
|
||||
VERSION = "2.6.6-beta"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -2905,12 +2905,13 @@ func (o *consumer) selectStartingSeqNo() {
|
||||
if o.mset == nil || o.mset.store == nil {
|
||||
o.sseq = 1
|
||||
} else {
|
||||
stats := o.mset.store.State()
|
||||
var state StreamState
|
||||
o.mset.store.FastState(&state)
|
||||
if o.cfg.OptStartSeq == 0 {
|
||||
if o.cfg.DeliverPolicy == DeliverAll {
|
||||
o.sseq = stats.FirstSeq
|
||||
o.sseq = state.FirstSeq
|
||||
} else if o.cfg.DeliverPolicy == DeliverLast {
|
||||
o.sseq = stats.LastSeq
|
||||
o.sseq = state.LastSeq
|
||||
// If we are partitioned here this will be properly set when we become leader.
|
||||
if o.cfg.FilterSubject != _EMPTY_ {
|
||||
ss := o.mset.store.FilteredState(1, o.cfg.FilterSubject)
|
||||
@@ -2919,31 +2920,31 @@ func (o *consumer) selectStartingSeqNo() {
|
||||
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
|
||||
if mss := o.mset.store.SubjectsState(o.cfg.FilterSubject); len(mss) > 0 {
|
||||
o.lss = &lastSeqSkipList{
|
||||
resume: stats.LastSeq,
|
||||
resume: state.LastSeq,
|
||||
seqs: createLastSeqSkipList(mss),
|
||||
}
|
||||
o.sseq = o.lss.seqs[0]
|
||||
} else {
|
||||
// If no mapping info just set to last.
|
||||
o.sseq = stats.LastSeq
|
||||
o.sseq = state.LastSeq
|
||||
}
|
||||
} else if o.cfg.OptStartTime != nil {
|
||||
// If we are here we are time based.
|
||||
// TODO(dlc) - Once clustered can't rely on this.
|
||||
o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
|
||||
} else {
|
||||
o.sseq = stats.LastSeq + 1
|
||||
o.sseq = state.LastSeq + 1
|
||||
}
|
||||
} else {
|
||||
o.sseq = o.cfg.OptStartSeq
|
||||
}
|
||||
|
||||
if stats.FirstSeq == 0 {
|
||||
if state.FirstSeq == 0 {
|
||||
o.sseq = 1
|
||||
} else if o.sseq < stats.FirstSeq {
|
||||
o.sseq = stats.FirstSeq
|
||||
} else if o.sseq > stats.LastSeq {
|
||||
o.sseq = stats.LastSeq + 1
|
||||
} else if o.sseq < state.FirstSeq {
|
||||
o.sseq = state.FirstSeq
|
||||
} else if o.sseq > state.LastSeq {
|
||||
o.sseq = state.LastSeq + 1
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3295,7 +3296,8 @@ func (o *consumer) setInitialPendingAndStart() {
|
||||
}
|
||||
|
||||
if !filtered && dp != DeliverLastPerSubject {
|
||||
state := mset.store.State()
|
||||
var state StreamState
|
||||
mset.store.FastState(&state)
|
||||
if state.Msgs > 0 {
|
||||
o.sgap = state.Msgs - (o.sseq - state.FirstSeq)
|
||||
}
|
||||
|
||||
@@ -3367,13 +3367,19 @@ func (fs *fileStore) Type() StorageType {
|
||||
}
|
||||
|
||||
// FastState will fill in state with only the following.
|
||||
// Msgs, Bytes, FirstSeq, LastSeq
|
||||
// Msgs, Bytes, First and Last Sequence and Time and NumDeleted.
|
||||
func (fs *fileStore) FastState(state *StreamState) {
|
||||
fs.mu.RLock()
|
||||
state.Msgs = fs.state.Msgs
|
||||
state.Bytes = fs.state.Bytes
|
||||
state.FirstSeq = fs.state.FirstSeq
|
||||
state.FirstTime = fs.state.FirstTime
|
||||
state.LastSeq = fs.state.LastSeq
|
||||
state.LastTime = fs.state.LastTime
|
||||
if state.LastSeq > state.FirstSeq {
|
||||
state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1)
|
||||
}
|
||||
state.Consumers = len(fs.cfs)
|
||||
fs.mu.RUnlock()
|
||||
}
|
||||
|
||||
|
||||
@@ -4724,7 +4724,7 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *streamSnapsho
|
||||
// processSnapshotDeletes will update our current store based on the snapshot
|
||||
// but only processing deletes and new FirstSeq / purges.
|
||||
func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) {
|
||||
state := mset.store.State()
|
||||
state := mset.state()
|
||||
|
||||
// Adjust if FirstSeq has moved.
|
||||
if snap.FirstSeq > state.FirstSeq {
|
||||
|
||||
@@ -737,13 +737,19 @@ func (ms *memStore) Type() StorageType {
|
||||
}
|
||||
|
||||
// FastState will fill in state with only the following.
|
||||
// Msgs, Bytes, FirstSeq, LastSeq
|
||||
// Msgs, Bytes, First and Last Sequence and Time and NumDeleted.
|
||||
func (ms *memStore) FastState(state *StreamState) {
|
||||
ms.mu.RLock()
|
||||
state.Msgs = ms.state.Msgs
|
||||
state.Bytes = ms.state.Bytes
|
||||
state.FirstSeq = ms.state.FirstSeq
|
||||
state.FirstTime = ms.state.FirstTime
|
||||
state.LastSeq = ms.state.LastSeq
|
||||
state.LastTime = ms.state.LastTime
|
||||
if state.LastSeq > state.FirstSeq {
|
||||
state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1)
|
||||
}
|
||||
state.Consumers = ms.consumers
|
||||
ms.mu.RUnlock()
|
||||
}
|
||||
|
||||
|
||||
@@ -425,7 +425,8 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
|
||||
mset.pubAck = mset.pubAck[:end:end]
|
||||
|
||||
// Set our known last sequence.
|
||||
state := mset.store.State()
|
||||
var state StreamState
|
||||
mset.store.FastState(&state)
|
||||
mset.lseq = state.LastSeq
|
||||
|
||||
// If no msgs (new stream), set dedupe state loaded to true.
|
||||
@@ -3502,11 +3503,6 @@ func (mset *stream) lookupConsumer(name string) *consumer {
|
||||
return mset.consumers[name]
|
||||
}
|
||||
|
||||
// State will return the current state for this stream.
|
||||
func (mset *stream) state() StreamState {
|
||||
return mset.stateWithDetail(false)
|
||||
}
|
||||
|
||||
func (mset *stream) numDirectConsumers() (num int) {
|
||||
mset.mu.RLock()
|
||||
defer mset.mu.RUnlock()
|
||||
@@ -3522,6 +3518,11 @@ func (mset *stream) numDirectConsumers() (num int) {
|
||||
return num
|
||||
}
|
||||
|
||||
// State will return the current state for this stream.
|
||||
func (mset *stream) state() StreamState {
|
||||
return mset.stateWithDetail(false)
|
||||
}
|
||||
|
||||
func (mset *stream) stateWithDetail(details bool) StreamState {
|
||||
mset.mu.RLock()
|
||||
c, store := mset.client, mset.store
|
||||
@@ -3530,10 +3531,12 @@ func (mset *stream) stateWithDetail(details bool) StreamState {
|
||||
return StreamState{}
|
||||
}
|
||||
// Currently rely on store.
|
||||
state := store.State()
|
||||
if !details {
|
||||
state.Deleted = nil
|
||||
if details {
|
||||
return store.State()
|
||||
}
|
||||
// Here we do the fast version.
|
||||
var state StreamState
|
||||
mset.store.FastState(&state)
|
||||
return state
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user