Files
nats-server/server/memstore.go
Waldemar Quevedo 1417ca6671 Fix shutdown deadlock in TestJetStreamClusterMemLeaderRestart (#4430)
While shutting down a server an error during purge from a memory stream
would cause a deadlock sometimes, this would sometimes show up in the
`TestJetStreamClusterMemLeaderRestart` while tearing down the cluster.

This was introduced in
4d8d01949b
so only relates to v2.10.
2023-08-25 07:41:23 -07:00

1598 lines
38 KiB
Go

// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
crand "crypto/rand"
"encoding/binary"
"fmt"
"sort"
"sync"
"time"
"github.com/nats-io/nats-server/v2/server/avl"
)
// TODO(dlc) - This is a fairly simplistic approach but should do for now.
type memStore struct {
mu sync.RWMutex
cfg StreamConfig
state StreamState
msgs map[uint64]*StoreMsg
fss map[string]*SimpleState
dmap avl.SequenceSet
maxp int64
scb StorageUpdateHandler
ageChk *time.Timer
consumers int
receivedAny bool
}
func newMemStore(cfg *StreamConfig) (*memStore, error) {
if cfg == nil {
return nil, fmt.Errorf("config required")
}
if cfg.Storage != MemoryStorage {
return nil, fmt.Errorf("memStore requires memory storage type in config")
}
ms := &memStore{
msgs: make(map[uint64]*StoreMsg),
fss: make(map[string]*SimpleState),
maxp: cfg.MaxMsgsPer,
cfg: *cfg,
}
if cfg.FirstSeq > 0 {
if _, err := ms.purge(cfg.FirstSeq); err != nil {
return nil, err
}
}
return ms, nil
}
func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
if cfg == nil {
return fmt.Errorf("config required")
}
if cfg.Storage != MemoryStorage {
return fmt.Errorf("memStore requires memory storage type in config")
}
ms.mu.Lock()
ms.cfg = *cfg
// Limits checks and enforcement.
ms.enforceMsgLimit()
ms.enforceBytesLimit()
// Do age timers.
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
ms.startAgeChk()
}
if ms.ageChk != nil && ms.cfg.MaxAge == 0 {
ms.ageChk.Stop()
ms.ageChk = nil
}
// Make sure to update MaxMsgsPer
maxp := ms.maxp
ms.maxp = cfg.MaxMsgsPer
// If the value is smaller we need to enforce that.
if ms.maxp != 0 && ms.maxp < maxp {
lm := uint64(ms.maxp)
for subj, ss := range ms.fss {
if ss.Msgs > lm {
ms.enforcePerSubjectLimit(subj, ss)
}
}
}
ms.mu.Unlock()
if cfg.MaxAge != 0 {
ms.expireMsgs()
}
return nil
}
// Stores a raw message with expected sequence number and timestamp.
// Lock should be held.
func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int64) error {
if ms.msgs == nil {
return ErrStoreClosed
}
// Tracking by subject.
var ss *SimpleState
var asl bool
if len(subj) > 0 {
if ss = ms.fss[subj]; ss != nil {
asl = ms.maxp > 0 && ss.Msgs >= uint64(ms.maxp)
}
}
// Check if we are discarding new messages when we reach the limit.
if ms.cfg.Discard == DiscardNew {
if asl && ms.cfg.DiscardNewPer {
return ErrMaxMsgsPerSubject
}
if ms.cfg.MaxMsgs > 0 && ms.state.Msgs >= uint64(ms.cfg.MaxMsgs) {
// If we are tracking max messages per subject and are at the limit we will replace, so this is ok.
if !asl {
return ErrMaxMsgs
}
}
if ms.cfg.MaxBytes > 0 && ms.state.Bytes+uint64(len(msg)+len(hdr)) >= uint64(ms.cfg.MaxBytes) {
if !asl {
return ErrMaxBytes
}
// If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
sm, ok := ms.msgs[ss.First]
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < uint64(len(msg)+len(hdr)) {
return ErrMaxBytes
}
}
}
if seq != ms.state.LastSeq+1 {
if seq > 0 {
return ErrSequenceMismatch
}
seq = ms.state.LastSeq + 1
}
// Adjust first if needed.
now := time.Unix(0, ts).UTC()
if ms.state.Msgs == 0 {
ms.state.FirstSeq = seq
ms.state.FirstTime = now
}
// Make copies
// TODO(dlc) - Maybe be smarter here.
if len(msg) > 0 {
msg = copyBytes(msg)
}
if len(hdr) > 0 {
hdr = copyBytes(hdr)
}
// FIXME(dlc) - Could pool at this level?
sm := &StoreMsg{subj, nil, nil, make([]byte, 0, len(hdr)+len(msg)), seq, ts}
sm.buf = append(sm.buf, hdr...)
sm.buf = append(sm.buf, msg...)
if len(hdr) > 0 {
sm.hdr = sm.buf[:len(hdr)]
}
sm.msg = sm.buf[len(hdr):]
ms.msgs[seq] = sm
ms.state.Msgs++
ms.state.Bytes += memStoreMsgSize(subj, hdr, msg)
ms.state.LastSeq = seq
ms.state.LastTime = now
// Track per subject.
if len(subj) > 0 {
if ss != nil {
ss.Msgs++
ss.Last = seq
// Check per subject limits.
if ms.maxp > 0 && ss.Msgs > uint64(ms.maxp) {
ms.enforcePerSubjectLimit(subj, ss)
}
} else {
ms.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
}
}
// Limits checks and enforcement.
ms.enforceMsgLimit()
ms.enforceBytesLimit()
// Check if we have and need the age expiration timer running.
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
ms.startAgeChk()
}
return nil
}
// StoreRawMsg stores a raw message with expected sequence number and timestamp.
func (ms *memStore) StoreRawMsg(subj string, hdr, msg []byte, seq uint64, ts int64) error {
ms.mu.Lock()
err := ms.storeRawMsg(subj, hdr, msg, seq, ts)
cb := ms.scb
// Check if first message timestamp requires expiry
// sooner than initial replica expiry timer set to MaxAge when initializing.
if !ms.receivedAny && ms.cfg.MaxAge != 0 && ts > 0 {
ms.receivedAny = true
// Calculate duration when the next expireMsgs should be called.
ms.resetAgeChk(int64(time.Millisecond) * 50)
}
ms.mu.Unlock()
if err == nil && cb != nil {
cb(1, int64(memStoreMsgSize(subj, hdr, msg)), seq, subj)
}
return err
}
// Store stores a message.
func (ms *memStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error) {
ms.mu.Lock()
seq, ts := ms.state.LastSeq+1, time.Now().UnixNano()
err := ms.storeRawMsg(subj, hdr, msg, seq, ts)
cb := ms.scb
ms.mu.Unlock()
if err != nil {
seq, ts = 0, 0
} else if cb != nil {
cb(1, int64(memStoreMsgSize(subj, hdr, msg)), seq, subj)
}
return seq, ts, err
}
// SkipMsg will use the next sequence number but not store anything.
func (ms *memStore) SkipMsg() uint64 {
// Grab time.
now := time.Now().UTC()
ms.mu.Lock()
seq := ms.state.LastSeq + 1
ms.state.LastSeq = seq
ms.state.LastTime = now
if ms.state.Msgs == 0 {
ms.state.FirstSeq = seq
ms.state.FirstTime = now
}
ms.updateFirstSeq(seq)
ms.mu.Unlock()
return seq
}
// RegisterStorageUpdates registers a callback for updates to storage changes.
// It will present number of messages and bytes as a signed integer and an
// optional sequence number of the message if a single.
func (ms *memStore) RegisterStorageUpdates(cb StorageUpdateHandler) {
ms.mu.Lock()
ms.scb = cb
ms.mu.Unlock()
}
// GetSeqFromTime looks for the first sequence number that has the message
// with >= timestamp.
// FIXME(dlc) - inefficient.
func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
ts := t.UnixNano()
ms.mu.RLock()
defer ms.mu.RUnlock()
if len(ms.msgs) == 0 {
return ms.state.LastSeq + 1
}
if ts <= ms.msgs[ms.state.FirstSeq].ts {
return ms.state.FirstSeq
}
last := ms.msgs[ms.state.LastSeq].ts
if ts == last {
return ms.state.LastSeq
}
if ts > last {
return ms.state.LastSeq + 1
}
index := sort.Search(len(ms.msgs), func(i int) bool {
return ms.msgs[uint64(i)+ms.state.FirstSeq].ts >= ts
})
return uint64(index) + ms.state.FirstSeq
}
// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence.
func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState {
ms.mu.RLock()
defer ms.mu.RUnlock()
return ms.filteredStateLocked(sseq, subj, false)
}
func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubject bool) SimpleState {
var ss SimpleState
if sseq < ms.state.FirstSeq {
sseq = ms.state.FirstSeq
}
// If past the end no results.
if sseq > ms.state.LastSeq {
return ss
}
isAll := filter == _EMPTY_ || filter == fwcs
// First check if we can optimize this part.
// This means we want all and the starting sequence was before this block.
if isAll && sseq <= ms.state.FirstSeq {
total := ms.state.Msgs
if lastPerSubject {
total = uint64(len(ms.fss))
}
return SimpleState{
Msgs: total,
First: ms.state.FirstSeq,
Last: ms.state.LastSeq,
}
}
tsa := [32]string{}
fsa := [32]string{}
fts := tokenizeSubjectIntoSlice(fsa[:0], filter)
wc := subjectHasWildcard(filter)
// 1. See if we match any subs from fss.
// 2. If we match and the sseq is past ss.Last then we can use meta only.
// 3. If we match and we need to do a partial, break and clear any totals and do a full scan like num pending.
isMatch := func(subj string) bool {
if isAll {
return true
}
if !wc {
return subj == filter
}
tts := tokenizeSubjectIntoSlice(tsa[:0], subj)
return isSubsetMatchTokenized(tts, fts)
}
update := func(fss *SimpleState) {
msgs, first, last := fss.Msgs, fss.First, fss.Last
if lastPerSubject {
msgs, first = 1, last
}
ss.Msgs += msgs
if ss.First == 0 || first < ss.First {
ss.First = first
}
if last > ss.Last {
ss.Last = last
}
}
var havePartial bool
// We will track start and end sequences as we go.
for subj, fss := range ms.fss {
if isMatch(subj) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, fss.First, fss)
}
if sseq <= fss.First {
update(fss)
} else if sseq <= fss.Last {
// We matched but its a partial.
havePartial = true
// Don't break here, we will update to keep tracking last.
update(fss)
}
}
}
// If we did not encounter any partials we can return here.
if !havePartial {
return ss
}
// If we are here we need to scan the msgs.
// Capture first and last sequences for scan and then clear what we had.
first, last := ss.First, ss.Last
// To track if we decide to exclude and we need to calculate first.
var needScanFirst bool
if first < sseq {
first = sseq
needScanFirst = true
}
// Now we want to check if it is better to scan inclusive and recalculate that way
// or leave and scan exclusive and adjust our totals.
// ss.Last is always correct here.
toScan, toExclude := last-first, first-ms.state.FirstSeq+ms.state.LastSeq-ss.Last
var seen map[string]bool
if lastPerSubject {
seen = make(map[string]bool)
}
if toScan < toExclude {
ss.Msgs, ss.First = 0, 0
for seq := first; seq <= last; seq++ {
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
ss.Msgs++
if ss.First == 0 {
ss.First = seq
}
if seen != nil {
seen[sm.subj] = true
}
}
}
} else {
// We will adjust from the totals above by scanning what we need to exclude.
ss.First = first
var adjust uint64
for seq := ms.state.FirstSeq; seq < first; seq++ {
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
adjust++
if seen != nil {
seen[sm.subj] = true
}
}
}
// Now do range at end.
for seq := last + 1; seq < ms.state.LastSeq; seq++ {
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
adjust++
if seen != nil {
seen[sm.subj] = true
}
}
}
ss.Msgs -= adjust
if needScanFirst {
for seq := first; seq < last; seq++ {
if sm, ok := ms.msgs[seq]; ok && isMatch(sm.subj) {
ss.First = seq
break
}
}
}
}
return ss
}
// SubjectsState returns a map of SimpleState for all matching subjects.
func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
ms.mu.RLock()
defer ms.mu.RUnlock()
if len(ms.fss) == 0 {
return nil
}
fss := make(map[string]SimpleState)
for subj, ss := range ms.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
} else {
// Merge here.
oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs
fss[subj] = oss
}
}
}
return fss
}
// SubjectsTotal return message totals per subject.
func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
ms.mu.RLock()
defer ms.mu.RUnlock()
if len(ms.fss) == 0 {
return nil
}
tsa := [32]string{}
fsa := [32]string{}
fts := tokenizeSubjectIntoSlice(fsa[:0], filterSubject)
isAll := filterSubject == _EMPTY_ || filterSubject == fwcs
fst := make(map[string]uint64)
for subj, ss := range ms.fss {
if isAll {
fst[subj] = ss.Msgs
} else {
if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) {
fst[subj] = ss.Msgs
}
}
}
return fst
}
// NumPending will return the number of pending messages matching the filter subject starting at sequence.
func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) {
ms.mu.RLock()
defer ms.mu.RUnlock()
ss := ms.filteredStateLocked(sseq, filter, lastPerSubject)
return ss.Msgs, ms.state.LastSeq
}
// Will check the msg limit for this tracked subject.
// Lock should be held.
func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
if ms.maxp <= 0 {
return
}
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if !ms.removeMsg(ss.First, false) {
break
}
}
}
// Will check the msg limit and drop firstSeq msg if needed.
// Lock should be held.
func (ms *memStore) enforceMsgLimit() {
if ms.cfg.MaxMsgs <= 0 || ms.state.Msgs <= uint64(ms.cfg.MaxMsgs) {
return
}
for nmsgs := ms.state.Msgs; nmsgs > uint64(ms.cfg.MaxMsgs); nmsgs = ms.state.Msgs {
ms.deleteFirstMsgOrPanic()
}
}
// Will check the bytes limit and drop msgs if needed.
// Lock should be held.
func (ms *memStore) enforceBytesLimit() {
if ms.cfg.MaxBytes <= 0 || ms.state.Bytes <= uint64(ms.cfg.MaxBytes) {
return
}
for bs := ms.state.Bytes; bs > uint64(ms.cfg.MaxBytes); bs = ms.state.Bytes {
ms.deleteFirstMsgOrPanic()
}
}
// Will start the age check timer.
// Lock should be held.
func (ms *memStore) startAgeChk() {
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
ms.ageChk = time.AfterFunc(ms.cfg.MaxAge, ms.expireMsgs)
}
}
// Lock should be held.
func (ms *memStore) resetAgeChk(delta int64) {
if ms.cfg.MaxAge == 0 {
return
}
fireIn := ms.cfg.MaxAge
if delta > 0 && time.Duration(delta) < fireIn {
fireIn = time.Duration(delta)
}
if ms.ageChk != nil {
ms.ageChk.Reset(fireIn)
} else {
ms.ageChk = time.AfterFunc(fireIn, ms.expireMsgs)
}
}
// Will expire msgs that are too old.
func (ms *memStore) expireMsgs() {
ms.mu.Lock()
defer ms.mu.Unlock()
now := time.Now().UnixNano()
minAge := now - int64(ms.cfg.MaxAge)
for {
if sm, ok := ms.msgs[ms.state.FirstSeq]; ok && sm.ts <= minAge {
ms.deleteFirstMsgOrPanic()
// Recalculate in case we are expiring a bunch.
now = time.Now().UnixNano()
minAge = now - int64(ms.cfg.MaxAge)
} else {
if len(ms.msgs) == 0 {
if ms.ageChk != nil {
ms.ageChk.Stop()
ms.ageChk = nil
}
} else {
var fireIn time.Duration
if sm == nil {
fireIn = ms.cfg.MaxAge
} else {
fireIn = time.Duration(sm.ts - minAge)
}
if ms.ageChk != nil {
ms.ageChk.Reset(fireIn)
} else {
ms.ageChk = time.AfterFunc(fireIn, ms.expireMsgs)
}
}
return
}
}
}
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && (sequence == 0 || sequence == 1) {
return ms.Purge()
}
if sequence > 1 {
return ms.Compact(sequence)
} else if keep > 0 {
ms.mu.RLock()
msgs, lseq := ms.state.Msgs, ms.state.LastSeq
ms.mu.RUnlock()
if keep >= msgs {
return 0, nil
}
return ms.Compact(lseq - keep + 1)
}
return 0, nil
}
eq := compareFn(subject)
if ss := ms.FilteredState(1, subject); ss.Msgs > 0 {
if keep > 0 {
if keep >= ss.Msgs {
return 0, nil
}
ss.Msgs -= keep
}
last := ss.Last
if sequence > 1 {
last = sequence - 1
}
ms.mu.Lock()
for seq := ss.First; seq <= last; seq++ {
if sm, ok := ms.msgs[seq]; ok && eq(sm.subj, subject) {
if ok := ms.removeMsg(sm.seq, false); ok {
purged++
if purged >= ss.Msgs {
break
}
}
}
}
ms.mu.Unlock()
}
return purged, nil
}
// Purge will remove all messages from this store.
// Will return the number of purged messages.
func (ms *memStore) Purge() (uint64, error) {
ms.mu.RLock()
first := ms.state.LastSeq + 1
ms.mu.RUnlock()
return ms.purge(first)
}
func (ms *memStore) purge(fseq uint64) (uint64, error) {
ms.mu.Lock()
purged := uint64(len(ms.msgs))
cb := ms.scb
bytes := int64(ms.state.Bytes)
if fseq < ms.state.LastSeq {
ms.mu.Unlock()
return 0, fmt.Errorf("partial purges not supported on memory store")
}
ms.state.FirstSeq = fseq
ms.state.LastSeq = fseq - 1
ms.state.FirstTime = time.Time{}
ms.state.Bytes = 0
ms.state.Msgs = 0
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = make(map[string]*SimpleState)
ms.mu.Unlock()
if cb != nil {
cb(-int64(purged), -bytes, 0, _EMPTY_)
}
return purged, nil
}
// Compact will remove all messages from this store up to
// but not including the seq parameter.
// Will return the number of purged messages.
func (ms *memStore) Compact(seq uint64) (uint64, error) {
if seq == 0 {
return ms.Purge()
}
var purged, bytes uint64
ms.mu.Lock()
cb := ms.scb
if seq <= ms.state.LastSeq {
sm, ok := ms.msgs[seq]
if !ok {
ms.mu.Unlock()
return 0, ErrStoreMsgNotFound
}
ms.state.FirstSeq = seq
ms.state.FirstTime = time.Unix(0, sm.ts).UTC()
for seq := seq - 1; seq > 0; seq-- {
if sm := ms.msgs[seq]; sm != nil {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
delete(ms.msgs, seq)
ms.removeSeqPerSubject(sm.subj, seq)
}
}
if purged > ms.state.Msgs {
purged = ms.state.Msgs
}
ms.state.Msgs -= purged
if bytes > ms.state.Bytes {
bytes = ms.state.Bytes
}
ms.state.Bytes -= bytes
} else {
// We are compacting past the end of our range. Do purge and set sequences correctly
// such that the next message placed will have seq.
purged = uint64(len(ms.msgs))
bytes = ms.state.Bytes
ms.state.Bytes = 0
ms.state.Msgs = 0
ms.state.FirstSeq = seq
ms.state.FirstTime = time.Time{}
ms.state.LastSeq = seq - 1
ms.msgs = make(map[uint64]*StoreMsg)
}
ms.mu.Unlock()
if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
return purged, nil
}
// Will completely reset our store.
func (ms *memStore) reset() error {
ms.mu.Lock()
var purged, bytes uint64
cb := ms.scb
if cb != nil {
for _, sm := range ms.msgs {
purged++
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
}
}
// Reset
ms.state.FirstSeq = 0
ms.state.FirstTime = time.Time{}
ms.state.LastSeq = 0
ms.state.LastTime = time.Now().UTC()
// Update msgs and bytes.
ms.state.Msgs = 0
ms.state.Bytes = 0
// Reset msgs and fss.
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = make(map[string]*SimpleState)
ms.mu.Unlock()
if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
return nil
}
// Truncate will truncate a stream store up to seq. Sequence needs to be valid.
func (ms *memStore) Truncate(seq uint64) error {
// Check for request to reset.
if seq == 0 {
return ms.reset()
}
var purged, bytes uint64
ms.mu.Lock()
lsm, ok := ms.msgs[seq]
if !ok {
ms.mu.Unlock()
return ErrInvalidSequence
}
for i := ms.state.LastSeq; i > seq; i-- {
if sm := ms.msgs[i]; sm != nil {
purged++
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
delete(ms.msgs, i)
ms.removeSeqPerSubject(sm.subj, i)
}
}
// Reset last.
ms.state.LastSeq = lsm.seq
ms.state.LastTime = time.Unix(0, lsm.ts).UTC()
// Update msgs and bytes.
if purged > ms.state.Msgs {
purged = ms.state.Msgs
}
ms.state.Msgs -= purged
if bytes > ms.state.Bytes {
bytes = ms.state.Bytes
}
ms.state.Bytes -= bytes
cb := ms.scb
ms.mu.Unlock()
if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
return nil
}
func (ms *memStore) deleteFirstMsgOrPanic() {
if !ms.deleteFirstMsg() {
panic("jetstream memstore has inconsistent state, can't find first seq msg")
}
}
func (ms *memStore) deleteFirstMsg() bool {
return ms.removeMsg(ms.state.FirstSeq, false)
}
// LoadMsg will lookup the message by sequence number and return it if found.
func (ms *memStore) LoadMsg(seq uint64, smp *StoreMsg) (*StoreMsg, error) {
ms.mu.RLock()
sm, ok := ms.msgs[seq]
last := ms.state.LastSeq
ms.mu.RUnlock()
if !ok || sm == nil {
var err = ErrStoreEOF
if seq <= last {
err = ErrStoreMsgNotFound
}
return nil, err
}
if smp == nil {
smp = new(StoreMsg)
}
sm.copy(smp)
return smp, nil
}
// LoadLastMsg will return the last message we have that matches a given subject.
// The subject can be a wildcard.
func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error) {
var sm *StoreMsg
var ok bool
ms.mu.RLock()
defer ms.mu.RUnlock()
if subject == _EMPTY_ || subject == fwcs {
sm, ok = ms.msgs[ms.state.LastSeq]
} else if subjectIsLiteral(subject) {
if ss := ms.fss[subject]; ss != nil && ss.Msgs > 0 {
sm, ok = ms.msgs[ss.Last]
}
} else if ss := ms.filteredStateLocked(1, subject, true); ss.Msgs > 0 {
sm, ok = ms.msgs[ss.Last]
}
if !ok || sm == nil {
return nil, ErrStoreMsgNotFound
}
if smp == nil {
smp = new(StoreMsg)
}
sm.copy(smp)
return smp, nil
}
// LoadNextMsg will find the next message matching the filter subject starting at the start sequence.
// The filter subject can be a wildcard.
func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if start < ms.state.FirstSeq {
start = ms.state.FirstSeq
}
// If past the end no results.
if start > ms.state.LastSeq {
return nil, ms.state.LastSeq, ErrStoreEOF
}
isAll := filter == _EMPTY_ || filter == fwcs
// Skip scan of ms.fss is number of messages in the block are less than
// 1/2 the number of subjects in ms.fss. Or we have a wc and lots of fss entries.
const linearScanMaxFSS = 256
doLinearScan := isAll || 2*int(ms.state.LastSeq-start) < len(ms.fss) || (wc && len(ms.fss) > linearScanMaxFSS)
// Initial setup.
fseq, lseq := start, ms.state.LastSeq
if !doLinearScan {
subs := []string{filter}
if wc || isAll {
subs = subs[:0]
for fsubj := range ms.fss {
if isAll || subjectIsSubsetMatch(fsubj, filter) {
subs = append(subs, fsubj)
}
}
}
fseq, lseq = ms.state.LastSeq, uint64(0)
for _, subj := range subs {
ss := ms.fss[subj]
if ss == nil {
continue
}
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss.First < fseq {
fseq = ss.First
}
if ss.Last > lseq {
lseq = ss.Last
}
}
if fseq < start {
fseq = start
}
}
eq := subjectsEqual
if wc {
eq = subjectIsSubsetMatch
}
for nseq := fseq; nseq <= lseq; nseq++ {
if sm, ok := ms.msgs[nseq]; ok && (isAll || eq(sm.subj, filter)) {
if smp == nil {
smp = new(StoreMsg)
}
sm.copy(smp)
return smp, nseq, nil
}
}
return nil, ms.state.LastSeq, ErrStoreEOF
}
// RemoveMsg will remove the message from this store.
// Will return the number of bytes removed.
func (ms *memStore) RemoveMsg(seq uint64) (bool, error) {
ms.mu.Lock()
removed := ms.removeMsg(seq, false)
ms.mu.Unlock()
return removed, nil
}
// EraseMsg will remove the message and rewrite its contents.
func (ms *memStore) EraseMsg(seq uint64) (bool, error) {
ms.mu.Lock()
removed := ms.removeMsg(seq, true)
ms.mu.Unlock()
return removed, nil
}
// Performs logic to update first sequence number.
// Lock should be held.
func (ms *memStore) updateFirstSeq(seq uint64) {
if seq != ms.state.FirstSeq {
// Interior delete.
return
}
var nsm *StoreMsg
var ok bool
for nseq := ms.state.FirstSeq + 1; nseq <= ms.state.LastSeq; nseq++ {
if nsm, ok = ms.msgs[nseq]; ok {
break
}
}
oldFirst := ms.state.FirstSeq
if nsm != nil {
ms.state.FirstSeq = nsm.seq
ms.state.FirstTime = time.Unix(0, nsm.ts).UTC()
} else {
// Like purge.
ms.state.FirstSeq = ms.state.LastSeq + 1
ms.state.FirstTime = time.Time{}
}
if oldFirst == ms.state.FirstSeq-1 {
ms.dmap.Delete(oldFirst)
} else {
for seq := oldFirst; seq < ms.state.FirstSeq; seq++ {
ms.dmap.Delete(seq)
}
}
if ms.dmap.IsEmpty() {
ms.dmap.SetInitialMin(ms.state.FirstSeq)
}
}
// Remove a seq from the fss and select new first.
// Lock should be held.
func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
ss := ms.fss[subj]
if ss == nil {
return
}
if ss.Msgs == 1 {
delete(ms.fss, subj)
return
}
ss.Msgs--
// If we know we only have 1 msg left don't need to search for next first.
if ss.Msgs == 1 {
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
}
ss.firstNeedsUpdate = false
} else {
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}
}
// Will recalulate the first sequence for this subject in this block.
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
for tseq := startSeq + 1; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
ss.firstNeedsUpdate = false
return
}
}
}
// Removes the message referenced by seq.
// Lock should he held.
func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
var ss uint64
sm, ok := ms.msgs[seq]
if !ok {
return false
}
ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
delete(ms.msgs, seq)
if ms.state.Msgs > 0 {
ms.state.Msgs--
if ss > ms.state.Bytes {
ss = ms.state.Bytes
}
ms.state.Bytes -= ss
}
ms.dmap.Insert(seq)
ms.updateFirstSeq(seq)
if secure {
if len(sm.hdr) > 0 {
sm.hdr = make([]byte, len(sm.hdr))
crand.Read(sm.hdr)
}
if len(sm.msg) > 0 {
sm.msg = make([]byte, len(sm.msg))
crand.Read(sm.msg)
}
sm.seq, sm.ts = 0, 0
}
// Remove any per subject tracking.
ms.removeSeqPerSubject(sm.subj, seq)
if ms.scb != nil {
// We do not want to hold any locks here.
ms.mu.Unlock()
delta := int64(ss)
ms.scb(-1, -delta, seq, sm.subj)
ms.mu.Lock()
}
return ok
}
// Type returns the type of the underlying store.
func (ms *memStore) Type() StorageType {
return MemoryStorage
}
// FastState will fill in state with only the following.
// Msgs, Bytes, First and Last Sequence and Time and NumDeleted.
func (ms *memStore) FastState(state *StreamState) {
ms.mu.RLock()
state.Msgs = ms.state.Msgs
state.Bytes = ms.state.Bytes
state.FirstSeq = ms.state.FirstSeq
state.FirstTime = ms.state.FirstTime
state.LastSeq = ms.state.LastSeq
state.LastTime = ms.state.LastTime
if state.LastSeq > state.FirstSeq {
state.NumDeleted = int((state.LastSeq - state.FirstSeq + 1) - state.Msgs)
if state.NumDeleted < 0 {
state.NumDeleted = 0
}
}
state.Consumers = ms.consumers
state.NumSubjects = len(ms.fss)
ms.mu.RUnlock()
}
func (ms *memStore) State() StreamState {
ms.mu.RLock()
defer ms.mu.RUnlock()
state := ms.state
state.Consumers = ms.consumers
state.NumSubjects = len(ms.fss)
state.Deleted = nil
// Calculate interior delete details.
if numDeleted := int((state.LastSeq - state.FirstSeq + 1) - state.Msgs); numDeleted > 0 {
state.Deleted = make([]uint64, 0, state.NumDeleted)
// TODO(dlc) - Too Simplistic, once state is updated to allow runs etc redo.
for seq := state.FirstSeq + 1; seq < ms.state.LastSeq; seq++ {
if _, ok := ms.msgs[seq]; !ok {
state.Deleted = append(state.Deleted, seq)
}
}
}
if len(state.Deleted) > 0 {
state.NumDeleted = len(state.Deleted)
}
return state
}
func (ms *memStore) Utilization() (total, reported uint64, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
return ms.state.Bytes, ms.state.Bytes, nil
}
func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {
return uint64(len(subj) + len(hdr) + len(msg) + 16) // 8*2 for seq + age
}
// Delete is same as Stop for memory store.
func (ms *memStore) Delete() error {
return ms.Stop()
}
func (ms *memStore) Stop() error {
// These can't come back, so stop is same as Delete.
ms.Purge()
ms.mu.Lock()
if ms.ageChk != nil {
ms.ageChk.Stop()
ms.ageChk = nil
}
ms.msgs = nil
ms.mu.Unlock()
return nil
}
func (ms *memStore) isClosed() bool {
ms.mu.RLock()
defer ms.mu.RUnlock()
return ms.msgs == nil
}
type consumerMemStore struct {
mu sync.Mutex
ms StreamStore
cfg ConsumerConfig
state ConsumerState
closed bool
}
func (ms *memStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error) {
if ms == nil {
return nil, fmt.Errorf("memstore is nil")
}
if ms.isClosed() {
return nil, ErrStoreClosed
}
if cfg == nil || name == _EMPTY_ {
return nil, fmt.Errorf("bad consumer config")
}
o := &consumerMemStore{ms: ms, cfg: *cfg}
ms.AddConsumer(o)
return o, nil
}
func (ms *memStore) AddConsumer(o ConsumerStore) error {
ms.mu.Lock()
ms.consumers++
ms.mu.Unlock()
return nil
}
func (ms *memStore) RemoveConsumer(o ConsumerStore) error {
ms.mu.Lock()
if ms.consumers > 0 {
ms.consumers--
}
ms.mu.Unlock()
return nil
}
func (ms *memStore) Snapshot(_ time.Duration, _, _ bool) (*SnapshotResult, error) {
return nil, fmt.Errorf("no impl")
}
// Binary encoded state snapshot, >= v2.10 server.
func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
// Quick calculate num deleted.
numDeleted := int((ms.state.LastSeq - ms.state.FirstSeq + 1) - ms.state.Msgs)
if numDeleted < 0 {
numDeleted = 0
}
// Encoded is Msgs, Bytes, FirstSeq, LastSeq, Failed, NumDeleted and optional DeletedBlocks
var buf [1024]byte
buf[0], buf[1] = streamStateMagic, streamStateVersion
n := hdrLen
n += binary.PutUvarint(buf[n:], ms.state.Msgs)
n += binary.PutUvarint(buf[n:], ms.state.Bytes)
n += binary.PutUvarint(buf[n:], ms.state.FirstSeq)
n += binary.PutUvarint(buf[n:], ms.state.LastSeq)
n += binary.PutUvarint(buf[n:], failed)
n += binary.PutUvarint(buf[n:], uint64(numDeleted))
b := buf[0:n]
if numDeleted > 0 {
buf, err := ms.dmap.Encode(nil)
if err != nil {
return nil, err
}
b = append(b, buf...)
}
return b, nil
}
// SyncDeleted will make sure this stream has same deleted state as dbs.
func (ms *memStore) SyncDeleted(dbs DeleteBlocks) {
// For now we share one dmap, so if we have one entry here check if states are the same.
// Note this will work for any DeleteBlock type, but we expect this to be a dmap too.
if len(dbs) == 1 {
ms.mu.RLock()
min, max, num := ms.dmap.State()
ms.mu.RUnlock()
if pmin, pmax, pnum := dbs[0].State(); pmin == min && pmax == max && pnum == num {
return
}
}
for _, db := range dbs {
db.Range(func(dseq uint64) bool {
ms.RemoveMsg(dseq)
return true
})
}
}
func (o *consumerMemStore) Update(state *ConsumerState) error {
// Sanity checks.
if state.AckFloor.Consumer > state.Delivered.Consumer {
return fmt.Errorf("bad ack floor for consumer")
}
if state.AckFloor.Stream > state.Delivered.Stream {
return fmt.Errorf("bad ack floor for stream")
}
// Copy to our state.
var pending map[uint64]*Pending
var redelivered map[uint64]uint64
if len(state.Pending) > 0 {
pending = make(map[uint64]*Pending, len(state.Pending))
for seq, p := range state.Pending {
pending[seq] = &Pending{p.Sequence, p.Timestamp}
}
for seq := range pending {
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
}
}
}
if len(state.Redelivered) > 0 {
redelivered = make(map[uint64]uint64, len(state.Redelivered))
for seq, dc := range state.Redelivered {
redelivered[seq] = dc
}
}
// Replace our state.
o.mu.Lock()
// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer {
o.mu.Unlock()
return fmt.Errorf("old update ignored")
}
o.state.Delivered = state.Delivered
o.state.AckFloor = state.AckFloor
o.state.Pending = pending
o.state.Redelivered = redelivered
o.mu.Unlock()
return nil
}
// SetStarting sets our starting stream sequence.
func (o *consumerMemStore) SetStarting(sseq uint64) error {
o.mu.Lock()
o.state.Delivered.Stream = sseq
o.mu.Unlock()
return nil
}
// HasState returns if this store has a recorded state.
func (o *consumerMemStore) HasState() bool {
return false
}
func (o *consumerMemStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) error {
o.mu.Lock()
defer o.mu.Unlock()
if dc != 1 && o.cfg.AckPolicy == AckNone {
return ErrNoAckPolicy
}
if dseq <= o.state.AckFloor.Consumer {
return nil
}
// See if we expect an ack for this.
if o.cfg.AckPolicy != AckNone {
// Need to create pending records here.
if o.state.Pending == nil {
o.state.Pending = make(map[uint64]*Pending)
}
var p *Pending
// Check for an update to a message already delivered.
if sseq <= o.state.Delivered.Stream {
if p = o.state.Pending[sseq]; p != nil {
p.Sequence, p.Timestamp = dseq, ts
}
} else {
// Add to pending.
o.state.Pending[sseq] = &Pending{dseq, ts}
}
// Update delivered as needed.
if dseq > o.state.Delivered.Consumer {
o.state.Delivered.Consumer = dseq
}
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq
}
if dc > 1 {
if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc {
// Make sure to remove from pending.
delete(o.state.Pending, sseq)
}
if o.state.Redelivered == nil {
o.state.Redelivered = make(map[uint64]uint64)
}
// Only update if greater then what we already have.
if o.state.Redelivered[sseq] < dc-1 {
o.state.Redelivered[sseq] = dc - 1
}
}
} else {
// For AckNone just update delivered and ackfloor at the same time.
if dseq > o.state.Delivered.Consumer {
o.state.Delivered.Consumer = dseq
o.state.AckFloor.Consumer = dseq
}
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq
o.state.AckFloor.Stream = sseq
}
}
return nil
}
func (o *consumerMemStore) UpdateAcks(dseq, sseq uint64) error {
o.mu.Lock()
defer o.mu.Unlock()
if o.cfg.AckPolicy == AckNone {
return ErrNoAckPolicy
}
if len(o.state.Pending) == 0 || o.state.Pending[sseq] == nil {
return ErrStoreMsgNotFound
}
// On restarts the old leader may get a replay from the raft logs that are old.
if dseq <= o.state.AckFloor.Consumer {
return nil
}
// Check for AckAll here.
if o.cfg.AckPolicy == AckAll {
sgap := sseq - o.state.AckFloor.Stream
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
for seq := sseq; seq > sseq-sgap; seq-- {
delete(o.state.Pending, seq)
if len(o.state.Redelivered) > 0 {
delete(o.state.Redelivered, seq)
}
}
return nil
}
// AckExplicit
// First delete from our pending state.
if p, ok := o.state.Pending[sseq]; ok {
delete(o.state.Pending, sseq)
dseq = p.Sequence // Use the original.
}
// Now remove from redelivered.
if len(o.state.Redelivered) > 0 {
delete(o.state.Redelivered, sseq)
}
if len(o.state.Pending) == 0 {
o.state.AckFloor.Consumer = o.state.Delivered.Consumer
o.state.AckFloor.Stream = o.state.Delivered.Stream
} else if dseq == o.state.AckFloor.Consumer+1 {
first := o.state.AckFloor.Consumer == 0
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
if !first && o.state.Delivered.Consumer > dseq {
for ss := sseq + 1; ss < o.state.Delivered.Stream; ss++ {
if p, ok := o.state.Pending[ss]; ok {
if p.Sequence > 0 {
o.state.AckFloor.Consumer = p.Sequence - 1
o.state.AckFloor.Stream = ss - 1
}
break
}
}
}
}
return nil
}
func (o *consumerMemStore) UpdateConfig(cfg *ConsumerConfig) error {
o.mu.Lock()
defer o.mu.Unlock()
// This is mostly unchecked here. We are assuming the upper layers have done sanity checking.
o.cfg = *cfg
return nil
}
func (o *consumerMemStore) Stop() error {
o.mu.Lock()
o.closed = true
ms := o.ms
o.mu.Unlock()
ms.RemoveConsumer(o)
return nil
}
func (o *consumerMemStore) Delete() error {
return o.Stop()
}
func (o *consumerMemStore) StreamDelete() error {
return o.Stop()
}
func (o *consumerMemStore) State() (*ConsumerState, error) {
return o.stateWithCopy(true)
}
// This will not copy pending or redelivered, so should only be done under the
// consumer owner's lock.
func (o *consumerMemStore) BorrowState() (*ConsumerState, error) {
return o.stateWithCopy(false)
}
func (o *consumerMemStore) stateWithCopy(doCopy bool) (*ConsumerState, error) {
o.mu.Lock()
defer o.mu.Unlock()
if o.closed {
return nil, ErrStoreClosed
}
state := &ConsumerState{}
state.Delivered = o.state.Delivered
state.AckFloor = o.state.AckFloor
if len(o.state.Pending) > 0 {
if doCopy {
state.Pending = o.copyPending()
} else {
state.Pending = o.state.Pending
}
}
if len(o.state.Redelivered) > 0 {
if doCopy {
state.Redelivered = o.copyRedelivered()
} else {
state.Redelivered = o.state.Redelivered
}
}
return state, nil
}
// EncodeState for this consumer store.
func (o *consumerMemStore) EncodedState() ([]byte, error) {
o.mu.Lock()
defer o.mu.Unlock()
if o.closed {
return nil, ErrStoreClosed
}
return encodeConsumerState(&o.state), nil
}
func (o *consumerMemStore) copyPending() map[uint64]*Pending {
pending := make(map[uint64]*Pending, len(o.state.Pending))
for seq, p := range o.state.Pending {
pending[seq] = &Pending{p.Sequence, p.Timestamp}
}
return pending
}
func (o *consumerMemStore) copyRedelivered() map[uint64]uint64 {
redelivered := make(map[uint64]uint64, len(o.state.Redelivered))
for seq, dc := range o.state.Redelivered {
redelivered[seq] = dc
}
return redelivered
}
// Type returns the type of the underlying store.
func (o *consumerMemStore) Type() StorageType { return MemoryStorage }
// Templates
type templateMemStore struct{}
func newTemplateMemStore() *templateMemStore {
return &templateMemStore{}
}
// No-ops for memstore.
func (ts *templateMemStore) Store(t *streamTemplate) error { return nil }
func (ts *templateMemStore) Delete(t *streamTemplate) error { return nil }