From 623fa5ccecfa2d10f7379467e8a7dba25b4d6351 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 19 Feb 2020 17:36:33 -0800 Subject: [PATCH] Impl for GetSeqFromTime for FileStore was missing Signed-off-by: Derek Collison --- server/filestore.go | 46 +++++++++++++++++++++++----- server/memstore.go | 1 + server/stream.go | 17 +++++++++-- test/jetstream_test.go | 68 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 122 insertions(+), 10 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index e7232a74..d27efca7 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -39,7 +39,7 @@ type FileStoreConfig struct { StoreDir string // BlockSize is the file block size. This also represents the maximum overhead size. BlockSize uint64 - // ReadCacheExpire is how long with no activity til we expire the read cache. + // ReadCacheExpire is how long with no activity until we expire the read cache. ReadCacheExpire time.Duration // SyncInterval is how often we sync to disk in the background. SyncInterval time.Duration @@ -423,10 +423,29 @@ func (fs *fileStore) recoverMsgs() error { return err } -// GetSeqFromTime looks for the first sequence number that has the message -// with >= timestamp. -func (ms *fileStore) GetSeqFromTime(t time.Time) uint64 { - // TODO(dlc) - IMPL +// GetSeqFromTime looks for the first sequence number that has +// the message with >= timestamp. +// FIXME(dlc) - inefficient, and dumb really. Make this better. +func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { + fs.mu.RLock() + defer fs.mu.RUnlock() + + mb := fs.selectMsgBlockForStart(t) + if mb == nil { + return fs.state.LastSeq + 1 + } + // Load in if we need to. + if mb.cache == nil { + fs.readAndCacheMsgs(mb, mb.last.seq) + } + // Linear search, hence the dumb part.. + ts := t.UnixNano() + for seq := mb.first.seq; seq <= mb.last.seq; seq++ { + sm := mb.cache[seq] + if sm != nil && sm.ts >= ts { + return sm.seq + } + } return 0 } @@ -1000,7 +1019,20 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock { return nil } -// Read and cache message from the underlying block. +// Select the message block where this message should be found. +// Return nil if not in the set. +// Read lock should be held. +func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock { + t := minTime.UnixNano() + for _, mb := range fs.blks { + if t <= mb.last.ts { + return mb + } + } + return nil +} + +// Read and cache messages from the underlying block. func (fs *fileStore) readAndCacheMsgs(mb *msgBlock, seq uint64) *fileStoredMsg { // This detects if what we may be looking for is staged in the write buffer. if mb == fs.lmb && fs.wmb.Len() > 0 { @@ -1021,7 +1053,7 @@ func (fs *fileStore) readAndCacheMsgs(mb *msgBlock, seq uint64) *fileStoredMsg { var le = binary.LittleEndian var sm *fileStoredMsg - // Read until we get our message, cache the rest. + // Read in all messages and cache them remembering the one we are looking for. for index, skip := 0, 0; index < len(buf); { hdr := buf[index : index+msgHdrSize] rl := le.Uint32(hdr[0:]) diff --git a/server/memstore.go b/server/memstore.go index 60a9f8ad..d8c9eb8e 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -127,6 +127,7 @@ func (ms *memStore) StorageBytesUpdate(cb func(int64)) { // GetSeqFromTime looks for the first sequence number that has the message // with >= timestamp. +// FIXME(dlc) - inefficient. func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { ts := t.UnixNano() ms.mu.RLock() diff --git a/server/stream.go b/server/stream.go index 3fa0e7e9..9b45775d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -66,6 +66,11 @@ const ( // AddStream adds a stream for the given account. func (a *Account) AddStream(config *StreamConfig) (*Stream, error) { + return a.AddStreamWithStore(config, nil) +} + +// AddStreamWithStore adds a stream for the given account with custome store config options. +func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreConfig) (*Stream, error) { s, jsa, err := a.checkForJetStream() if err != nil { return nil, err @@ -114,7 +119,12 @@ func (a *Account) AddStream(config *StreamConfig) (*Stream, error) { c.registerWithAccount(a) // Create the appropriate storage - if err := mset.setupStore(storeDir); err != nil { + fsCfg := fsConfig + if fsCfg == nil { + fsCfg = &FileStoreConfig{} + } + fsCfg.StoreDir = storeDir + if err := mset.setupStore(fsCfg); err != nil { mset.delete() return nil, err } @@ -415,7 +425,8 @@ func (mset *Stream) unsubscribe(sub *subscription) { mset.client.unsubscribe(mset.client.acc, sub, true, true) } -func (mset *Stream) setupStore(storeDir string) error { +func (mset *Stream) setupStore(fsCfg *FileStoreConfig) error { + mset.mu.Lock() defer mset.mu.Unlock() @@ -427,7 +438,7 @@ func (mset *Stream) setupStore(storeDir string) error { } mset.store = ms case FileStorage: - fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, mset.config) + fs, err := newFileStore(*fsCfg, mset.config) if err != nil { return err } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 81b33108..4b9155ef 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -219,6 +219,74 @@ func TestJetStreamAddStream(t *testing.T) { } } +func TestJetStreamConsumerWithStartTime(t *testing.T) { + subj := "my_stream" + cases := []struct { + name string + mconfig *server.StreamConfig + }{ + {"MemoryStore", &server.StreamConfig{Name: subj, Storage: server.MemoryStorage}}, + {"FileStore", &server.StreamConfig{Name: subj, Storage: server.FileStorage}}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + fsCfg := &server.FileStoreConfig{BlockSize: 100} + mset, err := s.GlobalAccount().AddStreamWithStore(c.mconfig, fsCfg) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + toSend := 250 + for i := 0; i < toSend; i++ { + sendStreamMsg(t, nc, subj, fmt.Sprintf("MSG: %d", i+1)) + } + + time.Sleep(10 * time.Millisecond) + startTime := time.Now() + + for i := 0; i < toSend; i++ { + sendStreamMsg(t, nc, subj, fmt.Sprintf("MSG: %d", i+1)) + } + + if msgs := mset.State().Msgs; msgs != uint64(toSend*2) { + t.Fatalf("Expected %d messages, got %d", toSend*2, msgs) + } + + o, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: "d", + StartTime: startTime, + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + msg, err := nc.Request(o.RequestNextMsgSubject(), nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sseq, dseq, _ := o.ReplyInfo(msg.Reply) + if dseq != 1 { + t.Fatalf("Expected delivered seq of 1, got %d", dseq) + } + if sseq != uint64(toSend+1) { + t.Fatalf("Expected to get store seq of %d, got %d", toSend+1, sseq) + } + }) + } +} + func TestJetStreamConsumerMaxDeliveries(t *testing.T) { cases := []struct { name string