mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
1605 lines
38 KiB
Go
1605 lines
38 KiB
Go
// Copyright 2019-2023 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
crand "crypto/rand"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats-server/v2/server/avl"
|
|
)
|
|
|
|
// TODO(dlc) - This is a fairly simplistic approach but should do for now.
|
|
type memStore struct {
|
|
mu sync.RWMutex
|
|
cfg StreamConfig
|
|
state StreamState
|
|
msgs map[uint64]*StoreMsg
|
|
fss map[string]*SimpleState
|
|
dmap avl.SequenceSet
|
|
maxp int64
|
|
scb StorageUpdateHandler
|
|
ageChk *time.Timer
|
|
consumers int
|
|
receivedAny bool
|
|
}
|
|
|
|
func newMemStore(cfg *StreamConfig) (*memStore, error) {
|
|
if cfg == nil {
|
|
return nil, fmt.Errorf("config required")
|
|
}
|
|
if cfg.Storage != MemoryStorage {
|
|
return nil, fmt.Errorf("memStore requires memory storage type in config")
|
|
}
|
|
ms := &memStore{
|
|
msgs: make(map[uint64]*StoreMsg),
|
|
fss: make(map[string]*SimpleState),
|
|
maxp: cfg.MaxMsgsPer,
|
|
cfg: *cfg,
|
|
}
|
|
if cfg.FirstSeq > 0 {
|
|
if _, err := ms.purge(cfg.FirstSeq); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return ms, nil
|
|
}
|
|
|
|
func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
|
|
if cfg == nil {
|
|
return fmt.Errorf("config required")
|
|
}
|
|
if cfg.Storage != MemoryStorage {
|
|
return fmt.Errorf("memStore requires memory storage type in config")
|
|
}
|
|
|
|
ms.mu.Lock()
|
|
ms.cfg = *cfg
|
|
// Limits checks and enforcement.
|
|
ms.enforceMsgLimit()
|
|
ms.enforceBytesLimit()
|
|
// Do age timers.
|
|
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
|
|
ms.startAgeChk()
|
|
}
|
|
if ms.ageChk != nil && ms.cfg.MaxAge == 0 {
|
|
ms.ageChk.Stop()
|
|
ms.ageChk = nil
|
|
}
|
|
// Make sure to update MaxMsgsPer
|
|
maxp := ms.maxp
|
|
ms.maxp = cfg.MaxMsgsPer
|
|
// If the value is smaller we need to enforce that.
|
|
if ms.maxp != 0 && ms.maxp < maxp {
|
|
lm := uint64(ms.maxp)
|
|
for subj, ss := range ms.fss {
|
|
if ss.Msgs > lm {
|
|
ms.enforcePerSubjectLimit(subj, ss)
|
|
}
|
|
}
|
|
}
|
|
ms.mu.Unlock()
|
|
|
|
if cfg.MaxAge != 0 {
|
|
ms.expireMsgs()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Stores a raw message with expected sequence number and timestamp.
|
|
// Lock should be held.
|
|
func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int64) error {
|
|
if ms.msgs == nil {
|
|
return ErrStoreClosed
|
|
}
|
|
|
|
// Tracking by subject.
|
|
var ss *SimpleState
|
|
var asl bool
|
|
if len(subj) > 0 {
|
|
if ss = ms.fss[subj]; ss != nil {
|
|
asl = ms.maxp > 0 && ss.Msgs >= uint64(ms.maxp)
|
|
}
|
|
}
|
|
|
|
// Check if we are discarding new messages when we reach the limit.
|
|
if ms.cfg.Discard == DiscardNew {
|
|
if asl && ms.cfg.DiscardNewPer {
|
|
return ErrMaxMsgsPerSubject
|
|
}
|
|
if ms.cfg.MaxMsgs > 0 && ms.state.Msgs >= uint64(ms.cfg.MaxMsgs) {
|
|
// If we are tracking max messages per subject and are at the limit we will replace, so this is ok.
|
|
if !asl {
|
|
return ErrMaxMsgs
|
|
}
|
|
}
|
|
if ms.cfg.MaxBytes > 0 && ms.state.Bytes+uint64(len(msg)+len(hdr)) >= uint64(ms.cfg.MaxBytes) {
|
|
if !asl {
|
|
return ErrMaxBytes
|
|
}
|
|
// If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
|
|
if ss.firstNeedsUpdate {
|
|
ms.recalculateFirstForSubj(subj, ss.First, ss)
|
|
}
|
|
sm, ok := ms.msgs[ss.First]
|
|
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < uint64(len(msg)+len(hdr)) {
|
|
return ErrMaxBytes
|
|
}
|
|
}
|
|
}
|
|
|
|
if seq != ms.state.LastSeq+1 {
|
|
if seq > 0 {
|
|
return ErrSequenceMismatch
|
|
}
|
|
seq = ms.state.LastSeq + 1
|
|
}
|
|
|
|
// Adjust first if needed.
|
|
now := time.Unix(0, ts).UTC()
|
|
if ms.state.Msgs == 0 {
|
|
ms.state.FirstSeq = seq
|
|
ms.state.FirstTime = now
|
|
}
|
|
|
|
// Make copies
|
|
// TODO(dlc) - Maybe be smarter here.
|
|
if len(msg) > 0 {
|
|
msg = copyBytes(msg)
|
|
}
|
|
if len(hdr) > 0 {
|
|
hdr = copyBytes(hdr)
|
|
}
|
|
|
|
// FIXME(dlc) - Could pool at this level?
|
|
sm := &StoreMsg{subj, nil, nil, make([]byte, 0, len(hdr)+len(msg)), seq, ts}
|
|
sm.buf = append(sm.buf, hdr...)
|
|
sm.buf = append(sm.buf, msg...)
|
|
if len(hdr) > 0 {
|
|
sm.hdr = sm.buf[:len(hdr)]
|
|
}
|
|
sm.msg = sm.buf[len(hdr):]
|
|
ms.msgs[seq] = sm
|
|
ms.state.Msgs++
|
|
ms.state.Bytes += memStoreMsgSize(subj, hdr, msg)
|
|
ms.state.LastSeq = seq
|
|
ms.state.LastTime = now
|
|
|
|
// Track per subject.
|
|
if len(subj) > 0 {
|
|
if ss != nil {
|
|
ss.Msgs++
|
|
ss.Last = seq
|
|
// Check per subject limits.
|
|
if ms.maxp > 0 && ss.Msgs > uint64(ms.maxp) {
|
|
ms.enforcePerSubjectLimit(subj, ss)
|
|
}
|
|
} else {
|
|
ms.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
|
|
}
|
|
}
|
|
|
|
// Limits checks and enforcement.
|
|
ms.enforceMsgLimit()
|
|
ms.enforceBytesLimit()
|
|
|
|
// Check if we have and need the age expiration timer running.
|
|
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
|
|
ms.startAgeChk()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// StoreRawMsg stores a raw message with expected sequence number and timestamp.
|
|
func (ms *memStore) StoreRawMsg(subj string, hdr, msg []byte, seq uint64, ts int64) error {
|
|
ms.mu.Lock()
|
|
err := ms.storeRawMsg(subj, hdr, msg, seq, ts)
|
|
cb := ms.scb
|
|
// Check if first message timestamp requires expiry
|
|
// sooner than initial replica expiry timer set to MaxAge when initializing.
|
|
if !ms.receivedAny && ms.cfg.MaxAge != 0 && ts > 0 {
|
|
ms.receivedAny = true
|
|
// Calculate duration when the next expireMsgs should be called.
|
|
ms.resetAgeChk(int64(time.Millisecond) * 50)
|
|
}
|
|
ms.mu.Unlock()
|
|
|
|
if err == nil && cb != nil {
|
|
cb(1, int64(memStoreMsgSize(subj, hdr, msg)), seq, subj)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Store stores a message.
|
|
func (ms *memStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error) {
|
|
ms.mu.Lock()
|
|
seq, ts := ms.state.LastSeq+1, time.Now().UnixNano()
|
|
err := ms.storeRawMsg(subj, hdr, msg, seq, ts)
|
|
cb := ms.scb
|
|
ms.mu.Unlock()
|
|
|
|
if err != nil {
|
|
seq, ts = 0, 0
|
|
} else if cb != nil {
|
|
cb(1, int64(memStoreMsgSize(subj, hdr, msg)), seq, subj)
|
|
}
|
|
|
|
return seq, ts, err
|
|
}
|
|
|
|
// SkipMsg will use the next sequence number but not store anything.
|
|
func (ms *memStore) SkipMsg() uint64 {
|
|
// Grab time.
|
|
now := time.Now().UTC()
|
|
|
|
ms.mu.Lock()
|
|
seq := ms.state.LastSeq + 1
|
|
ms.state.LastSeq = seq
|
|
ms.state.LastTime = now
|
|
if ms.state.Msgs == 0 {
|
|
ms.state.FirstSeq = seq
|
|
ms.state.FirstTime = now
|
|
}
|
|
ms.updateFirstSeq(seq)
|
|
ms.mu.Unlock()
|
|
return seq
|
|
}
|
|
|
|
// RegisterStorageUpdates registers a callback for updates to storage changes.
|
|
// It will present number of messages and bytes as a signed integer and an
|
|
// optional sequence number of the message if a single.
|
|
func (ms *memStore) RegisterStorageUpdates(cb StorageUpdateHandler) {
|
|
ms.mu.Lock()
|
|
ms.scb = cb
|
|
ms.mu.Unlock()
|
|
}
|
|
|
|
// GetSeqFromTime looks for the first sequence number that has the message
|
|
// with >= timestamp.
|
|
// FIXME(dlc) - inefficient.
|
|
func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
|
|
ts := t.UnixNano()
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
if len(ms.msgs) == 0 {
|
|
return ms.state.LastSeq + 1
|
|
}
|
|
if ts <= ms.msgs[ms.state.FirstSeq].ts {
|
|
return ms.state.FirstSeq
|
|
}
|
|
last := ms.msgs[ms.state.LastSeq].ts
|
|
if ts == last {
|
|
return ms.state.LastSeq
|
|
}
|
|
if ts > last {
|
|
return ms.state.LastSeq + 1
|
|
}
|
|
index := sort.Search(len(ms.msgs), func(i int) bool {
|
|
return ms.msgs[uint64(i)+ms.state.FirstSeq].ts >= ts
|
|
})
|
|
return uint64(index) + ms.state.FirstSeq
|
|
}
|
|
|
|
// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence.
|
|
func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState {
|
|
// This needs to be a write lock, as filteredStateLocked can
|
|
// mutate the per-subject state.
|
|
ms.mu.Lock()
|
|
defer ms.mu.Unlock()
|
|
|
|
return ms.filteredStateLocked(sseq, subj, false)
|
|
}
|
|
|
|
func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubject bool) SimpleState {
|
|
var ss SimpleState
|
|
|
|
if sseq < ms.state.FirstSeq {
|
|
sseq = ms.state.FirstSeq
|
|
}
|
|
|
|
// If past the end no results.
|
|
if sseq > ms.state.LastSeq {
|
|
return ss
|
|
}
|
|
|
|
isAll := filter == _EMPTY_ || filter == fwcs
|
|
|
|
// First check if we can optimize this part.
|
|
// This means we want all and the starting sequence was before this block.
|
|
if isAll && sseq <= ms.state.FirstSeq {
|
|
total := ms.state.Msgs
|
|
if lastPerSubject {
|
|
total = uint64(len(ms.fss))
|
|
}
|
|
return SimpleState{
|
|
Msgs: total,
|
|
First: ms.state.FirstSeq,
|
|
Last: ms.state.LastSeq,
|
|
}
|
|
}
|
|
|
|
tsa := [32]string{}
|
|
fsa := [32]string{}
|
|
fts := tokenizeSubjectIntoSlice(fsa[:0], filter)
|
|
wc := subjectHasWildcard(filter)
|
|
|
|
// 1. See if we match any subs from fss.
|
|
// 2. If we match and the sseq is past ss.Last then we can use meta only.
|
|
// 3. If we match and we need to do a partial, break and clear any totals and do a full scan like num pending.
|
|
|
|
isMatch := func(subj string) bool {
|
|
if isAll {
|
|
return true
|
|
}
|
|
if !wc {
|
|
return subj == filter
|
|
}
|
|
tts := tokenizeSubjectIntoSlice(tsa[:0], subj)
|
|
return isSubsetMatchTokenized(tts, fts)
|
|
}
|
|
|
|
update := func(fss *SimpleState) {
|
|
msgs, first, last := fss.Msgs, fss.First, fss.Last
|
|
if lastPerSubject {
|
|
msgs, first = 1, last
|
|
}
|
|
ss.Msgs += msgs
|
|
if ss.First == 0 || first < ss.First {
|
|
ss.First = first
|
|
}
|
|
if last > ss.Last {
|
|
ss.Last = last
|
|
}
|
|
}
|
|
|
|
var havePartial bool
|
|
// We will track start and end sequences as we go.
|
|
for subj, fss := range ms.fss {
|
|
if isMatch(subj) {
|
|
if fss.firstNeedsUpdate {
|
|
ms.recalculateFirstForSubj(subj, fss.First, fss)
|
|
}
|
|
if sseq <= fss.First {
|
|
update(fss)
|
|
} else if sseq <= fss.Last {
|
|
// We matched but its a partial.
|
|
havePartial = true
|
|
// Don't break here, we will update to keep tracking last.
|
|
update(fss)
|
|
}
|
|
}
|
|
}
|
|
|
|
// If we did not encounter any partials we can return here.
|
|
if !havePartial {
|
|
return ss
|
|
}
|
|
|
|
// If we are here we need to scan the msgs.
|
|
// Capture first and last sequences for scan and then clear what we had.
|
|
first, last := ss.First, ss.Last
|
|
// To track if we decide to exclude and we need to calculate first.
|
|
var needScanFirst bool
|
|
if first < sseq {
|
|
first = sseq
|
|
needScanFirst = true
|
|
}
|
|
|
|
// Now we want to check if it is better to scan inclusive and recalculate that way
|
|
// or leave and scan exclusive and adjust our totals.
|
|
// ss.Last is always correct here.
|
|
toScan, toExclude := last-first, first-ms.state.FirstSeq+ms.state.LastSeq-ss.Last
|
|
var seen map[string]bool
|
|
if lastPerSubject {
|
|
seen = make(map[string]bool)
|
|
}
|
|
if toScan < toExclude {
|
|
ss.Msgs, ss.First = 0, 0
|
|
for seq := first; seq <= last; seq++ {
|
|
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
|
|
ss.Msgs++
|
|
if ss.First == 0 {
|
|
ss.First = seq
|
|
}
|
|
if seen != nil {
|
|
seen[sm.subj] = true
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// We will adjust from the totals above by scanning what we need to exclude.
|
|
ss.First = first
|
|
var adjust uint64
|
|
for seq := ms.state.FirstSeq; seq < first; seq++ {
|
|
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
|
|
adjust++
|
|
if seen != nil {
|
|
seen[sm.subj] = true
|
|
}
|
|
}
|
|
}
|
|
// Now do range at end.
|
|
for seq := last + 1; seq < ms.state.LastSeq; seq++ {
|
|
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
|
|
adjust++
|
|
if seen != nil {
|
|
seen[sm.subj] = true
|
|
}
|
|
}
|
|
}
|
|
ss.Msgs -= adjust
|
|
if needScanFirst {
|
|
for seq := first; seq < last; seq++ {
|
|
if sm, ok := ms.msgs[seq]; ok && isMatch(sm.subj) {
|
|
ss.First = seq
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return ss
|
|
}
|
|
|
|
// SubjectsState returns a map of SimpleState for all matching subjects.
|
|
func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
|
|
if len(ms.fss) == 0 {
|
|
return nil
|
|
}
|
|
|
|
fss := make(map[string]SimpleState)
|
|
for subj, ss := range ms.fss {
|
|
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
|
|
if ss.firstNeedsUpdate {
|
|
ms.recalculateFirstForSubj(subj, ss.First, ss)
|
|
}
|
|
oss := fss[subj]
|
|
if oss.First == 0 { // New
|
|
fss[subj] = *ss
|
|
} else {
|
|
// Merge here.
|
|
oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs
|
|
fss[subj] = oss
|
|
}
|
|
}
|
|
}
|
|
return fss
|
|
}
|
|
|
|
// SubjectsTotal return message totals per subject.
|
|
func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
|
|
if len(ms.fss) == 0 {
|
|
return nil
|
|
}
|
|
|
|
tsa := [32]string{}
|
|
fsa := [32]string{}
|
|
fts := tokenizeSubjectIntoSlice(fsa[:0], filterSubject)
|
|
isAll := filterSubject == _EMPTY_ || filterSubject == fwcs
|
|
|
|
fst := make(map[string]uint64)
|
|
for subj, ss := range ms.fss {
|
|
if isAll {
|
|
fst[subj] = ss.Msgs
|
|
} else {
|
|
if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) {
|
|
fst[subj] = ss.Msgs
|
|
}
|
|
}
|
|
}
|
|
return fst
|
|
}
|
|
|
|
// NumPending will return the number of pending messages matching the filter subject starting at sequence.
|
|
func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) {
|
|
// This needs to be a write lock, as filteredStateLocked can
|
|
// mutate the per-subject state.
|
|
ms.mu.Lock()
|
|
defer ms.mu.Unlock()
|
|
|
|
ss := ms.filteredStateLocked(sseq, filter, lastPerSubject)
|
|
return ss.Msgs, ms.state.LastSeq
|
|
}
|
|
|
|
// Will check the msg limit for this tracked subject.
|
|
// Lock should be held.
|
|
func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
|
|
if ms.maxp <= 0 {
|
|
return
|
|
}
|
|
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
|
|
if ss.firstNeedsUpdate {
|
|
ms.recalculateFirstForSubj(subj, ss.First, ss)
|
|
}
|
|
if !ms.removeMsg(ss.First, false) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Will check the msg limit and drop firstSeq msg if needed.
|
|
// Lock should be held.
|
|
func (ms *memStore) enforceMsgLimit() {
|
|
if ms.cfg.MaxMsgs <= 0 || ms.state.Msgs <= uint64(ms.cfg.MaxMsgs) {
|
|
return
|
|
}
|
|
for nmsgs := ms.state.Msgs; nmsgs > uint64(ms.cfg.MaxMsgs); nmsgs = ms.state.Msgs {
|
|
ms.deleteFirstMsgOrPanic()
|
|
}
|
|
}
|
|
|
|
// Will check the bytes limit and drop msgs if needed.
|
|
// Lock should be held.
|
|
func (ms *memStore) enforceBytesLimit() {
|
|
if ms.cfg.MaxBytes <= 0 || ms.state.Bytes <= uint64(ms.cfg.MaxBytes) {
|
|
return
|
|
}
|
|
for bs := ms.state.Bytes; bs > uint64(ms.cfg.MaxBytes); bs = ms.state.Bytes {
|
|
ms.deleteFirstMsgOrPanic()
|
|
}
|
|
}
|
|
|
|
// Will start the age check timer.
|
|
// Lock should be held.
|
|
func (ms *memStore) startAgeChk() {
|
|
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
|
|
ms.ageChk = time.AfterFunc(ms.cfg.MaxAge, ms.expireMsgs)
|
|
}
|
|
}
|
|
|
|
// Lock should be held.
|
|
func (ms *memStore) resetAgeChk(delta int64) {
|
|
if ms.cfg.MaxAge == 0 {
|
|
return
|
|
}
|
|
|
|
fireIn := ms.cfg.MaxAge
|
|
if delta > 0 && time.Duration(delta) < fireIn {
|
|
fireIn = time.Duration(delta)
|
|
}
|
|
if ms.ageChk != nil {
|
|
ms.ageChk.Reset(fireIn)
|
|
} else {
|
|
ms.ageChk = time.AfterFunc(fireIn, ms.expireMsgs)
|
|
}
|
|
}
|
|
|
|
// Will expire msgs that are too old.
|
|
func (ms *memStore) expireMsgs() {
|
|
ms.mu.Lock()
|
|
defer ms.mu.Unlock()
|
|
|
|
now := time.Now().UnixNano()
|
|
minAge := now - int64(ms.cfg.MaxAge)
|
|
for {
|
|
if sm, ok := ms.msgs[ms.state.FirstSeq]; ok && sm.ts <= minAge {
|
|
ms.deleteFirstMsgOrPanic()
|
|
// Recalculate in case we are expiring a bunch.
|
|
now = time.Now().UnixNano()
|
|
minAge = now - int64(ms.cfg.MaxAge)
|
|
|
|
} else {
|
|
if len(ms.msgs) == 0 {
|
|
if ms.ageChk != nil {
|
|
ms.ageChk.Stop()
|
|
ms.ageChk = nil
|
|
}
|
|
} else {
|
|
var fireIn time.Duration
|
|
if sm == nil {
|
|
fireIn = ms.cfg.MaxAge
|
|
} else {
|
|
fireIn = time.Duration(sm.ts - minAge)
|
|
}
|
|
if ms.ageChk != nil {
|
|
ms.ageChk.Reset(fireIn)
|
|
} else {
|
|
ms.ageChk = time.AfterFunc(fireIn, ms.expireMsgs)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
|
|
// Will return the number of purged messages.
|
|
func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
|
|
if subject == _EMPTY_ || subject == fwcs {
|
|
if keep == 0 && (sequence == 0 || sequence == 1) {
|
|
return ms.Purge()
|
|
}
|
|
if sequence > 1 {
|
|
return ms.Compact(sequence)
|
|
} else if keep > 0 {
|
|
ms.mu.RLock()
|
|
msgs, lseq := ms.state.Msgs, ms.state.LastSeq
|
|
ms.mu.RUnlock()
|
|
if keep >= msgs {
|
|
return 0, nil
|
|
}
|
|
return ms.Compact(lseq - keep + 1)
|
|
}
|
|
return 0, nil
|
|
|
|
}
|
|
eq := compareFn(subject)
|
|
if ss := ms.FilteredState(1, subject); ss.Msgs > 0 {
|
|
if keep > 0 {
|
|
if keep >= ss.Msgs {
|
|
return 0, nil
|
|
}
|
|
ss.Msgs -= keep
|
|
}
|
|
last := ss.Last
|
|
if sequence > 1 {
|
|
last = sequence - 1
|
|
}
|
|
ms.mu.Lock()
|
|
for seq := ss.First; seq <= last; seq++ {
|
|
if sm, ok := ms.msgs[seq]; ok && eq(sm.subj, subject) {
|
|
if ok := ms.removeMsg(sm.seq, false); ok {
|
|
purged++
|
|
if purged >= ss.Msgs {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
ms.mu.Unlock()
|
|
}
|
|
return purged, nil
|
|
}
|
|
|
|
// Purge will remove all messages from this store.
|
|
// Will return the number of purged messages.
|
|
func (ms *memStore) Purge() (uint64, error) {
|
|
ms.mu.RLock()
|
|
first := ms.state.LastSeq + 1
|
|
ms.mu.RUnlock()
|
|
return ms.purge(first)
|
|
}
|
|
|
|
func (ms *memStore) purge(fseq uint64) (uint64, error) {
|
|
ms.mu.Lock()
|
|
purged := uint64(len(ms.msgs))
|
|
cb := ms.scb
|
|
bytes := int64(ms.state.Bytes)
|
|
if fseq < ms.state.LastSeq {
|
|
ms.mu.Unlock()
|
|
return 0, fmt.Errorf("partial purges not supported on memory store")
|
|
}
|
|
ms.state.FirstSeq = fseq
|
|
ms.state.LastSeq = fseq - 1
|
|
ms.state.FirstTime = time.Time{}
|
|
ms.state.Bytes = 0
|
|
ms.state.Msgs = 0
|
|
ms.msgs = make(map[uint64]*StoreMsg)
|
|
ms.fss = make(map[string]*SimpleState)
|
|
ms.mu.Unlock()
|
|
|
|
if cb != nil {
|
|
cb(-int64(purged), -bytes, 0, _EMPTY_)
|
|
}
|
|
|
|
return purged, nil
|
|
}
|
|
|
|
// Compact will remove all messages from this store up to
|
|
// but not including the seq parameter.
|
|
// Will return the number of purged messages.
|
|
func (ms *memStore) Compact(seq uint64) (uint64, error) {
|
|
if seq == 0 {
|
|
return ms.Purge()
|
|
}
|
|
|
|
var purged, bytes uint64
|
|
|
|
ms.mu.Lock()
|
|
cb := ms.scb
|
|
if seq <= ms.state.LastSeq {
|
|
sm, ok := ms.msgs[seq]
|
|
if !ok {
|
|
ms.mu.Unlock()
|
|
return 0, ErrStoreMsgNotFound
|
|
}
|
|
ms.state.FirstSeq = seq
|
|
ms.state.FirstTime = time.Unix(0, sm.ts).UTC()
|
|
|
|
for seq := seq - 1; seq > 0; seq-- {
|
|
if sm := ms.msgs[seq]; sm != nil {
|
|
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
|
|
purged++
|
|
delete(ms.msgs, seq)
|
|
ms.removeSeqPerSubject(sm.subj, seq)
|
|
}
|
|
}
|
|
if purged > ms.state.Msgs {
|
|
purged = ms.state.Msgs
|
|
}
|
|
ms.state.Msgs -= purged
|
|
if bytes > ms.state.Bytes {
|
|
bytes = ms.state.Bytes
|
|
}
|
|
ms.state.Bytes -= bytes
|
|
} else {
|
|
// We are compacting past the end of our range. Do purge and set sequences correctly
|
|
// such that the next message placed will have seq.
|
|
purged = uint64(len(ms.msgs))
|
|
bytes = ms.state.Bytes
|
|
ms.state.Bytes = 0
|
|
ms.state.Msgs = 0
|
|
ms.state.FirstSeq = seq
|
|
ms.state.FirstTime = time.Time{}
|
|
ms.state.LastSeq = seq - 1
|
|
ms.msgs = make(map[uint64]*StoreMsg)
|
|
}
|
|
ms.mu.Unlock()
|
|
|
|
if cb != nil {
|
|
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
|
|
}
|
|
|
|
return purged, nil
|
|
}
|
|
|
|
// Will completely reset our store.
|
|
func (ms *memStore) reset() error {
|
|
|
|
ms.mu.Lock()
|
|
var purged, bytes uint64
|
|
cb := ms.scb
|
|
if cb != nil {
|
|
for _, sm := range ms.msgs {
|
|
purged++
|
|
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
|
|
}
|
|
}
|
|
|
|
// Reset
|
|
ms.state.FirstSeq = 0
|
|
ms.state.FirstTime = time.Time{}
|
|
ms.state.LastSeq = 0
|
|
ms.state.LastTime = time.Now().UTC()
|
|
// Update msgs and bytes.
|
|
ms.state.Msgs = 0
|
|
ms.state.Bytes = 0
|
|
// Reset msgs and fss.
|
|
ms.msgs = make(map[uint64]*StoreMsg)
|
|
ms.fss = make(map[string]*SimpleState)
|
|
|
|
ms.mu.Unlock()
|
|
|
|
if cb != nil {
|
|
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Truncate will truncate a stream store up to seq. Sequence needs to be valid.
|
|
func (ms *memStore) Truncate(seq uint64) error {
|
|
// Check for request to reset.
|
|
if seq == 0 {
|
|
return ms.reset()
|
|
}
|
|
|
|
var purged, bytes uint64
|
|
|
|
ms.mu.Lock()
|
|
lsm, ok := ms.msgs[seq]
|
|
if !ok {
|
|
ms.mu.Unlock()
|
|
return ErrInvalidSequence
|
|
}
|
|
|
|
for i := ms.state.LastSeq; i > seq; i-- {
|
|
if sm := ms.msgs[i]; sm != nil {
|
|
purged++
|
|
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
|
|
delete(ms.msgs, i)
|
|
ms.removeSeqPerSubject(sm.subj, i)
|
|
}
|
|
}
|
|
// Reset last.
|
|
ms.state.LastSeq = lsm.seq
|
|
ms.state.LastTime = time.Unix(0, lsm.ts).UTC()
|
|
// Update msgs and bytes.
|
|
if purged > ms.state.Msgs {
|
|
purged = ms.state.Msgs
|
|
}
|
|
ms.state.Msgs -= purged
|
|
if bytes > ms.state.Bytes {
|
|
bytes = ms.state.Bytes
|
|
}
|
|
ms.state.Bytes -= bytes
|
|
|
|
cb := ms.scb
|
|
ms.mu.Unlock()
|
|
|
|
if cb != nil {
|
|
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ms *memStore) deleteFirstMsgOrPanic() {
|
|
if !ms.deleteFirstMsg() {
|
|
panic("jetstream memstore has inconsistent state, can't find first seq msg")
|
|
}
|
|
}
|
|
|
|
func (ms *memStore) deleteFirstMsg() bool {
|
|
return ms.removeMsg(ms.state.FirstSeq, false)
|
|
}
|
|
|
|
// LoadMsg will lookup the message by sequence number and return it if found.
|
|
func (ms *memStore) LoadMsg(seq uint64, smp *StoreMsg) (*StoreMsg, error) {
|
|
ms.mu.RLock()
|
|
sm, ok := ms.msgs[seq]
|
|
last := ms.state.LastSeq
|
|
ms.mu.RUnlock()
|
|
|
|
if !ok || sm == nil {
|
|
var err = ErrStoreEOF
|
|
if seq <= last {
|
|
err = ErrStoreMsgNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
if smp == nil {
|
|
smp = new(StoreMsg)
|
|
}
|
|
sm.copy(smp)
|
|
return smp, nil
|
|
}
|
|
|
|
// LoadLastMsg will return the last message we have that matches a given subject.
|
|
// The subject can be a wildcard.
|
|
func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error) {
|
|
var sm *StoreMsg
|
|
var ok bool
|
|
|
|
// This needs to be a write lock, as filteredStateLocked can
|
|
// mutate the per-subject state.
|
|
ms.mu.Lock()
|
|
defer ms.mu.Unlock()
|
|
|
|
if subject == _EMPTY_ || subject == fwcs {
|
|
sm, ok = ms.msgs[ms.state.LastSeq]
|
|
} else if subjectIsLiteral(subject) {
|
|
if ss := ms.fss[subject]; ss != nil && ss.Msgs > 0 {
|
|
sm, ok = ms.msgs[ss.Last]
|
|
}
|
|
} else if ss := ms.filteredStateLocked(1, subject, true); ss.Msgs > 0 {
|
|
sm, ok = ms.msgs[ss.Last]
|
|
}
|
|
if !ok || sm == nil {
|
|
return nil, ErrStoreMsgNotFound
|
|
}
|
|
|
|
if smp == nil {
|
|
smp = new(StoreMsg)
|
|
}
|
|
sm.copy(smp)
|
|
return smp, nil
|
|
}
|
|
|
|
// LoadNextMsg will find the next message matching the filter subject starting at the start sequence.
|
|
// The filter subject can be a wildcard.
|
|
func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error) {
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
|
|
if start < ms.state.FirstSeq {
|
|
start = ms.state.FirstSeq
|
|
}
|
|
|
|
// If past the end no results.
|
|
if start > ms.state.LastSeq {
|
|
return nil, ms.state.LastSeq, ErrStoreEOF
|
|
}
|
|
|
|
isAll := filter == _EMPTY_ || filter == fwcs
|
|
|
|
// Skip scan of ms.fss is number of messages in the block are less than
|
|
// 1/2 the number of subjects in ms.fss. Or we have a wc and lots of fss entries.
|
|
const linearScanMaxFSS = 256
|
|
doLinearScan := isAll || 2*int(ms.state.LastSeq-start) < len(ms.fss) || (wc && len(ms.fss) > linearScanMaxFSS)
|
|
|
|
// Initial setup.
|
|
fseq, lseq := start, ms.state.LastSeq
|
|
|
|
if !doLinearScan {
|
|
subs := []string{filter}
|
|
if wc || isAll {
|
|
subs = subs[:0]
|
|
for fsubj := range ms.fss {
|
|
if isAll || subjectIsSubsetMatch(fsubj, filter) {
|
|
subs = append(subs, fsubj)
|
|
}
|
|
}
|
|
}
|
|
fseq, lseq = ms.state.LastSeq, uint64(0)
|
|
for _, subj := range subs {
|
|
ss := ms.fss[subj]
|
|
if ss == nil {
|
|
continue
|
|
}
|
|
if ss.firstNeedsUpdate {
|
|
ms.recalculateFirstForSubj(subj, ss.First, ss)
|
|
}
|
|
if ss.First < fseq {
|
|
fseq = ss.First
|
|
}
|
|
if ss.Last > lseq {
|
|
lseq = ss.Last
|
|
}
|
|
}
|
|
if fseq < start {
|
|
fseq = start
|
|
}
|
|
}
|
|
|
|
eq := subjectsEqual
|
|
if wc {
|
|
eq = subjectIsSubsetMatch
|
|
}
|
|
|
|
for nseq := fseq; nseq <= lseq; nseq++ {
|
|
if sm, ok := ms.msgs[nseq]; ok && (isAll || eq(sm.subj, filter)) {
|
|
if smp == nil {
|
|
smp = new(StoreMsg)
|
|
}
|
|
sm.copy(smp)
|
|
return smp, nseq, nil
|
|
}
|
|
}
|
|
return nil, ms.state.LastSeq, ErrStoreEOF
|
|
}
|
|
|
|
// RemoveMsg will remove the message from this store.
|
|
// Will return the number of bytes removed.
|
|
func (ms *memStore) RemoveMsg(seq uint64) (bool, error) {
|
|
ms.mu.Lock()
|
|
removed := ms.removeMsg(seq, false)
|
|
ms.mu.Unlock()
|
|
return removed, nil
|
|
}
|
|
|
|
// EraseMsg will remove the message and rewrite its contents.
|
|
func (ms *memStore) EraseMsg(seq uint64) (bool, error) {
|
|
ms.mu.Lock()
|
|
removed := ms.removeMsg(seq, true)
|
|
ms.mu.Unlock()
|
|
return removed, nil
|
|
}
|
|
|
|
// Performs logic to update first sequence number.
|
|
// Lock should be held.
|
|
func (ms *memStore) updateFirstSeq(seq uint64) {
|
|
if seq != ms.state.FirstSeq {
|
|
// Interior delete.
|
|
return
|
|
}
|
|
var nsm *StoreMsg
|
|
var ok bool
|
|
for nseq := ms.state.FirstSeq + 1; nseq <= ms.state.LastSeq; nseq++ {
|
|
if nsm, ok = ms.msgs[nseq]; ok {
|
|
break
|
|
}
|
|
}
|
|
oldFirst := ms.state.FirstSeq
|
|
if nsm != nil {
|
|
ms.state.FirstSeq = nsm.seq
|
|
ms.state.FirstTime = time.Unix(0, nsm.ts).UTC()
|
|
} else {
|
|
// Like purge.
|
|
ms.state.FirstSeq = ms.state.LastSeq + 1
|
|
ms.state.FirstTime = time.Time{}
|
|
}
|
|
|
|
if oldFirst == ms.state.FirstSeq-1 {
|
|
ms.dmap.Delete(oldFirst)
|
|
} else {
|
|
for seq := oldFirst; seq < ms.state.FirstSeq; seq++ {
|
|
ms.dmap.Delete(seq)
|
|
}
|
|
}
|
|
if ms.dmap.IsEmpty() {
|
|
ms.dmap.SetInitialMin(ms.state.FirstSeq)
|
|
}
|
|
}
|
|
|
|
// Remove a seq from the fss and select new first.
|
|
// Lock should be held.
|
|
func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
|
|
ss := ms.fss[subj]
|
|
if ss == nil {
|
|
return
|
|
}
|
|
if ss.Msgs == 1 {
|
|
delete(ms.fss, subj)
|
|
return
|
|
}
|
|
ss.Msgs--
|
|
|
|
// If we know we only have 1 msg left don't need to search for next first.
|
|
if ss.Msgs == 1 {
|
|
if seq == ss.Last {
|
|
ss.Last = ss.First
|
|
} else {
|
|
ss.First = ss.Last
|
|
}
|
|
ss.firstNeedsUpdate = false
|
|
} else {
|
|
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
|
|
}
|
|
}
|
|
|
|
// Will recalulate the first sequence for this subject in this block.
|
|
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
|
|
for tseq := startSeq + 1; tseq <= ss.Last; tseq++ {
|
|
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
|
|
ss.First = tseq
|
|
ss.firstNeedsUpdate = false
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Removes the message referenced by seq.
|
|
// Lock should he held.
|
|
func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
|
|
var ss uint64
|
|
sm, ok := ms.msgs[seq]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
|
|
|
|
delete(ms.msgs, seq)
|
|
if ms.state.Msgs > 0 {
|
|
ms.state.Msgs--
|
|
if ss > ms.state.Bytes {
|
|
ss = ms.state.Bytes
|
|
}
|
|
ms.state.Bytes -= ss
|
|
}
|
|
ms.dmap.Insert(seq)
|
|
ms.updateFirstSeq(seq)
|
|
|
|
if secure {
|
|
if len(sm.hdr) > 0 {
|
|
sm.hdr = make([]byte, len(sm.hdr))
|
|
crand.Read(sm.hdr)
|
|
}
|
|
if len(sm.msg) > 0 {
|
|
sm.msg = make([]byte, len(sm.msg))
|
|
crand.Read(sm.msg)
|
|
}
|
|
sm.seq, sm.ts = 0, 0
|
|
}
|
|
|
|
// Remove any per subject tracking.
|
|
ms.removeSeqPerSubject(sm.subj, seq)
|
|
|
|
if ms.scb != nil {
|
|
// We do not want to hold any locks here.
|
|
ms.mu.Unlock()
|
|
delta := int64(ss)
|
|
ms.scb(-1, -delta, seq, sm.subj)
|
|
ms.mu.Lock()
|
|
}
|
|
|
|
return ok
|
|
}
|
|
|
|
// Type returns the type of the underlying store.
|
|
func (ms *memStore) Type() StorageType {
|
|
return MemoryStorage
|
|
}
|
|
|
|
// FastState will fill in state with only the following.
|
|
// Msgs, Bytes, First and Last Sequence and Time and NumDeleted.
|
|
func (ms *memStore) FastState(state *StreamState) {
|
|
ms.mu.RLock()
|
|
state.Msgs = ms.state.Msgs
|
|
state.Bytes = ms.state.Bytes
|
|
state.FirstSeq = ms.state.FirstSeq
|
|
state.FirstTime = ms.state.FirstTime
|
|
state.LastSeq = ms.state.LastSeq
|
|
state.LastTime = ms.state.LastTime
|
|
if state.LastSeq > state.FirstSeq {
|
|
state.NumDeleted = int((state.LastSeq - state.FirstSeq + 1) - state.Msgs)
|
|
if state.NumDeleted < 0 {
|
|
state.NumDeleted = 0
|
|
}
|
|
}
|
|
state.Consumers = ms.consumers
|
|
state.NumSubjects = len(ms.fss)
|
|
ms.mu.RUnlock()
|
|
}
|
|
|
|
func (ms *memStore) State() StreamState {
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
|
|
state := ms.state
|
|
state.Consumers = ms.consumers
|
|
state.NumSubjects = len(ms.fss)
|
|
state.Deleted = nil
|
|
|
|
// Calculate interior delete details.
|
|
if numDeleted := int((state.LastSeq - state.FirstSeq + 1) - state.Msgs); numDeleted > 0 {
|
|
state.Deleted = make([]uint64, 0, state.NumDeleted)
|
|
// TODO(dlc) - Too Simplistic, once state is updated to allow runs etc redo.
|
|
for seq := state.FirstSeq + 1; seq < ms.state.LastSeq; seq++ {
|
|
if _, ok := ms.msgs[seq]; !ok {
|
|
state.Deleted = append(state.Deleted, seq)
|
|
}
|
|
}
|
|
}
|
|
if len(state.Deleted) > 0 {
|
|
state.NumDeleted = len(state.Deleted)
|
|
}
|
|
|
|
return state
|
|
}
|
|
|
|
func (ms *memStore) Utilization() (total, reported uint64, err error) {
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
return ms.state.Bytes, ms.state.Bytes, nil
|
|
}
|
|
|
|
func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {
|
|
return uint64(len(subj) + len(hdr) + len(msg) + 16) // 8*2 for seq + age
|
|
}
|
|
|
|
// Delete is same as Stop for memory store.
|
|
func (ms *memStore) Delete() error {
|
|
return ms.Stop()
|
|
}
|
|
|
|
func (ms *memStore) Stop() error {
|
|
// These can't come back, so stop is same as Delete.
|
|
ms.Purge()
|
|
ms.mu.Lock()
|
|
if ms.ageChk != nil {
|
|
ms.ageChk.Stop()
|
|
ms.ageChk = nil
|
|
}
|
|
ms.msgs = nil
|
|
ms.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (ms *memStore) isClosed() bool {
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
return ms.msgs == nil
|
|
}
|
|
|
|
type consumerMemStore struct {
|
|
mu sync.Mutex
|
|
ms StreamStore
|
|
cfg ConsumerConfig
|
|
state ConsumerState
|
|
closed bool
|
|
}
|
|
|
|
func (ms *memStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error) {
|
|
if ms == nil {
|
|
return nil, fmt.Errorf("memstore is nil")
|
|
}
|
|
if ms.isClosed() {
|
|
return nil, ErrStoreClosed
|
|
}
|
|
if cfg == nil || name == _EMPTY_ {
|
|
return nil, fmt.Errorf("bad consumer config")
|
|
}
|
|
o := &consumerMemStore{ms: ms, cfg: *cfg}
|
|
ms.AddConsumer(o)
|
|
return o, nil
|
|
}
|
|
|
|
func (ms *memStore) AddConsumer(o ConsumerStore) error {
|
|
ms.mu.Lock()
|
|
ms.consumers++
|
|
ms.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (ms *memStore) RemoveConsumer(o ConsumerStore) error {
|
|
ms.mu.Lock()
|
|
if ms.consumers > 0 {
|
|
ms.consumers--
|
|
}
|
|
ms.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (ms *memStore) Snapshot(_ time.Duration, _, _ bool) (*SnapshotResult, error) {
|
|
return nil, fmt.Errorf("no impl")
|
|
}
|
|
|
|
// Binary encoded state snapshot, >= v2.10 server.
|
|
func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) {
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
|
|
// Quick calculate num deleted.
|
|
numDeleted := int((ms.state.LastSeq - ms.state.FirstSeq + 1) - ms.state.Msgs)
|
|
if numDeleted < 0 {
|
|
numDeleted = 0
|
|
}
|
|
|
|
// Encoded is Msgs, Bytes, FirstSeq, LastSeq, Failed, NumDeleted and optional DeletedBlocks
|
|
var buf [1024]byte
|
|
buf[0], buf[1] = streamStateMagic, streamStateVersion
|
|
n := hdrLen
|
|
n += binary.PutUvarint(buf[n:], ms.state.Msgs)
|
|
n += binary.PutUvarint(buf[n:], ms.state.Bytes)
|
|
n += binary.PutUvarint(buf[n:], ms.state.FirstSeq)
|
|
n += binary.PutUvarint(buf[n:], ms.state.LastSeq)
|
|
n += binary.PutUvarint(buf[n:], failed)
|
|
n += binary.PutUvarint(buf[n:], uint64(numDeleted))
|
|
|
|
b := buf[0:n]
|
|
|
|
if numDeleted > 0 {
|
|
buf, err := ms.dmap.Encode(nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
b = append(b, buf...)
|
|
}
|
|
|
|
return b, nil
|
|
}
|
|
|
|
// SyncDeleted will make sure this stream has same deleted state as dbs.
|
|
func (ms *memStore) SyncDeleted(dbs DeleteBlocks) {
|
|
// For now we share one dmap, so if we have one entry here check if states are the same.
|
|
// Note this will work for any DeleteBlock type, but we expect this to be a dmap too.
|
|
if len(dbs) == 1 {
|
|
ms.mu.RLock()
|
|
min, max, num := ms.dmap.State()
|
|
ms.mu.RUnlock()
|
|
if pmin, pmax, pnum := dbs[0].State(); pmin == min && pmax == max && pnum == num {
|
|
return
|
|
}
|
|
}
|
|
for _, db := range dbs {
|
|
db.Range(func(dseq uint64) bool {
|
|
ms.RemoveMsg(dseq)
|
|
return true
|
|
})
|
|
}
|
|
}
|
|
|
|
func (o *consumerMemStore) Update(state *ConsumerState) error {
|
|
// Sanity checks.
|
|
if state.AckFloor.Consumer > state.Delivered.Consumer {
|
|
return fmt.Errorf("bad ack floor for consumer")
|
|
}
|
|
if state.AckFloor.Stream > state.Delivered.Stream {
|
|
return fmt.Errorf("bad ack floor for stream")
|
|
}
|
|
|
|
// Copy to our state.
|
|
var pending map[uint64]*Pending
|
|
var redelivered map[uint64]uint64
|
|
if len(state.Pending) > 0 {
|
|
pending = make(map[uint64]*Pending, len(state.Pending))
|
|
for seq, p := range state.Pending {
|
|
pending[seq] = &Pending{p.Sequence, p.Timestamp}
|
|
}
|
|
for seq := range pending {
|
|
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
|
|
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
|
|
}
|
|
}
|
|
}
|
|
if len(state.Redelivered) > 0 {
|
|
redelivered = make(map[uint64]uint64, len(state.Redelivered))
|
|
for seq, dc := range state.Redelivered {
|
|
redelivered[seq] = dc
|
|
}
|
|
}
|
|
|
|
// Replace our state.
|
|
o.mu.Lock()
|
|
|
|
// Check to see if this is an outdated update.
|
|
if state.Delivered.Consumer < o.state.Delivered.Consumer {
|
|
o.mu.Unlock()
|
|
return fmt.Errorf("old update ignored")
|
|
}
|
|
|
|
o.state.Delivered = state.Delivered
|
|
o.state.AckFloor = state.AckFloor
|
|
o.state.Pending = pending
|
|
o.state.Redelivered = redelivered
|
|
o.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetStarting sets our starting stream sequence.
|
|
func (o *consumerMemStore) SetStarting(sseq uint64) error {
|
|
o.mu.Lock()
|
|
o.state.Delivered.Stream = sseq
|
|
o.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// HasState returns if this store has a recorded state.
|
|
func (o *consumerMemStore) HasState() bool {
|
|
return false
|
|
}
|
|
|
|
func (o *consumerMemStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) error {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
|
|
if dc != 1 && o.cfg.AckPolicy == AckNone {
|
|
return ErrNoAckPolicy
|
|
}
|
|
|
|
if dseq <= o.state.AckFloor.Consumer {
|
|
return nil
|
|
}
|
|
|
|
// See if we expect an ack for this.
|
|
if o.cfg.AckPolicy != AckNone {
|
|
// Need to create pending records here.
|
|
if o.state.Pending == nil {
|
|
o.state.Pending = make(map[uint64]*Pending)
|
|
}
|
|
var p *Pending
|
|
// Check for an update to a message already delivered.
|
|
if sseq <= o.state.Delivered.Stream {
|
|
if p = o.state.Pending[sseq]; p != nil {
|
|
p.Sequence, p.Timestamp = dseq, ts
|
|
}
|
|
} else {
|
|
// Add to pending.
|
|
o.state.Pending[sseq] = &Pending{dseq, ts}
|
|
}
|
|
// Update delivered as needed.
|
|
if dseq > o.state.Delivered.Consumer {
|
|
o.state.Delivered.Consumer = dseq
|
|
}
|
|
if sseq > o.state.Delivered.Stream {
|
|
o.state.Delivered.Stream = sseq
|
|
}
|
|
|
|
if dc > 1 {
|
|
if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc {
|
|
// Make sure to remove from pending.
|
|
delete(o.state.Pending, sseq)
|
|
}
|
|
if o.state.Redelivered == nil {
|
|
o.state.Redelivered = make(map[uint64]uint64)
|
|
}
|
|
// Only update if greater then what we already have.
|
|
if o.state.Redelivered[sseq] < dc-1 {
|
|
o.state.Redelivered[sseq] = dc - 1
|
|
}
|
|
}
|
|
} else {
|
|
// For AckNone just update delivered and ackfloor at the same time.
|
|
if dseq > o.state.Delivered.Consumer {
|
|
o.state.Delivered.Consumer = dseq
|
|
o.state.AckFloor.Consumer = dseq
|
|
}
|
|
if sseq > o.state.Delivered.Stream {
|
|
o.state.Delivered.Stream = sseq
|
|
o.state.AckFloor.Stream = sseq
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (o *consumerMemStore) UpdateAcks(dseq, sseq uint64) error {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
|
|
if o.cfg.AckPolicy == AckNone {
|
|
return ErrNoAckPolicy
|
|
}
|
|
if len(o.state.Pending) == 0 || o.state.Pending[sseq] == nil {
|
|
return ErrStoreMsgNotFound
|
|
}
|
|
|
|
// On restarts the old leader may get a replay from the raft logs that are old.
|
|
if dseq <= o.state.AckFloor.Consumer {
|
|
return nil
|
|
}
|
|
|
|
// Check for AckAll here.
|
|
if o.cfg.AckPolicy == AckAll {
|
|
sgap := sseq - o.state.AckFloor.Stream
|
|
o.state.AckFloor.Consumer = dseq
|
|
o.state.AckFloor.Stream = sseq
|
|
for seq := sseq; seq > sseq-sgap; seq-- {
|
|
delete(o.state.Pending, seq)
|
|
if len(o.state.Redelivered) > 0 {
|
|
delete(o.state.Redelivered, seq)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AckExplicit
|
|
|
|
// First delete from our pending state.
|
|
if p, ok := o.state.Pending[sseq]; ok {
|
|
delete(o.state.Pending, sseq)
|
|
dseq = p.Sequence // Use the original.
|
|
}
|
|
// Now remove from redelivered.
|
|
if len(o.state.Redelivered) > 0 {
|
|
delete(o.state.Redelivered, sseq)
|
|
}
|
|
|
|
if len(o.state.Pending) == 0 {
|
|
o.state.AckFloor.Consumer = o.state.Delivered.Consumer
|
|
o.state.AckFloor.Stream = o.state.Delivered.Stream
|
|
} else if dseq == o.state.AckFloor.Consumer+1 {
|
|
first := o.state.AckFloor.Consumer == 0
|
|
o.state.AckFloor.Consumer = dseq
|
|
o.state.AckFloor.Stream = sseq
|
|
|
|
if !first && o.state.Delivered.Consumer > dseq {
|
|
for ss := sseq + 1; ss < o.state.Delivered.Stream; ss++ {
|
|
if p, ok := o.state.Pending[ss]; ok {
|
|
if p.Sequence > 0 {
|
|
o.state.AckFloor.Consumer = p.Sequence - 1
|
|
o.state.AckFloor.Stream = ss - 1
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (o *consumerMemStore) UpdateConfig(cfg *ConsumerConfig) error {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
|
|
// This is mostly unchecked here. We are assuming the upper layers have done sanity checking.
|
|
o.cfg = *cfg
|
|
return nil
|
|
}
|
|
|
|
func (o *consumerMemStore) Stop() error {
|
|
o.mu.Lock()
|
|
o.closed = true
|
|
ms := o.ms
|
|
o.mu.Unlock()
|
|
ms.RemoveConsumer(o)
|
|
return nil
|
|
}
|
|
|
|
func (o *consumerMemStore) Delete() error {
|
|
return o.Stop()
|
|
}
|
|
|
|
func (o *consumerMemStore) StreamDelete() error {
|
|
return o.Stop()
|
|
}
|
|
|
|
func (o *consumerMemStore) State() (*ConsumerState, error) {
|
|
return o.stateWithCopy(true)
|
|
}
|
|
|
|
// This will not copy pending or redelivered, so should only be done under the
|
|
// consumer owner's lock.
|
|
func (o *consumerMemStore) BorrowState() (*ConsumerState, error) {
|
|
return o.stateWithCopy(false)
|
|
}
|
|
|
|
func (o *consumerMemStore) stateWithCopy(doCopy bool) (*ConsumerState, error) {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
|
|
if o.closed {
|
|
return nil, ErrStoreClosed
|
|
}
|
|
|
|
state := &ConsumerState{}
|
|
|
|
state.Delivered = o.state.Delivered
|
|
state.AckFloor = o.state.AckFloor
|
|
if len(o.state.Pending) > 0 {
|
|
if doCopy {
|
|
state.Pending = o.copyPending()
|
|
} else {
|
|
state.Pending = o.state.Pending
|
|
}
|
|
}
|
|
if len(o.state.Redelivered) > 0 {
|
|
if doCopy {
|
|
state.Redelivered = o.copyRedelivered()
|
|
} else {
|
|
state.Redelivered = o.state.Redelivered
|
|
}
|
|
}
|
|
return state, nil
|
|
}
|
|
|
|
// EncodeState for this consumer store.
|
|
func (o *consumerMemStore) EncodedState() ([]byte, error) {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
|
|
if o.closed {
|
|
return nil, ErrStoreClosed
|
|
}
|
|
|
|
return encodeConsumerState(&o.state), nil
|
|
}
|
|
|
|
func (o *consumerMemStore) copyPending() map[uint64]*Pending {
|
|
pending := make(map[uint64]*Pending, len(o.state.Pending))
|
|
for seq, p := range o.state.Pending {
|
|
pending[seq] = &Pending{p.Sequence, p.Timestamp}
|
|
}
|
|
return pending
|
|
}
|
|
|
|
func (o *consumerMemStore) copyRedelivered() map[uint64]uint64 {
|
|
redelivered := make(map[uint64]uint64, len(o.state.Redelivered))
|
|
for seq, dc := range o.state.Redelivered {
|
|
redelivered[seq] = dc
|
|
}
|
|
return redelivered
|
|
}
|
|
|
|
// Type returns the type of the underlying store.
|
|
func (o *consumerMemStore) Type() StorageType { return MemoryStorage }
|
|
|
|
// Templates
|
|
type templateMemStore struct{}
|
|
|
|
func newTemplateMemStore() *templateMemStore {
|
|
return &templateMemStore{}
|
|
}
|
|
|
|
// No-ops for memstore.
|
|
func (ts *templateMemStore) Store(t *streamTemplate) error { return nil }
|
|
func (ts *templateMemStore) Delete(t *streamTemplate) error { return nil }
|