mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
1. We were holding open FDs longer than we should for consumers causing issues with open FD limits. We now do not hold them open and cap updates a bit better. 2. When doing a stream delete, consumer delete was repeating alot of work that was not necessary, causing longer delays. This has been optimized a bit, still more improvements to be made. 3. We cover all JS under a single export, but that was also trapping GetNext for pull based consumers, and since this was a no-op (is handled at user account level) we were creating alot of garbage service import responses and reverse map entries that had to be garbage collected. We have a fix in to avoind this but still looking for a better one. 4. Still had some lingering references to all exports vs single JS export. Signed-off-by: Derek Collison <derek@nats.io>
823 lines
19 KiB
Go
823 lines
19 KiB
Go
// Copyright 2019-2021 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// TODO(dlc) - This is a fairly simplistic approach but should do for now.
|
|
type memStore struct {
|
|
mu sync.RWMutex
|
|
cfg StreamConfig
|
|
state StreamState
|
|
msgs map[uint64]*storedMsg
|
|
dmap map[uint64]struct{}
|
|
fss map[string]*SimpleState
|
|
maxp int64
|
|
scb StorageUpdateHandler
|
|
ageChk *time.Timer
|
|
consumers int
|
|
}
|
|
|
|
type storedMsg struct {
|
|
subj string
|
|
hdr []byte
|
|
msg []byte
|
|
seq uint64
|
|
ts int64 // nanoseconds
|
|
}
|
|
|
|
func newMemStore(cfg *StreamConfig) (*memStore, error) {
|
|
if cfg == nil {
|
|
return nil, fmt.Errorf("config required")
|
|
}
|
|
if cfg.Storage != MemoryStorage {
|
|
return nil, fmt.Errorf("memStore requires memory storage type in config")
|
|
}
|
|
ms := &memStore{
|
|
msgs: make(map[uint64]*storedMsg),
|
|
fss: make(map[string]*SimpleState),
|
|
dmap: make(map[uint64]struct{}),
|
|
maxp: cfg.MaxMsgsPer,
|
|
cfg: *cfg,
|
|
}
|
|
|
|
return ms, nil
|
|
}
|
|
|
|
func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
|
|
if cfg == nil {
|
|
return fmt.Errorf("config required")
|
|
}
|
|
if cfg.Storage != MemoryStorage {
|
|
return fmt.Errorf("memStore requires memory storage type in config")
|
|
}
|
|
|
|
ms.mu.Lock()
|
|
ms.cfg = *cfg
|
|
// Limits checks and enforcement.
|
|
ms.enforceMsgLimit()
|
|
ms.enforceBytesLimit()
|
|
// Do age timers.
|
|
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
|
|
ms.startAgeChk()
|
|
}
|
|
if ms.ageChk != nil && ms.cfg.MaxAge == 0 {
|
|
ms.ageChk.Stop()
|
|
ms.ageChk = nil
|
|
}
|
|
ms.mu.Unlock()
|
|
|
|
if cfg.MaxAge != 0 {
|
|
ms.expireMsgs()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Stores a raw message with expected sequence number and timestamp.
|
|
// Lock should be held.
|
|
func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int64) error {
|
|
if ms.msgs == nil {
|
|
return ErrStoreClosed
|
|
}
|
|
|
|
// Check if we are discarding new messages when we reach the limit.
|
|
if ms.cfg.Discard == DiscardNew {
|
|
if ms.cfg.MaxMsgs > 0 && ms.state.Msgs >= uint64(ms.cfg.MaxMsgs) {
|
|
return ErrMaxMsgs
|
|
}
|
|
if ms.cfg.MaxBytes > 0 && ms.state.Bytes+uint64(len(msg)+len(hdr)) >= uint64(ms.cfg.MaxBytes) {
|
|
return ErrMaxBytes
|
|
}
|
|
if ms.maxp > 0 && len(subj) > 0 {
|
|
if ss := ms.fss[subj]; ss != nil && ss.Msgs >= uint64(ms.maxp) {
|
|
return ErrMaxMsgsPerSubject
|
|
}
|
|
}
|
|
}
|
|
|
|
if seq != ms.state.LastSeq+1 {
|
|
if seq > 0 {
|
|
return ErrSequenceMismatch
|
|
}
|
|
seq = ms.state.LastSeq + 1
|
|
}
|
|
|
|
// Adjust first if needed.
|
|
now := time.Unix(0, ts).UTC()
|
|
if ms.state.Msgs == 0 {
|
|
ms.state.FirstSeq = seq
|
|
ms.state.FirstTime = now
|
|
}
|
|
|
|
// Make copies - https://github.com/go101/go101/wiki
|
|
// TODO(dlc) - Maybe be smarter here.
|
|
if len(msg) > 0 {
|
|
msg = append(msg[:0:0], msg...)
|
|
}
|
|
if len(hdr) > 0 {
|
|
hdr = append(hdr[:0:0], hdr...)
|
|
}
|
|
|
|
ms.msgs[seq] = &storedMsg{subj, hdr, msg, seq, ts}
|
|
ms.state.Msgs++
|
|
ms.state.Bytes += memStoreMsgSize(subj, hdr, msg)
|
|
ms.state.LastSeq = seq
|
|
ms.state.LastTime = now
|
|
|
|
// Track per subject.
|
|
if len(subj) > 0 {
|
|
if ss := ms.fss[subj]; ss != nil {
|
|
ss.Msgs++
|
|
ss.Last = seq
|
|
// Check per subject limits.
|
|
if ms.maxp > 0 && ss.Msgs > uint64(ms.maxp) {
|
|
ms.enforcePerSubjectLimit(ss)
|
|
}
|
|
} else {
|
|
ms.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
|
|
}
|
|
}
|
|
|
|
// Limits checks and enforcement.
|
|
ms.enforceMsgLimit()
|
|
ms.enforceBytesLimit()
|
|
|
|
// Check if we have and need the age expiration timer running.
|
|
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
|
|
ms.startAgeChk()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// StoreRawMsg stores a raw message with expected sequence number and timestamp.
|
|
func (ms *memStore) StoreRawMsg(subj string, hdr, msg []byte, seq uint64, ts int64) error {
|
|
ms.mu.Lock()
|
|
err := ms.storeRawMsg(subj, hdr, msg, seq, ts)
|
|
cb := ms.scb
|
|
ms.mu.Unlock()
|
|
|
|
if err == nil && cb != nil {
|
|
cb(1, int64(memStoreMsgSize(subj, hdr, msg)), seq, subj)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Store stores a message.
|
|
func (ms *memStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error) {
|
|
ms.mu.Lock()
|
|
seq, ts := ms.state.LastSeq+1, time.Now().UnixNano()
|
|
err := ms.storeRawMsg(subj, hdr, msg, seq, ts)
|
|
cb := ms.scb
|
|
ms.mu.Unlock()
|
|
|
|
if err != nil {
|
|
seq, ts = 0, 0
|
|
} else if cb != nil {
|
|
cb(1, int64(memStoreMsgSize(subj, hdr, msg)), seq, subj)
|
|
}
|
|
|
|
return seq, ts, err
|
|
}
|
|
|
|
// SkipMsg will use the next sequence number but not store anything.
|
|
func (ms *memStore) SkipMsg() uint64 {
|
|
// Grab time.
|
|
now := time.Now().UTC()
|
|
|
|
ms.mu.Lock()
|
|
seq := ms.state.LastSeq + 1
|
|
ms.state.LastSeq = seq
|
|
ms.state.LastTime = now
|
|
if ms.state.Msgs == 0 {
|
|
ms.state.FirstSeq = seq
|
|
ms.state.FirstTime = now
|
|
}
|
|
ms.updateFirstSeq(seq)
|
|
ms.mu.Unlock()
|
|
return seq
|
|
}
|
|
|
|
// RegisterStorageUpdates registers a callback for updates to storage changes.
|
|
// It will present number of messages and bytes as a signed integer and an
|
|
// optional sequence number of the message if a single.
|
|
func (ms *memStore) RegisterStorageUpdates(cb StorageUpdateHandler) {
|
|
ms.mu.Lock()
|
|
ms.scb = cb
|
|
ms.mu.Unlock()
|
|
}
|
|
|
|
// GetSeqFromTime looks for the first sequence number that has the message
|
|
// with >= timestamp.
|
|
// FIXME(dlc) - inefficient.
|
|
func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
|
|
ts := t.UnixNano()
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
if len(ms.msgs) == 0 {
|
|
return ms.state.LastSeq + 1
|
|
}
|
|
if ts <= ms.msgs[ms.state.FirstSeq].ts {
|
|
return ms.state.FirstSeq
|
|
}
|
|
last := ms.msgs[ms.state.LastSeq].ts
|
|
if ts == last {
|
|
return ms.state.LastSeq
|
|
}
|
|
if ts > last {
|
|
return ms.state.LastSeq + 1
|
|
}
|
|
index := sort.Search(len(ms.msgs), func(i int) bool {
|
|
return ms.msgs[uint64(i)+ms.state.FirstSeq].ts >= ts
|
|
})
|
|
return uint64(index) + ms.state.FirstSeq
|
|
}
|
|
|
|
// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence.
|
|
func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState {
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
return ms.filteredStateLocked(sseq, subj)
|
|
}
|
|
|
|
func (ms *memStore) filteredStateLocked(sseq uint64, subj string) SimpleState {
|
|
var ss SimpleState
|
|
|
|
if sseq < ms.state.FirstSeq {
|
|
sseq = ms.state.FirstSeq
|
|
}
|
|
|
|
// If past the end no results.
|
|
if sseq > ms.state.LastSeq {
|
|
return ss
|
|
}
|
|
|
|
// If we want everything.
|
|
if subj == _EMPTY_ || subj == fwcs {
|
|
ss.Msgs, ss.First, ss.Last = ms.state.Msgs, ms.state.FirstSeq, ms.state.LastSeq
|
|
return ss
|
|
}
|
|
|
|
wc := subjectHasWildcard(subj)
|
|
subs := []string{subj}
|
|
if wc {
|
|
subs = subs[:0]
|
|
for fsubj := range ms.fss {
|
|
if subjectIsSubsetMatch(fsubj, subj) {
|
|
subs = append(subs, fsubj)
|
|
}
|
|
}
|
|
}
|
|
fseq, lseq := ms.state.LastSeq, uint64(0)
|
|
for _, subj := range subs {
|
|
ss := ms.fss[subj]
|
|
if ss == nil {
|
|
continue
|
|
}
|
|
if ss.First < fseq {
|
|
fseq = ss.First
|
|
}
|
|
if ss.Last > lseq {
|
|
lseq = ss.Last
|
|
}
|
|
}
|
|
if fseq < sseq {
|
|
fseq = sseq
|
|
}
|
|
|
|
// FIXME(dlc) - Optimize better like filestore.
|
|
eq := compareFn(subj)
|
|
for seq := fseq; seq <= lseq; seq++ {
|
|
if sm, ok := ms.msgs[seq]; ok && eq(sm.subj, subj) {
|
|
ss.Msgs++
|
|
if ss.First == 0 {
|
|
ss.First = seq
|
|
}
|
|
ss.Last = seq
|
|
}
|
|
}
|
|
return ss
|
|
}
|
|
|
|
// Will check the msg limit for this tracked subject.
|
|
// Lock should be held.
|
|
func (ms *memStore) enforcePerSubjectLimit(ss *SimpleState) {
|
|
if ms.maxp <= 0 {
|
|
return
|
|
}
|
|
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
|
|
if !ms.removeMsg(ss.First, false) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Will check the msg limit and drop firstSeq msg if needed.
|
|
// Lock should be held.
|
|
func (ms *memStore) enforceMsgLimit() {
|
|
if ms.cfg.MaxMsgs <= 0 || ms.state.Msgs <= uint64(ms.cfg.MaxMsgs) {
|
|
return
|
|
}
|
|
for nmsgs := ms.state.Msgs; nmsgs > uint64(ms.cfg.MaxMsgs); nmsgs = ms.state.Msgs {
|
|
ms.deleteFirstMsgOrPanic()
|
|
}
|
|
}
|
|
|
|
// Will check the bytes limit and drop msgs if needed.
|
|
// Lock should be held.
|
|
func (ms *memStore) enforceBytesLimit() {
|
|
if ms.cfg.MaxBytes <= 0 || ms.state.Bytes <= uint64(ms.cfg.MaxBytes) {
|
|
return
|
|
}
|
|
for bs := ms.state.Bytes; bs > uint64(ms.cfg.MaxBytes); bs = ms.state.Bytes {
|
|
ms.deleteFirstMsgOrPanic()
|
|
}
|
|
}
|
|
|
|
// Will start the age check timer.
|
|
// Lock should be held.
|
|
func (ms *memStore) startAgeChk() {
|
|
if ms.ageChk == nil && ms.cfg.MaxAge != 0 {
|
|
ms.ageChk = time.AfterFunc(ms.cfg.MaxAge, ms.expireMsgs)
|
|
}
|
|
}
|
|
|
|
// Will expire msgs that are too old.
|
|
func (ms *memStore) expireMsgs() {
|
|
ms.mu.Lock()
|
|
defer ms.mu.Unlock()
|
|
|
|
now := time.Now().UnixNano()
|
|
minAge := now - int64(ms.cfg.MaxAge)
|
|
for {
|
|
if sm, ok := ms.msgs[ms.state.FirstSeq]; ok && sm.ts <= minAge {
|
|
ms.deleteFirstMsgOrPanic()
|
|
} else {
|
|
if !ok {
|
|
if ms.ageChk != nil {
|
|
ms.ageChk.Stop()
|
|
ms.ageChk = nil
|
|
}
|
|
} else {
|
|
fireIn := time.Duration(sm.ts-now) + ms.cfg.MaxAge
|
|
if ms.ageChk != nil {
|
|
ms.ageChk.Reset(fireIn)
|
|
} else {
|
|
ms.ageChk = time.AfterFunc(fireIn, ms.expireMsgs)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
|
|
// Will return the number of purged messages.
|
|
func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
|
|
if subject == _EMPTY_ || subject == fwcs {
|
|
if keep == 0 && (sequence == 0 || sequence == 1) {
|
|
return ms.Purge()
|
|
}
|
|
if sequence > 1 {
|
|
return ms.Compact(sequence)
|
|
} else if keep > 0 {
|
|
ms.mu.RLock()
|
|
msgs, lseq := ms.state.Msgs, ms.state.LastSeq
|
|
ms.mu.RUnlock()
|
|
if keep >= msgs {
|
|
return 0, nil
|
|
}
|
|
return ms.Compact(lseq - keep + 1)
|
|
}
|
|
return 0, nil
|
|
|
|
}
|
|
eq := compareFn(subject)
|
|
if ss := ms.FilteredState(1, subject); ss.Msgs > 0 {
|
|
if keep > 0 {
|
|
if keep >= ss.Msgs {
|
|
return 0, nil
|
|
}
|
|
ss.Msgs -= keep
|
|
}
|
|
last := ss.Last
|
|
if sequence > 0 {
|
|
last = sequence - 1
|
|
}
|
|
ms.mu.Lock()
|
|
for seq := ss.First; seq <= last; seq++ {
|
|
if sm, ok := ms.msgs[seq]; ok && eq(sm.subj, subject) {
|
|
if ok := ms.removeMsg(sm.seq, false); ok {
|
|
purged++
|
|
if purged >= ss.Msgs {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
ms.mu.Unlock()
|
|
}
|
|
return purged, nil
|
|
}
|
|
|
|
// Purge will remove all messages from this store.
|
|
// Will return the number of purged messages.
|
|
func (ms *memStore) Purge() (uint64, error) {
|
|
ms.mu.Lock()
|
|
purged := uint64(len(ms.msgs))
|
|
cb := ms.scb
|
|
bytes := int64(ms.state.Bytes)
|
|
ms.state.FirstSeq = ms.state.LastSeq + 1
|
|
ms.state.FirstTime = time.Time{}
|
|
ms.state.Bytes = 0
|
|
ms.state.Msgs = 0
|
|
ms.msgs = make(map[uint64]*storedMsg)
|
|
ms.fss = make(map[string]*SimpleState)
|
|
ms.dmap = make(map[uint64]struct{})
|
|
ms.mu.Unlock()
|
|
|
|
if cb != nil {
|
|
cb(-int64(purged), -bytes, 0, _EMPTY_)
|
|
}
|
|
|
|
return purged, nil
|
|
}
|
|
|
|
// Compact will remove all messages from this store up to
|
|
// but not including the seq parameter.
|
|
// Will return the number of purged messages.
|
|
func (ms *memStore) Compact(seq uint64) (uint64, error) {
|
|
if seq == 0 {
|
|
return ms.Purge()
|
|
}
|
|
|
|
var purged, bytes uint64
|
|
|
|
ms.mu.Lock()
|
|
cb := ms.scb
|
|
if seq <= ms.state.LastSeq {
|
|
sm, ok := ms.msgs[seq]
|
|
if !ok {
|
|
ms.mu.Unlock()
|
|
return 0, ErrStoreMsgNotFound
|
|
}
|
|
ms.state.FirstSeq = seq
|
|
ms.state.FirstTime = time.Unix(0, sm.ts).UTC()
|
|
|
|
for seq := seq - 1; seq > 0; seq-- {
|
|
if sm := ms.msgs[seq]; sm != nil {
|
|
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
|
|
purged++
|
|
delete(ms.msgs, seq)
|
|
} else {
|
|
delete(ms.dmap, seq)
|
|
}
|
|
}
|
|
ms.state.Msgs -= purged
|
|
ms.state.Bytes -= bytes
|
|
} else {
|
|
// We are compacting past the end of our range. Do purge and set sequences correctly
|
|
// such that the next message placed will have seq.
|
|
purged = uint64(len(ms.msgs))
|
|
bytes = ms.state.Bytes
|
|
ms.state.Bytes = 0
|
|
ms.state.Msgs = 0
|
|
ms.state.FirstSeq = seq
|
|
ms.state.FirstTime = time.Time{}
|
|
ms.state.LastSeq = seq - 1
|
|
ms.msgs = make(map[uint64]*storedMsg)
|
|
}
|
|
ms.mu.Unlock()
|
|
|
|
if cb != nil {
|
|
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
|
|
}
|
|
|
|
return purged, nil
|
|
}
|
|
|
|
// Truncate will truncate a stream store up to and including seq. Sequence needs to be valid.
|
|
func (ms *memStore) Truncate(seq uint64) error {
|
|
var purged, bytes uint64
|
|
|
|
ms.mu.Lock()
|
|
lsm, ok := ms.msgs[seq]
|
|
if !ok {
|
|
ms.mu.Unlock()
|
|
return ErrInvalidSequence
|
|
}
|
|
|
|
for i := ms.state.LastSeq; i > seq; i-- {
|
|
if sm := ms.msgs[i]; sm != nil {
|
|
purged++
|
|
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
|
|
delete(ms.msgs, seq)
|
|
} else {
|
|
delete(ms.dmap, i)
|
|
}
|
|
}
|
|
// Reset last.
|
|
ms.state.LastSeq = lsm.seq
|
|
ms.state.LastTime = time.Unix(0, lsm.ts).UTC()
|
|
// Update msgs and bytes.
|
|
ms.state.Msgs -= purged
|
|
ms.state.Bytes -= bytes
|
|
cb := ms.scb
|
|
ms.mu.Unlock()
|
|
|
|
if cb != nil {
|
|
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ms *memStore) deleteFirstMsgOrPanic() {
|
|
if !ms.deleteFirstMsg() {
|
|
panic("jetstream memstore has inconsistent state, can't find first seq msg")
|
|
}
|
|
}
|
|
|
|
func (ms *memStore) deleteFirstMsg() bool {
|
|
return ms.removeMsg(ms.state.FirstSeq, false)
|
|
}
|
|
|
|
// LoadMsg will lookup the message by sequence number and return it if found.
|
|
func (ms *memStore) LoadMsg(seq uint64) (string, []byte, []byte, int64, error) {
|
|
ms.mu.RLock()
|
|
sm, ok := ms.msgs[seq]
|
|
last := ms.state.LastSeq
|
|
ms.mu.RUnlock()
|
|
|
|
if !ok || sm == nil {
|
|
var err = ErrStoreEOF
|
|
if seq <= last {
|
|
err = ErrStoreMsgNotFound
|
|
}
|
|
return _EMPTY_, nil, nil, 0, err
|
|
}
|
|
return sm.subj, sm.hdr, sm.msg, sm.ts, nil
|
|
}
|
|
|
|
// LoadLastMsg will return the last message we have that matches a given subject.
|
|
// The subject can be a wildcard.
|
|
func (ms *memStore) LoadLastMsg(subject string) (subj string, seq uint64, hdr, msg []byte, ts int64, err error) {
|
|
var sm *storedMsg
|
|
var ok bool
|
|
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
|
|
if subject == _EMPTY_ || subject == fwcs {
|
|
sm, ok = ms.msgs[ms.state.LastSeq]
|
|
} else if ss := ms.filteredStateLocked(1, subject); ss.Msgs > 0 {
|
|
sm, ok = ms.msgs[ss.Last]
|
|
}
|
|
if !ok || sm == nil {
|
|
return _EMPTY_, 0, nil, nil, 0, ErrStoreMsgNotFound
|
|
}
|
|
return sm.subj, sm.seq, sm.hdr, sm.msg, sm.ts, nil
|
|
}
|
|
|
|
// RemoveMsg will remove the message from this store.
|
|
// Will return the number of bytes removed.
|
|
func (ms *memStore) RemoveMsg(seq uint64) (bool, error) {
|
|
ms.mu.Lock()
|
|
removed := ms.removeMsg(seq, false)
|
|
ms.mu.Unlock()
|
|
return removed, nil
|
|
}
|
|
|
|
// EraseMsg will remove the message and rewrite its contents.
|
|
func (ms *memStore) EraseMsg(seq uint64) (bool, error) {
|
|
ms.mu.Lock()
|
|
removed := ms.removeMsg(seq, true)
|
|
ms.mu.Unlock()
|
|
return removed, nil
|
|
}
|
|
|
|
// Performs logic to update first sequence number.
|
|
// Lock should be held.
|
|
func (ms *memStore) updateFirstSeq(seq uint64) {
|
|
if seq != ms.state.FirstSeq {
|
|
// Interior delete.
|
|
ms.dmap[seq] = struct{}{}
|
|
return
|
|
}
|
|
var nsm *storedMsg
|
|
var ok bool
|
|
for nseq := ms.state.FirstSeq + 1; nseq <= ms.state.LastSeq; nseq++ {
|
|
delete(ms.dmap, nseq)
|
|
if nsm, ok = ms.msgs[nseq]; ok {
|
|
break
|
|
}
|
|
}
|
|
if nsm != nil {
|
|
ms.state.FirstSeq = nsm.seq
|
|
ms.state.FirstTime = time.Unix(0, nsm.ts).UTC()
|
|
} else {
|
|
// Like purge.
|
|
ms.state.FirstSeq = ms.state.LastSeq + 1
|
|
ms.state.FirstTime = time.Time{}
|
|
ms.dmap = make(map[uint64]struct{})
|
|
}
|
|
}
|
|
|
|
// Remove a seq from the fss and select new first.
|
|
// Lock should be held.
|
|
func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
|
|
ss := ms.fss[subj]
|
|
if ss == nil {
|
|
return
|
|
}
|
|
if ss.Msgs == 1 {
|
|
delete(ms.fss, subj)
|
|
return
|
|
}
|
|
ss.Msgs--
|
|
if seq != ss.First {
|
|
return
|
|
}
|
|
// TODO(dlc) - Might want to optimize this.
|
|
for tseq := seq + 1; tseq < ss.Last; tseq++ {
|
|
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
|
|
ss.First = tseq
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Removes the message referenced by seq.
|
|
// Lock should he held.
|
|
func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
|
|
var ss uint64
|
|
sm, ok := ms.msgs[seq]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
|
|
|
|
delete(ms.msgs, seq)
|
|
ms.state.Msgs--
|
|
ms.state.Bytes -= ss
|
|
ms.updateFirstSeq(seq)
|
|
|
|
if secure {
|
|
if len(sm.hdr) > 0 {
|
|
sm.hdr = make([]byte, len(sm.hdr))
|
|
rand.Read(sm.hdr)
|
|
}
|
|
if len(sm.msg) > 0 {
|
|
sm.msg = make([]byte, len(sm.msg))
|
|
rand.Read(sm.msg)
|
|
}
|
|
sm.seq, sm.ts = 0, 0
|
|
}
|
|
|
|
// Remove any per subject tracking.
|
|
ms.removeSeqPerSubject(sm.subj, seq)
|
|
|
|
if ms.scb != nil {
|
|
// We do not want to hold any locks here.
|
|
ms.mu.Unlock()
|
|
delta := int64(ss)
|
|
ms.scb(-1, -delta, seq, sm.subj)
|
|
ms.mu.Lock()
|
|
}
|
|
|
|
return ok
|
|
}
|
|
|
|
// Type returns the type of the underlying store.
|
|
func (ms *memStore) Type() StorageType {
|
|
return MemoryStorage
|
|
}
|
|
|
|
// FastState will fill in state with only the following.
|
|
// Msgs, Bytes, FirstSeq, LastSeq
|
|
func (ms *memStore) FastState(state *StreamState) {
|
|
ms.mu.RLock()
|
|
state.Msgs = ms.state.Msgs
|
|
state.Bytes = ms.state.Bytes
|
|
state.FirstSeq = ms.state.FirstSeq
|
|
state.LastSeq = ms.state.LastSeq
|
|
ms.mu.RUnlock()
|
|
}
|
|
|
|
func (ms *memStore) State() StreamState {
|
|
ms.mu.RLock()
|
|
state := ms.state
|
|
state.Consumers = ms.consumers
|
|
state.Deleted = nil
|
|
for seq := range ms.dmap {
|
|
state.Deleted = append(state.Deleted, seq)
|
|
}
|
|
ms.mu.RUnlock()
|
|
if len(state.Deleted) > 0 {
|
|
sort.Slice(state.Deleted, func(i, j int) bool {
|
|
return state.Deleted[i] < state.Deleted[j]
|
|
})
|
|
state.NumDeleted = len(state.Deleted)
|
|
}
|
|
return state
|
|
}
|
|
|
|
func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {
|
|
return uint64(len(subj) + len(hdr) + len(msg) + 16) // 8*2 for seq + age
|
|
}
|
|
|
|
// Delete is same as Stop for memory store.
|
|
func (ms *memStore) Delete() error {
|
|
ms.Purge()
|
|
return ms.Stop()
|
|
}
|
|
|
|
func (ms *memStore) Stop() error {
|
|
ms.mu.Lock()
|
|
if ms.ageChk != nil {
|
|
ms.ageChk.Stop()
|
|
ms.ageChk = nil
|
|
}
|
|
ms.msgs = nil
|
|
ms.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (ms *memStore) incConsumers() {
|
|
ms.mu.Lock()
|
|
ms.consumers++
|
|
ms.mu.Unlock()
|
|
}
|
|
|
|
func (ms *memStore) decConsumers() {
|
|
ms.mu.Lock()
|
|
if ms.consumers > 0 {
|
|
ms.consumers--
|
|
}
|
|
ms.mu.Unlock()
|
|
}
|
|
|
|
type consumerMemStore struct {
|
|
ms *memStore
|
|
}
|
|
|
|
func (ms *memStore) ConsumerStore(_ string, _ *ConsumerConfig) (ConsumerStore, error) {
|
|
ms.incConsumers()
|
|
return &consumerMemStore{ms}, nil
|
|
}
|
|
|
|
func (ms *memStore) Snapshot(_ time.Duration, _, _ bool) (*SnapshotResult, error) {
|
|
return nil, fmt.Errorf("no impl")
|
|
}
|
|
|
|
// No-ops.
|
|
func (os *consumerMemStore) Update(_ *ConsumerState) error { return nil }
|
|
|
|
func (os *consumerMemStore) UpdateDelivered(_, _, _ uint64, _ int64) error { return nil }
|
|
|
|
func (os *consumerMemStore) UpdateAcks(_, _ uint64) error { return nil }
|
|
|
|
func (os *consumerMemStore) Stop() error {
|
|
os.ms.decConsumers()
|
|
return nil
|
|
}
|
|
|
|
func (os *consumerMemStore) Delete() error {
|
|
return os.Stop()
|
|
}
|
|
func (os *consumerMemStore) StreamDelete() error {
|
|
return os.Stop()
|
|
}
|
|
|
|
func (os *consumerMemStore) State() (*ConsumerState, error) { return nil, nil }
|
|
|
|
// Templates
|
|
type templateMemStore struct{}
|
|
|
|
func newTemplateMemStore() *templateMemStore {
|
|
return &templateMemStore{}
|
|
}
|
|
|
|
// No-ops for memstore.
|
|
func (ts *templateMemStore) Store(t *streamTemplate) error { return nil }
|
|
func (ts *templateMemStore) Delete(t *streamTemplate) error { return nil }
|