diff --git a/server/filestore.go b/server/filestore.go index 3f4a0a86..27607adc 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2319,11 +2319,11 @@ func (fs *fileStore) dmapEntries() int { // Purge will remove all messages from this store. // Will return the number of purged messages. -func (fs *fileStore) Purge() uint64 { +func (fs *fileStore) Purge() (uint64, error) { fs.mu.Lock() if fs.closed { fs.mu.Unlock() - return 0 + return 0, ErrStoreClosed } purged := fs.state.Msgs @@ -2370,7 +2370,43 @@ func (fs *fileStore) Purge() uint64 { cb(-int64(purged), -rbytes, 0, _EMPTY_) } - return purged + return purged, nil +} + +// Compact will remove all messages from this store up to +// but not including the seq parameter. +// Will return the number of purged messages. +func (fs *fileStore) Compact(seq uint64) (uint64, error) { + if seq == 0 { + return fs.Purge() + } + + if _, err := fs.msgForSeq(seq); err != nil { + return 0, err + } + + var purged uint64 + for fseq := fs.firstSeq(); fseq < seq; fseq = fs.firstSeq() { + if found, err := fs.removeMsg(fseq, false); err != nil { + if err == ErrStoreMsgNotFound { + continue + } else if err == ErrStoreEOF { + err = nil + } + return purged, err + } else if found { + purged++ + } + } + + return purged, nil +} + +func (fs *fileStore) firstSeq() uint64 { + fs.mu.RLock() + fseq := fs.state.FirstSeq + fs.mu.RUnlock() + return fseq } // Returns number of msg blks. @@ -2488,7 +2524,6 @@ func (fs *fileStore) Delete() error { if fs.isClosed() { return ErrStoreClosed } - // TODO(dlc) - check error here? fs.Purge() if err := fs.Stop(); err != nil { return err diff --git a/server/filestore_test.go b/server/filestore_test.go index 4bdc2c01..5e431718 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -739,6 +739,40 @@ func TestFileStorePurge(t *testing.T) { }) } +func TestFileStoreCompact(t *testing.T) { + storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) + os.MkdirAll(storeDir, 0755) + defer os.RemoveAll(storeDir) + + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + subj, msg := "foo", []byte("Hello World") + for i := 0; i < 10; i++ { + fs.StoreMsg(subj, nil, msg) + } + if state := fs.State(); state.Msgs != 10 { + t.Fatalf("Expected 10 msgs, got %d", state.Msgs) + } + n, err := fs.Compact(6) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if n != 5 { + t.Fatalf("Expected to have purged 5 msgs, got %d", n) + } + state := fs.State() + if state.Msgs != 5 { + t.Fatalf("Expected 5 msgs, got %d", state.Msgs) + } + if state.FirstSeq != 6 { + t.Fatalf("Expected first seq of 6, got %d", state.FirstSeq) + } +} + func TestFileStoreRemovePartialRecovery(t *testing.T) { storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) os.MkdirAll(storeDir, 0755) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 32773443..2822cc85 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1127,8 +1127,13 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp)) return } - resp.Purged = mset.Purge() - resp.Success = true + purged, err := mset.Purge() + if err != nil { + resp.Error = jsError(err) + } else { + resp.Purged = purged + resp.Success = true + } s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp)) } diff --git a/server/memstore.go b/server/memstore.go index 971ed45b..9d932293 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -254,7 +254,7 @@ func (ms *memStore) expireMsgs() { // Purge will remove all messages from this store. // Will return the number of purged messages. -func (ms *memStore) Purge() uint64 { +func (ms *memStore) Purge() (uint64, error) { ms.mu.Lock() purged := uint64(len(ms.msgs)) cb := ms.scb @@ -270,7 +270,46 @@ func (ms *memStore) Purge() uint64 { cb(-int64(purged), -bytes, 0, _EMPTY_) } - return purged + return purged, nil +} + +// Compact will remove all messages from this store up to +// but not including the seq parameter. +// Will return the number of purged messages. +func (ms *memStore) Compact(seq uint64) (uint64, error) { + if seq == 0 { + return ms.Purge() + } + ms.mu.Lock() + sm, ok := ms.msgs[seq] + if !ok { + ms.mu.Unlock() + return 0, ErrStoreMsgNotFound + } + ms.state.FirstSeq = seq + ms.state.FirstTime = time.Unix(0, sm.ts).UTC() + + var purged, bytes uint64 + for seq := seq - 1; seq > 0; seq-- { + sm := ms.msgs[seq] + if sm == nil { + continue + } + bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) + purged++ + delete(ms.msgs, seq) + } + ms.state.Msgs -= purged + ms.state.Bytes -= bytes + + cb := ms.scb + ms.mu.Unlock() + + if cb != nil { + cb(-int64(purged), -int64(bytes), 0, _EMPTY_) + } + + return purged, nil } func (ms *memStore) deleteFirstMsgOrPanic() { diff --git a/server/memstore_test.go b/server/memstore_test.go index 4d91a212..e5f5bfce 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -198,6 +198,54 @@ func TestMemStoreTimeStamps(t *testing.T) { } } +func TestMemStorePurge(t *testing.T) { + ms, err := newMemStore(&StreamConfig{Storage: MemoryStorage}) + if err != nil { + t.Fatalf("Unexpected error creating store: %v", err) + } + + subj, msg := "foo", []byte("Hello World") + for i := 0; i < 10; i++ { + ms.StoreMsg(subj, nil, msg) + } + if state := ms.State(); state.Msgs != 10 { + t.Fatalf("Expected 10 msgs, got %d", state.Msgs) + } + ms.Purge() + if state := ms.State(); state.Msgs != 0 { + t.Fatalf("Expected no msgs, got %d", state.Msgs) + } +} + +func TestMemStoreCompact(t *testing.T) { + ms, err := newMemStore(&StreamConfig{Storage: MemoryStorage}) + if err != nil { + t.Fatalf("Unexpected error creating store: %v", err) + } + + subj, msg := "foo", []byte("Hello World") + for i := 0; i < 10; i++ { + ms.StoreMsg(subj, nil, msg) + } + if state := ms.State(); state.Msgs != 10 { + t.Fatalf("Expected 10 msgs, got %d", state.Msgs) + } + n, err := ms.Compact(6) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if n != 5 { + t.Fatalf("Expected to have purged 5 msgs, got %d", n) + } + state := ms.State() + if state.Msgs != 5 { + t.Fatalf("Expected 5 msgs, got %d", state.Msgs) + } + if state.FirstSeq != 6 { + t.Fatalf("Expected first seq of 6, got %d", state.FirstSeq) + } +} + func TestMemStoreEraseMsg(t *testing.T) { ms, err := newMemStore(&StreamConfig{Storage: MemoryStorage}) if err != nil { diff --git a/server/store.go b/server/store.go index bfa41bc4..943c6a57 100644 --- a/server/store.go +++ b/server/store.go @@ -66,7 +66,8 @@ type StreamStore interface { LoadMsg(seq uint64) (subj string, hdr, msg []byte, ts int64, err error) RemoveMsg(seq uint64) (bool, error) EraseMsg(seq uint64) (bool, error) - Purge() uint64 + Purge() (uint64, error) + Compact(seq uint64) (uint64, error) GetSeqFromTime(t time.Time) uint64 State() StreamState RegisterStorageUpdates(StorageUpdateHandler) diff --git a/server/stream.go b/server/stream.go index 210a3edc..661939e5 100644 --- a/server/stream.go +++ b/server/stream.go @@ -18,6 +18,7 @@ import ( "bytes" "compress/gzip" "encoding/json" + "errors" "fmt" "io" "io/ioutil" @@ -610,11 +611,11 @@ func (mset *Stream) Update(config *StreamConfig) error { } // Purge will remove all messages from the stream and underlying store. -func (mset *Stream) Purge() uint64 { +func (mset *Stream) Purge() (uint64, error) { mset.mu.Lock() if mset.client == nil { mset.mu.Unlock() - return 0 + return 0, errors.New("stream closed") } // Purge dedupe. mset.ddmap = nil @@ -625,12 +626,15 @@ func (mset *Stream) Purge() uint64 { } mset.mu.Unlock() - purged := mset.store.Purge() + purged, err := mset.store.Purge() + if err != nil { + return purged, err + } stats := mset.store.State() for _, o := range obs { o.purge(stats.FirstSeq) } - return purged + return purged, nil } // RemoveMsg will remove a message from a stream.