More API impls, tests for DeleteMsg

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-11-22 12:49:11 -08:00
parent 582282725d
commit e127039622
8 changed files with 267 additions and 22 deletions

View File

@@ -132,7 +132,7 @@ const (
// coalesceDelay
coalesceDelay = 10 * time.Millisecond
// coalesceMaximum
coalesceMaximum = 32 * 1024
coalesceMaximum = 64 * 1024
)
func newFileStore(fcfg FileStoreConfig, cfg MsgSetConfig) (*fileStore, error) {
@@ -659,7 +659,7 @@ func (fs *fileStore) expireMsgs() {
minAge := now - int64(fs.cfg.MaxAge)
for {
if sm := fs.msgForSeq(0); sm != nil && sm.ts <= minAge {
if sm, _ := fs.msgForSeq(0); sm != nil && sm.ts <= minAge {
fs.mu.Lock()
fs.deleteFirstMsg()
fs.mu.Unlock()
@@ -1041,7 +1041,8 @@ func (fs *fileStore) checkPrefetch(seq uint64, mb *msgBlock) {
}
// Will return message for the given sequence number.
func (fs *fileStore) msgForSeq(seq uint64) *fileStoredMsg {
func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
var err = ErrStoreEOF
fs.mu.Lock()
// seq == 0 indicates we want first msg.
if seq == 0 {
@@ -1049,8 +1050,11 @@ func (fs *fileStore) msgForSeq(seq uint64) *fileStoredMsg {
}
mb := fs.selectMsgBlock(seq)
if mb == nil {
if seq <= fs.stats.LastSeq {
err = ErrStoreMsgNotFound
}
fs.mu.Unlock()
return nil
return nil, err
}
// Check cache.
@@ -1058,7 +1062,7 @@ func (fs *fileStore) msgForSeq(seq uint64) *fileStoredMsg {
if sm, ok := mb.cache[seq]; ok {
mb.cgenid++
fs.mu.Unlock()
return sm
return sm, nil
}
}
@@ -1069,9 +1073,11 @@ func (fs *fileStore) msgForSeq(seq uint64) *fileStoredMsg {
sm := fs.readAndCacheMsgs(mb, seq)
if sm != nil {
mb.cgenid++
} else if seq <= fs.stats.LastSeq {
err = ErrStoreMsgNotFound
}
fs.mu.Unlock()
return sm
return sm, err
}
// Internal function to return msg parts from a raw buffer.
@@ -1095,10 +1101,11 @@ func msgFromBuf(buf []byte) (string, []byte, uint64, int64, error) {
// LoadMsg will lookup the message by sequence number and return it if found.
func (fs *fileStore) LoadMsg(seq uint64) (string, []byte, int64, error) {
if sm := fs.msgForSeq(seq); sm != nil {
sm, err := fs.msgForSeq(seq)
if sm != nil {
return sm.subj, sm.msg, sm.ts, nil
}
return "", nil, 0, ErrStoreMsgNotFound
return "", nil, 0, err
}
func (fs *fileStore) Stats() MsgSetStats {

View File

@@ -602,7 +602,7 @@ func TestFileStoreEraseMsg(t *testing.T) {
if !bytes.Equal(msg, smsg) {
t.Fatalf("Expected same msg, got %q vs %q", smsg, msg)
}
sm := fs.msgForSeq(1)
sm, _ := fs.msgForSeq(1)
if !fs.EraseMsg(1) {
t.Fatalf("Expected erase msg to return success")
}

View File

@@ -21,6 +21,7 @@ import (
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
@@ -96,6 +97,16 @@ const (
JetStreamDeleteMsgSet = "$JS.MSGSET.DELETE"
jsDeleteMsgSetExport = "$JS.*.MSGSET.DELETE"
// JetStreamPurgeMsgSet is the endpoint to purge message sets.
// Will return +OK on success and -ERR on failure.
JetStreamPurgeMsgSet = "$JS.MSGSET.PURGE"
jsPurgeMsgSetExport = "$JS.*.MSGSET.PURGE"
// JetStreamDeleteMsg is the endpoint to delete messages from a message set.
// Will return +OK on success and -ERR on failure.
JetStreamDeleteMsg = "$JS.MSGSET.MSG.DELETE"
jsDeleteMsgExport = "$JS.*.MSGSET.MSG.DELETE"
// JetStreamCreateObservable is the endpoint to create observers for a message set.
// Will return +OK on success and -ERR on failure.
JetStreamCreateObservable = "$JS.OBSERVABLE.CREATE"
@@ -148,6 +159,8 @@ var allJsExports = []string{
jsMsgSetsExport,
jsMsgSetInfoExport,
jsDeleteMsgSetExport,
jsPurgeMsgSetExport,
jsDeleteMsgExport,
jsCreateObservableExport,
jsObservablesExport,
jsObservableInfoExport,
@@ -241,6 +254,12 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error {
if _, err := s.sysSubscribe(jsDeleteMsgSetExport, s.jsMsgSetDeleteRequest); err != nil {
return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err)
}
if _, err := s.sysSubscribe(jsPurgeMsgSetExport, s.jsMsgSetPurgeRequest); err != nil {
return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err)
}
if _, err := s.sysSubscribe(jsDeleteMsgExport, s.jsMsgDeleteRequest); err != nil {
return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err)
}
if _, err := s.sysSubscribe(jsCreateObservableExport, s.jsCreateObservableRequest); err != nil {
return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err)
}
@@ -905,6 +924,60 @@ func (s *Server) jsMsgSetDeleteRequest(sub *subscription, c *client, subject, re
s.sendInternalAccountMsg(c.acc, reply, response)
}
// Request to delete a message.
// This expects a message set name and store sequence number as the msg body.
func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
if c == nil || c.acc == nil {
return
}
if !c.acc.JetStreamEnabled() {
s.sendInternalAccountMsg(c.acc, reply, JetStreamNotEnabled)
return
}
args := strings.Split(string(msg), " ")
if len(args) != 2 {
s.sendInternalAccountMsg(c.acc, reply, JetStreamBadRequest)
return
}
name := args[0]
seq, _ := strconv.Atoi(args[1])
mset, err := c.acc.LookupMsgSet(name)
if err != nil {
s.sendInternalAccountMsg(c.acc, reply, fmt.Sprintf("%s %v", ErrPrefix, err))
return
}
var response = OK
if !mset.EraseMsg(uint64(seq)) {
response = fmt.Sprintf("%s sequence [%d] not found", ErrPrefix, seq)
}
s.sendInternalAccountMsg(c.acc, reply, response)
}
// Request to purge a message set.
// This expects a message set name as the msg body.
func (s *Server) jsMsgSetPurgeRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
if c == nil || c.acc == nil {
return
}
if !c.acc.JetStreamEnabled() {
s.sendInternalAccountMsg(c.acc, reply, JetStreamNotEnabled)
return
}
if len(msg) == 0 {
s.sendInternalAccountMsg(c.acc, reply, JetStreamBadRequest)
return
}
mset, err := c.acc.LookupMsgSet(string(msg))
if err != nil {
s.sendInternalAccountMsg(c.acc, reply, fmt.Sprintf("%s %v", ErrPrefix, err))
return
}
mset.Purge()
s.sendInternalAccountMsg(c.acc, reply, OK)
}
// Request to create an observable.
func (s *Server) jsCreateObservableRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
if c == nil || c.acc == nil {

View File

@@ -207,10 +207,15 @@ func (ms *memStore) deleteFirstMsg() bool {
func (ms *memStore) LoadMsg(seq uint64) (string, []byte, int64, error) {
ms.mu.RLock()
sm, ok := ms.msgs[seq]
last := ms.stats.LastSeq
ms.mu.RUnlock()
if !ok || sm == nil {
return "", nil, 0, ErrStoreMsgNotFound
var err = ErrStoreEOF
if seq <= last {
err = ErrStoreMsgNotFound
}
return "", nil, 0, err
}
return sm.subj, sm.msg, sm.ts, nil
}

View File

@@ -193,10 +193,16 @@ func (mset *MsgSet) Purge() uint64 {
}
// RemoveMsg will remove a message from a message set.
// FIXME(dlc) - Should pick one and be consistent.
func (mset *MsgSet) RemoveMsg(seq uint64) bool {
return mset.store.RemoveMsg(seq)
}
// DeleteMsg will remove a message from a message set.
func (mset *MsgSet) DeleteMsg(seq uint64) bool {
return mset.store.RemoveMsg(seq)
}
// EraseMsg will securely remove a message and rewrite the data with random data.
func (mset *MsgSet) EraseMsg(seq uint64) bool {
return mset.store.EraseMsg(seq)
@@ -347,7 +353,7 @@ type jsPubMsg struct {
}
// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering
const nmsSendQSize = 1024
const msetSendQSize = 1024
// This is similar to system semantics but did not want to overload the single system sendq,
// or require system account when doing simple setup with jetstream.
@@ -357,7 +363,7 @@ func (mset *MsgSet) setupSendCapabilities() {
if mset.sendq != nil {
return
}
mset.sendq = make(chan *jsPubMsg, nmsSendQSize)
mset.sendq = make(chan *jsPubMsg, msetSendQSize)
go mset.internalSendLoop()
}
@@ -381,7 +387,7 @@ func (mset *MsgSet) internalSendLoop() {
mset.mu.Unlock()
// Warn when internal send queue is backed up past 75%
warnThresh := 3 * nmsSendQSize / 4
warnThresh := 3 * msetSendQSize / 4
warnFreq := time.Second
last := time.Now().Add(-warnFreq)

View File

@@ -607,12 +607,13 @@ func (o *Observable) getNextMsg() (string, []byte, uint64, uint64, error) {
// We have the msg here.
return subj, msg, seq, dcount, nil
}
// We got an error here.
// If this was a redelivery the message may have expired so move on to next one.
// Only return if first delivery.
if dcount == 1 {
// We got an error here. If this is an EOF we will return, otherwise
// we can continue looking.
if err == ErrStoreEOF {
return "", nil, 0, 0, err
}
// Skip since its probably deleted or expired.
o.sseq++
}
}
@@ -769,7 +770,7 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
// On error either wait or return.
if err != nil {
if err == ErrStoreMsgNotFound {
if err == ErrStoreMsgNotFound || err == ErrStoreEOF {
goto waitForMsgs
} else {
o.mu.Unlock()
@@ -983,12 +984,18 @@ func (o *Observable) checkPending() {
}
}
// SeqFromReply will extract a sequence number from a reply ack subject.
// SeqFromReply will extract a sequence number from a reply subject.
func (o *Observable) SeqFromReply(reply string) uint64 {
_, seq, _ := o.ReplyInfo(reply)
return seq
}
// SetSeqFromReply will extract the message set sequence from the reply subject.
func (o *Observable) SetSeqFromReply(reply string) uint64 {
seq, _, _ := o.ReplyInfo(reply)
return seq
}
func (o *Observable) ReplyInfo(reply string) (sseq, dseq, dcount uint64) {
n, err := fmt.Sscanf(reply, o.ackReplyT, &dcount, &sseq, &dseq)
if err != nil || n != 3 {
@@ -1205,7 +1212,6 @@ func (o *Observable) SetActiveCheckParams(achk time.Duration, thresh int) error
// RequestNextMsgSubject returns the subject to request the next message when in pull or worker mode.
// Returns empty otherwise.1
func (o *Observable) RequestNextMsgSubject() string {
return o.nextMsgSubj
}

View File

@@ -30,7 +30,12 @@ const (
FileStorage
)
var ErrStoreMsgNotFound = errors.New("no message found")
var (
// ErrStoreMsgNotFound when message was not found but was expected to be.
ErrStoreMsgNotFound = errors.New("no message found")
// ErrStoreEOF is returned when message seq is greater than the last sequence.
ErrStoreEOF = errors.New("msgset EOF")
)
type MsgSetStore interface {
StoreMsg(subj string, msg []byte) (uint64, error)

View File

@@ -1426,7 +1426,7 @@ func TestJetStreamObservableReconnect(t *testing.T) {
// reconnect scenarios.
getMsg := func(seqno int) *nats.Msg {
t.Helper()
m, err := sub.NextMsg(5 * time.Second)
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error for %d: %v", seqno, err)
}
@@ -2862,6 +2862,149 @@ func TestJetStreamRequestAPI(t *testing.T) {
}
}
func TestJetStreamDeleteMsg(t *testing.T) {
cases := []struct {
name string
mconfig *server.MsgSetConfig
}{
{name: "MemoryStore",
mconfig: &server.MsgSetConfig{
Name: "foo",
Retention: server.StreamPolicy,
MaxAge: time.Hour,
Storage: server.MemoryStorage,
Replicas: 1,
}},
{name: "FileStore",
mconfig: &server.MsgSetConfig{
Name: "foo",
Retention: server.StreamPolicy,
MaxAge: time.Hour,
Storage: server.FileStorage,
Replicas: 1,
}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
config := s.JetStreamConfig()
if config == nil {
t.Fatalf("Expected non-nil config")
}
defer os.RemoveAll(config.StoreDir)
cfg := &server.MsgSetConfig{Name: "foo", Storage: server.FileStorage}
mset, err := s.GlobalAccount().AddMsgSet(cfg)
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
nc := clientConnectToServer(t, s)
defer nc.Close()
pubTen := func() {
t.Helper()
for i := 0; i < 10; i++ {
nc.Publish("foo", []byte("Hello World!"))
}
nc.Flush()
}
pubTen()
stats := mset.Stats()
if stats.Msgs != 10 {
t.Fatalf("Expected 10 messages, got %d", stats.Msgs)
}
bytesPerMsg := stats.Bytes / 10
if bytesPerMsg == 0 {
t.Fatalf("Expected non-zero bytes for msg size")
}
deleteAndCheck := func(seq, expectedFirstSeq uint64) {
t.Helper()
beforeStats := mset.Stats()
if !mset.DeleteMsg(seq) {
t.Fatalf("Expected the delete of sequence %d to succeed", seq)
}
expectedStats := beforeStats
expectedStats.Msgs--
expectedStats.Bytes -= bytesPerMsg
expectedStats.FirstSeq = expectedFirstSeq
afterStats := mset.Stats()
if afterStats != expectedStats {
t.Fatalf("Stats not what we expected. Expected %+v, got %+v\n", expectedStats, afterStats)
}
}
// Delete one from the middle
deleteAndCheck(5, 1)
// Now make sure sequences are update properly.
// Delete first msg.
deleteAndCheck(1, 2)
// Now last
deleteAndCheck(10, 2)
// Now gaps.
deleteAndCheck(3, 2)
deleteAndCheck(2, 4)
mset.Purge()
// Put ten more one.
pubTen()
deleteAndCheck(11, 12)
deleteAndCheck(15, 12)
deleteAndCheck(16, 12)
deleteAndCheck(20, 12)
// Shutdown the server.
s.Shutdown()
s = RunBasicJetStreamServer()
defer s.Shutdown()
mset, err = s.GlobalAccount().LookupMsgSet("foo")
if err != nil {
t.Fatalf("Expected to get the message set back")
}
expected := server.MsgSetStats{Msgs: 6, Bytes: 6 * bytesPerMsg, FirstSeq: 12, LastSeq: 20}
stats = mset.Stats()
if stats != expected {
t.Fatalf("Stats not what we expected. Expected %+v, got %+v\n", expected, stats)
}
// Now create an observable and make sure we get the right sequence.
nc = clientConnectToServer(t, s)
defer nc.Close()
delivery := nats.NewInbox()
sub, _ := nc.SubscribeSync(delivery)
nc.Flush()
o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, DeliverAll: true, Subject: "foo"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
expectedStoreSeq := []uint64{12, 13, 14, 17, 18, 19}
for i := 0; i < 6; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if o.SetSeqFromReply(m.Reply) != expectedStoreSeq[i] {
t.Fatalf("Expected store seq of %d, got %d", expectedStoreSeq[i], o.SetSeqFromReply(m.Reply))
}
}
})
}
}
func TestJetStreamPubSubPerf(t *testing.T) {
// Uncomment to run, holding place for now.
t.SkipNow()