Added Compact to store interface for WAL functionality

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-12-03 13:53:24 -08:00
parent a97e84d8b9
commit 7564768027
7 changed files with 179 additions and 13 deletions

View File

@@ -2319,11 +2319,11 @@ func (fs *fileStore) dmapEntries() int {
// Purge will remove all messages from this store.
// Will return the number of purged messages.
func (fs *fileStore) Purge() uint64 {
func (fs *fileStore) Purge() (uint64, error) {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
return 0
return 0, ErrStoreClosed
}
purged := fs.state.Msgs
@@ -2370,7 +2370,43 @@ func (fs *fileStore) Purge() uint64 {
cb(-int64(purged), -rbytes, 0, _EMPTY_)
}
return purged
return purged, nil
}
// Compact will remove all messages from this store up to
// but not including the seq parameter.
// Will return the number of purged messages.
func (fs *fileStore) Compact(seq uint64) (uint64, error) {
if seq == 0 {
return fs.Purge()
}
if _, err := fs.msgForSeq(seq); err != nil {
return 0, err
}
var purged uint64
for fseq := fs.firstSeq(); fseq < seq; fseq = fs.firstSeq() {
if found, err := fs.removeMsg(fseq, false); err != nil {
if err == ErrStoreMsgNotFound {
continue
} else if err == ErrStoreEOF {
err = nil
}
return purged, err
} else if found {
purged++
}
}
return purged, nil
}
func (fs *fileStore) firstSeq() uint64 {
fs.mu.RLock()
fseq := fs.state.FirstSeq
fs.mu.RUnlock()
return fseq
}
// Returns number of msg blks.
@@ -2488,7 +2524,6 @@ func (fs *fileStore) Delete() error {
if fs.isClosed() {
return ErrStoreClosed
}
// TODO(dlc) - check error here?
fs.Purge()
if err := fs.Stop(); err != nil {
return err

View File

@@ -739,6 +739,40 @@ func TestFileStorePurge(t *testing.T) {
})
}
func TestFileStoreCompact(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)
defer os.RemoveAll(storeDir)
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer fs.Stop()
subj, msg := "foo", []byte("Hello World")
for i := 0; i < 10; i++ {
fs.StoreMsg(subj, nil, msg)
}
if state := fs.State(); state.Msgs != 10 {
t.Fatalf("Expected 10 msgs, got %d", state.Msgs)
}
n, err := fs.Compact(6)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 5 {
t.Fatalf("Expected to have purged 5 msgs, got %d", n)
}
state := fs.State()
if state.Msgs != 5 {
t.Fatalf("Expected 5 msgs, got %d", state.Msgs)
}
if state.FirstSeq != 6 {
t.Fatalf("Expected first seq of 6, got %d", state.FirstSeq)
}
}
func TestFileStoreRemovePartialRecovery(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)

View File

@@ -1127,8 +1127,13 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.Purged = mset.Purge()
resp.Success = true
purged, err := mset.Purge()
if err != nil {
resp.Error = jsError(err)
} else {
resp.Purged = purged
resp.Success = true
}
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp))
}

View File

@@ -254,7 +254,7 @@ func (ms *memStore) expireMsgs() {
// Purge will remove all messages from this store.
// Will return the number of purged messages.
func (ms *memStore) Purge() uint64 {
func (ms *memStore) Purge() (uint64, error) {
ms.mu.Lock()
purged := uint64(len(ms.msgs))
cb := ms.scb
@@ -270,7 +270,46 @@ func (ms *memStore) Purge() uint64 {
cb(-int64(purged), -bytes, 0, _EMPTY_)
}
return purged
return purged, nil
}
// Compact will remove all messages from this store up to
// but not including the seq parameter.
// Will return the number of purged messages.
func (ms *memStore) Compact(seq uint64) (uint64, error) {
if seq == 0 {
return ms.Purge()
}
ms.mu.Lock()
sm, ok := ms.msgs[seq]
if !ok {
ms.mu.Unlock()
return 0, ErrStoreMsgNotFound
}
ms.state.FirstSeq = seq
ms.state.FirstTime = time.Unix(0, sm.ts).UTC()
var purged, bytes uint64
for seq := seq - 1; seq > 0; seq-- {
sm := ms.msgs[seq]
if sm == nil {
continue
}
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
delete(ms.msgs, seq)
}
ms.state.Msgs -= purged
ms.state.Bytes -= bytes
cb := ms.scb
ms.mu.Unlock()
if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
return purged, nil
}
func (ms *memStore) deleteFirstMsgOrPanic() {

View File

@@ -198,6 +198,54 @@ func TestMemStoreTimeStamps(t *testing.T) {
}
}
func TestMemStorePurge(t *testing.T) {
ms, err := newMemStore(&StreamConfig{Storage: MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error creating store: %v", err)
}
subj, msg := "foo", []byte("Hello World")
for i := 0; i < 10; i++ {
ms.StoreMsg(subj, nil, msg)
}
if state := ms.State(); state.Msgs != 10 {
t.Fatalf("Expected 10 msgs, got %d", state.Msgs)
}
ms.Purge()
if state := ms.State(); state.Msgs != 0 {
t.Fatalf("Expected no msgs, got %d", state.Msgs)
}
}
func TestMemStoreCompact(t *testing.T) {
ms, err := newMemStore(&StreamConfig{Storage: MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error creating store: %v", err)
}
subj, msg := "foo", []byte("Hello World")
for i := 0; i < 10; i++ {
ms.StoreMsg(subj, nil, msg)
}
if state := ms.State(); state.Msgs != 10 {
t.Fatalf("Expected 10 msgs, got %d", state.Msgs)
}
n, err := ms.Compact(6)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 5 {
t.Fatalf("Expected to have purged 5 msgs, got %d", n)
}
state := ms.State()
if state.Msgs != 5 {
t.Fatalf("Expected 5 msgs, got %d", state.Msgs)
}
if state.FirstSeq != 6 {
t.Fatalf("Expected first seq of 6, got %d", state.FirstSeq)
}
}
func TestMemStoreEraseMsg(t *testing.T) {
ms, err := newMemStore(&StreamConfig{Storage: MemoryStorage})
if err != nil {

View File

@@ -66,7 +66,8 @@ type StreamStore interface {
LoadMsg(seq uint64) (subj string, hdr, msg []byte, ts int64, err error)
RemoveMsg(seq uint64) (bool, error)
EraseMsg(seq uint64) (bool, error)
Purge() uint64
Purge() (uint64, error)
Compact(seq uint64) (uint64, error)
GetSeqFromTime(t time.Time) uint64
State() StreamState
RegisterStorageUpdates(StorageUpdateHandler)

View File

@@ -18,6 +18,7 @@ import (
"bytes"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
@@ -610,11 +611,11 @@ func (mset *Stream) Update(config *StreamConfig) error {
}
// Purge will remove all messages from the stream and underlying store.
func (mset *Stream) Purge() uint64 {
func (mset *Stream) Purge() (uint64, error) {
mset.mu.Lock()
if mset.client == nil {
mset.mu.Unlock()
return 0
return 0, errors.New("stream closed")
}
// Purge dedupe.
mset.ddmap = nil
@@ -625,12 +626,15 @@ func (mset *Stream) Purge() uint64 {
}
mset.mu.Unlock()
purged := mset.store.Purge()
purged, err := mset.store.Purge()
if err != nil {
return purged, err
}
stats := mset.store.State()
for _, o := range obs {
o.purge(stats.FirstSeq)
}
return purged
return purged, nil
}
// RemoveMsg will remove a message from a stream.