From e12703962255fa561c19a8a52f023434fbe9cd2a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 22 Nov 2019 12:49:11 -0800 Subject: [PATCH] More API impls, tests for DeleteMsg Signed-off-by: Derek Collison --- server/filestore.go | 23 ++++--- server/filestore_test.go | 2 +- server/jetstream.go | 73 ++++++++++++++++++++ server/memstore.go | 7 +- server/msgset.go | 12 +++- server/observable.go | 20 ++++-- server/store.go | 7 +- test/jetstream_test.go | 145 ++++++++++++++++++++++++++++++++++++++- 8 files changed, 267 insertions(+), 22 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 63148d43..075a0d00 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -132,7 +132,7 @@ const ( // coalesceDelay coalesceDelay = 10 * time.Millisecond // coalesceMaximum - coalesceMaximum = 32 * 1024 + coalesceMaximum = 64 * 1024 ) func newFileStore(fcfg FileStoreConfig, cfg MsgSetConfig) (*fileStore, error) { @@ -659,7 +659,7 @@ func (fs *fileStore) expireMsgs() { minAge := now - int64(fs.cfg.MaxAge) for { - if sm := fs.msgForSeq(0); sm != nil && sm.ts <= minAge { + if sm, _ := fs.msgForSeq(0); sm != nil && sm.ts <= minAge { fs.mu.Lock() fs.deleteFirstMsg() fs.mu.Unlock() @@ -1041,7 +1041,8 @@ func (fs *fileStore) checkPrefetch(seq uint64, mb *msgBlock) { } // Will return message for the given sequence number. -func (fs *fileStore) msgForSeq(seq uint64) *fileStoredMsg { +func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) { + var err = ErrStoreEOF fs.mu.Lock() // seq == 0 indicates we want first msg. if seq == 0 { @@ -1049,8 +1050,11 @@ func (fs *fileStore) msgForSeq(seq uint64) *fileStoredMsg { } mb := fs.selectMsgBlock(seq) if mb == nil { + if seq <= fs.stats.LastSeq { + err = ErrStoreMsgNotFound + } fs.mu.Unlock() - return nil + return nil, err } // Check cache. @@ -1058,7 +1062,7 @@ func (fs *fileStore) msgForSeq(seq uint64) *fileStoredMsg { if sm, ok := mb.cache[seq]; ok { mb.cgenid++ fs.mu.Unlock() - return sm + return sm, nil } } @@ -1069,9 +1073,11 @@ func (fs *fileStore) msgForSeq(seq uint64) *fileStoredMsg { sm := fs.readAndCacheMsgs(mb, seq) if sm != nil { mb.cgenid++ + } else if seq <= fs.stats.LastSeq { + err = ErrStoreMsgNotFound } fs.mu.Unlock() - return sm + return sm, err } // Internal function to return msg parts from a raw buffer. @@ -1095,10 +1101,11 @@ func msgFromBuf(buf []byte) (string, []byte, uint64, int64, error) { // LoadMsg will lookup the message by sequence number and return it if found. func (fs *fileStore) LoadMsg(seq uint64) (string, []byte, int64, error) { - if sm := fs.msgForSeq(seq); sm != nil { + sm, err := fs.msgForSeq(seq) + if sm != nil { return sm.subj, sm.msg, sm.ts, nil } - return "", nil, 0, ErrStoreMsgNotFound + return "", nil, 0, err } func (fs *fileStore) Stats() MsgSetStats { diff --git a/server/filestore_test.go b/server/filestore_test.go index a981c0e8..96984c12 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -602,7 +602,7 @@ func TestFileStoreEraseMsg(t *testing.T) { if !bytes.Equal(msg, smsg) { t.Fatalf("Expected same msg, got %q vs %q", smsg, msg) } - sm := fs.msgForSeq(1) + sm, _ := fs.msgForSeq(1) if !fs.EraseMsg(1) { t.Fatalf("Expected erase msg to return success") } diff --git a/server/jetstream.go b/server/jetstream.go index 2ecdd030..0076d944 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -21,6 +21,7 @@ import ( "os" "path" "path/filepath" + "strconv" "strings" "sync" @@ -96,6 +97,16 @@ const ( JetStreamDeleteMsgSet = "$JS.MSGSET.DELETE" jsDeleteMsgSetExport = "$JS.*.MSGSET.DELETE" + // JetStreamPurgeMsgSet is the endpoint to purge message sets. + // Will return +OK on success and -ERR on failure. + JetStreamPurgeMsgSet = "$JS.MSGSET.PURGE" + jsPurgeMsgSetExport = "$JS.*.MSGSET.PURGE" + + // JetStreamDeleteMsg is the endpoint to delete messages from a message set. + // Will return +OK on success and -ERR on failure. + JetStreamDeleteMsg = "$JS.MSGSET.MSG.DELETE" + jsDeleteMsgExport = "$JS.*.MSGSET.MSG.DELETE" + // JetStreamCreateObservable is the endpoint to create observers for a message set. // Will return +OK on success and -ERR on failure. JetStreamCreateObservable = "$JS.OBSERVABLE.CREATE" @@ -148,6 +159,8 @@ var allJsExports = []string{ jsMsgSetsExport, jsMsgSetInfoExport, jsDeleteMsgSetExport, + jsPurgeMsgSetExport, + jsDeleteMsgExport, jsCreateObservableExport, jsObservablesExport, jsObservableInfoExport, @@ -241,6 +254,12 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { if _, err := s.sysSubscribe(jsDeleteMsgSetExport, s.jsMsgSetDeleteRequest); err != nil { return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err) } + if _, err := s.sysSubscribe(jsPurgeMsgSetExport, s.jsMsgSetPurgeRequest); err != nil { + return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err) + } + if _, err := s.sysSubscribe(jsDeleteMsgExport, s.jsMsgDeleteRequest); err != nil { + return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err) + } if _, err := s.sysSubscribe(jsCreateObservableExport, s.jsCreateObservableRequest); err != nil { return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err) } @@ -905,6 +924,60 @@ func (s *Server) jsMsgSetDeleteRequest(sub *subscription, c *client, subject, re s.sendInternalAccountMsg(c.acc, reply, response) } +// Request to delete a message. +// This expects a message set name and store sequence number as the msg body. +func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply string, msg []byte) { + if c == nil || c.acc == nil { + return + } + if !c.acc.JetStreamEnabled() { + s.sendInternalAccountMsg(c.acc, reply, JetStreamNotEnabled) + return + } + args := strings.Split(string(msg), " ") + if len(args) != 2 { + s.sendInternalAccountMsg(c.acc, reply, JetStreamBadRequest) + return + } + name := args[0] + seq, _ := strconv.Atoi(args[1]) + + mset, err := c.acc.LookupMsgSet(name) + if err != nil { + s.sendInternalAccountMsg(c.acc, reply, fmt.Sprintf("%s %v", ErrPrefix, err)) + return + } + var response = OK + if !mset.EraseMsg(uint64(seq)) { + response = fmt.Sprintf("%s sequence [%d] not found", ErrPrefix, seq) + } + s.sendInternalAccountMsg(c.acc, reply, response) +} + +// Request to purge a message set. +// This expects a message set name as the msg body. +func (s *Server) jsMsgSetPurgeRequest(sub *subscription, c *client, subject, reply string, msg []byte) { + if c == nil || c.acc == nil { + return + } + if !c.acc.JetStreamEnabled() { + s.sendInternalAccountMsg(c.acc, reply, JetStreamNotEnabled) + return + } + if len(msg) == 0 { + s.sendInternalAccountMsg(c.acc, reply, JetStreamBadRequest) + return + } + mset, err := c.acc.LookupMsgSet(string(msg)) + if err != nil { + s.sendInternalAccountMsg(c.acc, reply, fmt.Sprintf("%s %v", ErrPrefix, err)) + return + } + + mset.Purge() + s.sendInternalAccountMsg(c.acc, reply, OK) +} + // Request to create an observable. func (s *Server) jsCreateObservableRequest(sub *subscription, c *client, subject, reply string, msg []byte) { if c == nil || c.acc == nil { diff --git a/server/memstore.go b/server/memstore.go index 3425ae85..0db2ca07 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -207,10 +207,15 @@ func (ms *memStore) deleteFirstMsg() bool { func (ms *memStore) LoadMsg(seq uint64) (string, []byte, int64, error) { ms.mu.RLock() sm, ok := ms.msgs[seq] + last := ms.stats.LastSeq ms.mu.RUnlock() if !ok || sm == nil { - return "", nil, 0, ErrStoreMsgNotFound + var err = ErrStoreEOF + if seq <= last { + err = ErrStoreMsgNotFound + } + return "", nil, 0, err } return sm.subj, sm.msg, sm.ts, nil } diff --git a/server/msgset.go b/server/msgset.go index cce5edaf..52b9a07a 100644 --- a/server/msgset.go +++ b/server/msgset.go @@ -193,10 +193,16 @@ func (mset *MsgSet) Purge() uint64 { } // RemoveMsg will remove a message from a message set. +// FIXME(dlc) - Should pick one and be consistent. func (mset *MsgSet) RemoveMsg(seq uint64) bool { return mset.store.RemoveMsg(seq) } +// DeleteMsg will remove a message from a message set. +func (mset *MsgSet) DeleteMsg(seq uint64) bool { + return mset.store.RemoveMsg(seq) +} + // EraseMsg will securely remove a message and rewrite the data with random data. func (mset *MsgSet) EraseMsg(seq uint64) bool { return mset.store.EraseMsg(seq) @@ -347,7 +353,7 @@ type jsPubMsg struct { } // TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering -const nmsSendQSize = 1024 +const msetSendQSize = 1024 // This is similar to system semantics but did not want to overload the single system sendq, // or require system account when doing simple setup with jetstream. @@ -357,7 +363,7 @@ func (mset *MsgSet) setupSendCapabilities() { if mset.sendq != nil { return } - mset.sendq = make(chan *jsPubMsg, nmsSendQSize) + mset.sendq = make(chan *jsPubMsg, msetSendQSize) go mset.internalSendLoop() } @@ -381,7 +387,7 @@ func (mset *MsgSet) internalSendLoop() { mset.mu.Unlock() // Warn when internal send queue is backed up past 75% - warnThresh := 3 * nmsSendQSize / 4 + warnThresh := 3 * msetSendQSize / 4 warnFreq := time.Second last := time.Now().Add(-warnFreq) diff --git a/server/observable.go b/server/observable.go index 763aaaed..f3e06012 100644 --- a/server/observable.go +++ b/server/observable.go @@ -607,12 +607,13 @@ func (o *Observable) getNextMsg() (string, []byte, uint64, uint64, error) { // We have the msg here. return subj, msg, seq, dcount, nil } - // We got an error here. - // If this was a redelivery the message may have expired so move on to next one. - // Only return if first delivery. - if dcount == 1 { + // We got an error here. If this is an EOF we will return, otherwise + // we can continue looking. + if err == ErrStoreEOF { return "", nil, 0, 0, err } + // Skip since its probably deleted or expired. + o.sseq++ } } @@ -769,7 +770,7 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) { // On error either wait or return. if err != nil { - if err == ErrStoreMsgNotFound { + if err == ErrStoreMsgNotFound || err == ErrStoreEOF { goto waitForMsgs } else { o.mu.Unlock() @@ -983,12 +984,18 @@ func (o *Observable) checkPending() { } } -// SeqFromReply will extract a sequence number from a reply ack subject. +// SeqFromReply will extract a sequence number from a reply subject. func (o *Observable) SeqFromReply(reply string) uint64 { _, seq, _ := o.ReplyInfo(reply) return seq } +// SetSeqFromReply will extract the message set sequence from the reply subject. +func (o *Observable) SetSeqFromReply(reply string) uint64 { + seq, _, _ := o.ReplyInfo(reply) + return seq +} + func (o *Observable) ReplyInfo(reply string) (sseq, dseq, dcount uint64) { n, err := fmt.Sscanf(reply, o.ackReplyT, &dcount, &sseq, &dseq) if err != nil || n != 3 { @@ -1205,7 +1212,6 @@ func (o *Observable) SetActiveCheckParams(achk time.Duration, thresh int) error // RequestNextMsgSubject returns the subject to request the next message when in pull or worker mode. // Returns empty otherwise.1 - func (o *Observable) RequestNextMsgSubject() string { return o.nextMsgSubj } diff --git a/server/store.go b/server/store.go index 8089c1a5..7cfef094 100644 --- a/server/store.go +++ b/server/store.go @@ -30,7 +30,12 @@ const ( FileStorage ) -var ErrStoreMsgNotFound = errors.New("no message found") +var ( + // ErrStoreMsgNotFound when message was not found but was expected to be. + ErrStoreMsgNotFound = errors.New("no message found") + // ErrStoreEOF is returned when message seq is greater than the last sequence. + ErrStoreEOF = errors.New("msgset EOF") +) type MsgSetStore interface { StoreMsg(subj string, msg []byte) (uint64, error) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 11352eed..d8425afe 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -1426,7 +1426,7 @@ func TestJetStreamObservableReconnect(t *testing.T) { // reconnect scenarios. getMsg := func(seqno int) *nats.Msg { t.Helper() - m, err := sub.NextMsg(5 * time.Second) + m, err := sub.NextMsg(time.Second) if err != nil { t.Fatalf("Unexpected error for %d: %v", seqno, err) } @@ -2862,6 +2862,149 @@ func TestJetStreamRequestAPI(t *testing.T) { } } +func TestJetStreamDeleteMsg(t *testing.T) { + cases := []struct { + name string + mconfig *server.MsgSetConfig + }{ + {name: "MemoryStore", + mconfig: &server.MsgSetConfig{ + Name: "foo", + Retention: server.StreamPolicy, + MaxAge: time.Hour, + Storage: server.MemoryStorage, + Replicas: 1, + }}, + {name: "FileStore", + mconfig: &server.MsgSetConfig{ + Name: "foo", + Retention: server.StreamPolicy, + MaxAge: time.Hour, + Storage: server.FileStorage, + Replicas: 1, + }}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + + s := RunBasicJetStreamServer() + defer s.Shutdown() + + config := s.JetStreamConfig() + if config == nil { + t.Fatalf("Expected non-nil config") + } + defer os.RemoveAll(config.StoreDir) + + cfg := &server.MsgSetConfig{Name: "foo", Storage: server.FileStorage} + mset, err := s.GlobalAccount().AddMsgSet(cfg) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + + nc := clientConnectToServer(t, s) + defer nc.Close() + + pubTen := func() { + t.Helper() + for i := 0; i < 10; i++ { + nc.Publish("foo", []byte("Hello World!")) + } + nc.Flush() + } + + pubTen() + + stats := mset.Stats() + if stats.Msgs != 10 { + t.Fatalf("Expected 10 messages, got %d", stats.Msgs) + } + bytesPerMsg := stats.Bytes / 10 + if bytesPerMsg == 0 { + t.Fatalf("Expected non-zero bytes for msg size") + } + + deleteAndCheck := func(seq, expectedFirstSeq uint64) { + t.Helper() + beforeStats := mset.Stats() + if !mset.DeleteMsg(seq) { + t.Fatalf("Expected the delete of sequence %d to succeed", seq) + } + expectedStats := beforeStats + expectedStats.Msgs-- + expectedStats.Bytes -= bytesPerMsg + expectedStats.FirstSeq = expectedFirstSeq + afterStats := mset.Stats() + if afterStats != expectedStats { + t.Fatalf("Stats not what we expected. Expected %+v, got %+v\n", expectedStats, afterStats) + } + } + + // Delete one from the middle + deleteAndCheck(5, 1) + // Now make sure sequences are update properly. + // Delete first msg. + deleteAndCheck(1, 2) + // Now last + deleteAndCheck(10, 2) + // Now gaps. + deleteAndCheck(3, 2) + deleteAndCheck(2, 4) + + mset.Purge() + // Put ten more one. + pubTen() + deleteAndCheck(11, 12) + deleteAndCheck(15, 12) + deleteAndCheck(16, 12) + deleteAndCheck(20, 12) + + // Shutdown the server. + s.Shutdown() + + s = RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err = s.GlobalAccount().LookupMsgSet("foo") + if err != nil { + t.Fatalf("Expected to get the message set back") + } + + expected := server.MsgSetStats{Msgs: 6, Bytes: 6 * bytesPerMsg, FirstSeq: 12, LastSeq: 20} + stats = mset.Stats() + if stats != expected { + t.Fatalf("Stats not what we expected. Expected %+v, got %+v\n", expected, stats) + } + + // Now create an observable and make sure we get the right sequence. + nc = clientConnectToServer(t, s) + defer nc.Close() + + delivery := nats.NewInbox() + sub, _ := nc.SubscribeSync(delivery) + nc.Flush() + + o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, DeliverAll: true, Subject: "foo"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + expectedStoreSeq := []uint64{12, 13, 14, 17, 18, 19} + + for i := 0; i < 6; i++ { + m, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if o.SetSeqFromReply(m.Reply) != expectedStoreSeq[i] { + t.Fatalf("Expected store seq of %d, got %d", expectedStoreSeq[i], o.SetSeqFromReply(m.Reply)) + } + } + }) + } +} + func TestJetStreamPubSubPerf(t *testing.T) { // Uncomment to run, holding place for now. t.SkipNow()