Impl for GetSeqFromTime for FileStore was missing

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-02-19 17:36:33 -08:00
parent c9d3bdd5ab
commit 623fa5ccec
4 changed files with 122 additions and 10 deletions

View File

@@ -39,7 +39,7 @@ type FileStoreConfig struct {
StoreDir string
// BlockSize is the file block size. This also represents the maximum overhead size.
BlockSize uint64
// ReadCacheExpire is how long with no activity til we expire the read cache.
// ReadCacheExpire is how long with no activity until we expire the read cache.
ReadCacheExpire time.Duration
// SyncInterval is how often we sync to disk in the background.
SyncInterval time.Duration
@@ -423,10 +423,29 @@ func (fs *fileStore) recoverMsgs() error {
return err
}
// GetSeqFromTime looks for the first sequence number that has the message
// with >= timestamp.
func (ms *fileStore) GetSeqFromTime(t time.Time) uint64 {
// TODO(dlc) - IMPL
// GetSeqFromTime looks for the first sequence number that has
// the message with >= timestamp.
// FIXME(dlc) - inefficient, and dumb really. Make this better.
func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
fs.mu.RLock()
defer fs.mu.RUnlock()
mb := fs.selectMsgBlockForStart(t)
if mb == nil {
return fs.state.LastSeq + 1
}
// Load in if we need to.
if mb.cache == nil {
fs.readAndCacheMsgs(mb, mb.last.seq)
}
// Linear search, hence the dumb part..
ts := t.UnixNano()
for seq := mb.first.seq; seq <= mb.last.seq; seq++ {
sm := mb.cache[seq]
if sm != nil && sm.ts >= ts {
return sm.seq
}
}
return 0
}
@@ -1000,7 +1019,20 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock {
return nil
}
// Read and cache message from the underlying block.
// Select the message block where this message should be found.
// Return nil if not in the set.
// Read lock should be held.
func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock {
t := minTime.UnixNano()
for _, mb := range fs.blks {
if t <= mb.last.ts {
return mb
}
}
return nil
}
// Read and cache messages from the underlying block.
func (fs *fileStore) readAndCacheMsgs(mb *msgBlock, seq uint64) *fileStoredMsg {
// This detects if what we may be looking for is staged in the write buffer.
if mb == fs.lmb && fs.wmb.Len() > 0 {
@@ -1021,7 +1053,7 @@ func (fs *fileStore) readAndCacheMsgs(mb *msgBlock, seq uint64) *fileStoredMsg {
var le = binary.LittleEndian
var sm *fileStoredMsg
// Read until we get our message, cache the rest.
// Read in all messages and cache them remembering the one we are looking for.
for index, skip := 0, 0; index < len(buf); {
hdr := buf[index : index+msgHdrSize]
rl := le.Uint32(hdr[0:])

View File

@@ -127,6 +127,7 @@ func (ms *memStore) StorageBytesUpdate(cb func(int64)) {
// GetSeqFromTime looks for the first sequence number that has the message
// with >= timestamp.
// FIXME(dlc) - inefficient.
func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
ts := t.UnixNano()
ms.mu.RLock()

View File

@@ -66,6 +66,11 @@ const (
// AddStream adds a stream for the given account.
func (a *Account) AddStream(config *StreamConfig) (*Stream, error) {
return a.AddStreamWithStore(config, nil)
}
// AddStreamWithStore adds a stream for the given account with custome store config options.
func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreConfig) (*Stream, error) {
s, jsa, err := a.checkForJetStream()
if err != nil {
return nil, err
@@ -114,7 +119,12 @@ func (a *Account) AddStream(config *StreamConfig) (*Stream, error) {
c.registerWithAccount(a)
// Create the appropriate storage
if err := mset.setupStore(storeDir); err != nil {
fsCfg := fsConfig
if fsCfg == nil {
fsCfg = &FileStoreConfig{}
}
fsCfg.StoreDir = storeDir
if err := mset.setupStore(fsCfg); err != nil {
mset.delete()
return nil, err
}
@@ -415,7 +425,8 @@ func (mset *Stream) unsubscribe(sub *subscription) {
mset.client.unsubscribe(mset.client.acc, sub, true, true)
}
func (mset *Stream) setupStore(storeDir string) error {
func (mset *Stream) setupStore(fsCfg *FileStoreConfig) error {
mset.mu.Lock()
defer mset.mu.Unlock()
@@ -427,7 +438,7 @@ func (mset *Stream) setupStore(storeDir string) error {
}
mset.store = ms
case FileStorage:
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, mset.config)
fs, err := newFileStore(*fsCfg, mset.config)
if err != nil {
return err
}

View File

@@ -219,6 +219,74 @@ func TestJetStreamAddStream(t *testing.T) {
}
}
func TestJetStreamConsumerWithStartTime(t *testing.T) {
subj := "my_stream"
cases := []struct {
name string
mconfig *server.StreamConfig
}{
{"MemoryStore", &server.StreamConfig{Name: subj, Storage: server.MemoryStorage}},
{"FileStore", &server.StreamConfig{Name: subj, Storage: server.FileStorage}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
fsCfg := &server.FileStoreConfig{BlockSize: 100}
mset, err := s.GlobalAccount().AddStreamWithStore(c.mconfig, fsCfg)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
toSend := 250
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc, subj, fmt.Sprintf("MSG: %d", i+1))
}
time.Sleep(10 * time.Millisecond)
startTime := time.Now()
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc, subj, fmt.Sprintf("MSG: %d", i+1))
}
if msgs := mset.State().Msgs; msgs != uint64(toSend*2) {
t.Fatalf("Expected %d messages, got %d", toSend*2, msgs)
}
o, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: "d",
StartTime: startTime,
AckPolicy: server.AckExplicit,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msg, err := nc.Request(o.RequestNextMsgSubject(), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sseq, dseq, _ := o.ReplyInfo(msg.Reply)
if dseq != 1 {
t.Fatalf("Expected delivered seq of 1, got %d", dseq)
}
if sseq != uint64(toSend+1) {
t.Fatalf("Expected to get store seq of %d, got %d", toSend+1, sseq)
}
})
}
}
func TestJetStreamConsumerMaxDeliveries(t *testing.T) {
cases := []struct {
name string