Add timestamps to stream state for first and last, addresses #1396

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-05-17 07:33:01 -07:00
parent 5b369ad7dc
commit 7b6352c991
6 changed files with 144 additions and 31 deletions

View File

@@ -40,7 +40,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.0-beta.10"
VERSION = "2.2.0-beta.11"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -428,9 +428,11 @@ func (fs *fileStore) recoverMsgs() error {
if mb := fs.recoverMsgBlock(fi, index); mb != nil {
if fs.state.FirstSeq == 0 || mb.first.seq < fs.state.FirstSeq {
fs.state.FirstSeq = mb.first.seq
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
}
if mb.last.seq > fs.state.LastSeq {
fs.state.LastSeq = mb.last.seq
fs.state.LastTime = time.Unix(0, mb.last.ts).UTC()
}
fs.state.Msgs += mb.msgs
fs.state.Bytes += mb.bytes
@@ -568,7 +570,7 @@ func (fs *fileStore) enableLastMsgBlockForWriting() error {
}
// Store stores a message.
func (fs *fileStore) StoreMsg(subj string, msg []byte) (seq uint64, ts int64, err error) {
func (fs *fileStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
@@ -587,10 +589,7 @@ func (fs *fileStore) StoreMsg(subj string, msg []byte) (seq uint64, ts int64, er
}
}
seq = fs.state.LastSeq + 1
if fs.state.FirstSeq == 0 {
fs.state.FirstSeq = seq
}
seq := fs.state.LastSeq + 1
n, ts, err := fs.writeMsgRecord(seq, subj, msg)
if err != nil {
@@ -599,9 +598,15 @@ func (fs *fileStore) StoreMsg(subj string, msg []byte) (seq uint64, ts int64, er
}
fs.kickFlusher()
if fs.state.FirstSeq == 0 {
fs.state.FirstSeq = seq
fs.state.FirstTime = time.Unix(0, ts).UTC()
}
fs.state.Msgs++
fs.state.Bytes += n
fs.state.LastSeq = seq
fs.state.LastTime = time.Unix(0, ts).UTC()
// Limits checks and enforcement.
// If they do any deletions they will update the
@@ -737,7 +742,23 @@ func (mb *msgBlock) selectNextFirst() {
break
}
}
// Set new first sequence.
mb.first.seq = seq
// Need to get the timestamp.
// We will try the cache direct and fallback if needed.
sm, _ := mb.cacheLookupLocked(seq)
if sm == nil {
// Slow path, need to unlock.
mb.mu.Unlock()
sm, _ = mb.fetchMsg(seq)
mb.mu.Lock()
}
if sm != nil {
mb.first.ts = sm.ts
} else {
mb.first.ts = 0
}
}
func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStoredMsg, secure bool) {
@@ -761,6 +782,7 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored
mb.selectNextFirst()
if seq == fs.state.FirstSeq {
fs.state.FirstSeq = mb.first.seq // new one.
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
}
if mb.first.seq > mb.last.seq {
fs.removeMsgBlock(mb)
@@ -1338,7 +1360,12 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
// vs read lock and promote. Also defer based on 1.14 performance.
mb.mu.Lock()
defer mb.mu.Unlock()
return mb.cacheLookupLocked(seq)
}
// Will do a lookup from cache.
// lock should be held.
func (mb *msgBlock) cacheLookupLocked(seq uint64) (*fileStoredMsg, error) {
if mb.cache == nil {
return nil, errNoCache
}
@@ -1610,6 +1637,7 @@ func (mb *msgBlock) readIndexInfo() error {
mb.dmap[seq+mb.first.seq] = struct{}{}
}
}
return nil
}
@@ -1692,6 +1720,8 @@ func (fs *fileStore) Purge() uint64 {
rbytes := int64(fs.state.Bytes)
fs.state.FirstSeq = fs.state.LastSeq + 1
fs.state.FirstTime = time.Time{}
fs.state.Bytes = 0
fs.state.Msgs = 0

View File

@@ -95,9 +95,14 @@ func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) {
}
}
// Grab time.
now := time.Now()
ts := now.UnixNano()
seq := ms.state.LastSeq + 1
if ms.state.FirstSeq == 0 {
ms.state.FirstSeq = seq
ms.state.FirstTime = now.UTC()
}
// Make copies - https://github.com/go101/go101/wiki
@@ -107,11 +112,11 @@ func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) {
}
startBytes := int64(ms.state.Bytes)
ts := time.Now().UnixNano()
ms.msgs[seq] = &storedMsg{subj, msg, seq, ts}
ms.state.Msgs++
ms.state.Bytes += memStoreMsgSize(subj, msg)
ms.state.LastSeq = seq
ms.state.LastTime = now.UTC()
// Limits checks and enforcement.
ms.enforceMsgLimit()
@@ -226,6 +231,7 @@ func (ms *memStore) Purge() uint64 {
cb := ms.scb
bytes := int64(ms.state.Bytes)
ms.state.FirstSeq = ms.state.LastSeq + 1
ms.state.FirstTime = ms.state.LastTime
ms.state.Bytes = 0
ms.state.Msgs = 0
ms.msgs = make(map[uint64]*storedMsg)
@@ -286,29 +292,41 @@ func (ms *memStore) EraseMsg(seq uint64) (bool, error) {
func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
var ss uint64
sm, ok := ms.msgs[seq]
if ok {
delete(ms.msgs, seq)
ms.state.Msgs--
ss = memStoreMsgSize(sm.subj, sm.msg)
ms.state.Bytes -= ss
if seq == ms.state.FirstSeq {
var nseq uint64
for nseq = ms.state.FirstSeq + 1; nseq < ms.state.LastSeq; nseq++ {
if _, ok := ms.msgs[nseq]; ok {
break
}
if !ok {
return false
}
delete(ms.msgs, seq)
ms.state.Msgs--
ss = memStoreMsgSize(sm.subj, sm.msg)
ms.state.Bytes -= ss
if seq == ms.state.FirstSeq {
var nsm *storedMsg
var ok bool
for nseq := ms.state.FirstSeq + 1; nseq <= ms.state.LastSeq; nseq++ {
if nsm, ok = ms.msgs[nseq]; ok {
break
}
ms.state.FirstSeq = nseq
}
if secure {
rand.Read(sm.msg)
sm.seq = 0
if nsm != nil {
ms.state.FirstSeq = nsm.seq
ms.state.FirstTime = time.Unix(0, nsm.ts).UTC()
} else {
// Like purge.
ms.state.FirstSeq = ms.state.LastSeq + 1
ms.state.FirstTime = time.Time{}
}
}
if secure {
rand.Read(sm.msg)
sm.seq = 0
}
if ms.scb != nil {
delta := int64(ss)
ms.scb(-delta)
}
return ok
}

View File

@@ -92,11 +92,13 @@ const (
// StreamStats is information about the given stream.
type StreamState struct {
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
LastSeq uint64 `json:"last_seq"`
Consumers int `json:"consumer_count"`
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Consumers int `json:"consumer_count"`
}
// ConsumerStore stores state on consumers for streams.

View File

@@ -716,6 +716,20 @@ func (mset *Stream) stop(delete bool) error {
return nil
}
func (mset *Stream) GetMsg(seq uint64) (*StoredMsg, error) {
subj, msg, ts, err := mset.store.LoadMsg(seq)
if err != nil {
return nil, err
}
sm := &StoredMsg{
Subject: subj,
Sequence: seq,
Data: msg,
Time: time.Unix(0, ts).UTC(),
}
return sm, nil
}
// Consunmers will return all the current consumers for this stream.
func (mset *Stream) Consumers() []*Consumer {
mset.mu.Lock()

View File

@@ -761,10 +761,7 @@ func TestJetStreamAddStreamOverlapWithJSAPISubjects(t *testing.T) {
expectErr(acc.AddStream(&server.StreamConfig{Name: "c", Subjects: []string{"$JS.API.*"}}))
// Events and Advisories etc should be ok.
if _, err := acc.AddStream(&server.StreamConfig{Name: "a", Subjects: []string{"$JS.EVENT.foo"}}); err != nil {
t.Fatalf("Expected this to work: %v", err)
}
if _, err := acc.AddStream(&server.StreamConfig{Name: "b", Subjects: []string{"$JS.EVENT.>"}}); err != nil {
if _, err := acc.AddStream(&server.StreamConfig{Name: "a", Subjects: []string{"$JS.EVENT.>"}}); err != nil {
t.Fatalf("Expected this to work: %v", err)
}
}
@@ -844,6 +841,49 @@ func TestJetStreamBasicAckPublish(t *testing.T) {
}
}
func TestJetStreamStateTimestamps(t *testing.T) {
cases := []struct {
name string
mconfig *server.StreamConfig
}{
{"MemoryStore", &server.StreamConfig{Name: "foo", Storage: server.MemoryStorage, Subjects: []string{"foo.*"}}},
{"FileStore", &server.StreamConfig{Name: "foo", Storage: server.FileStorage, Subjects: []string{"foo.*"}}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mset, err := s.GlobalAccount().AddStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
start := time.Now()
delay := 250 * time.Millisecond
sendStreamMsg(t, nc, "foo.bar", "Hello World!")
time.Sleep(delay)
sendStreamMsg(t, nc, "foo.bar", "Hello World Again!")
state := mset.State()
if state.FirstTime.Before(start) {
t.Fatalf("Unexpected first message timestamp: %v", state.FirstTime)
}
if state.LastTime.Before(start.Add(delay)) {
t.Fatalf("Unexpected last message timestamp: %v", state.LastTime)
}
})
}
}
func TestJetStreamNoAckStream(t *testing.T) {
cases := []struct {
name string
@@ -5317,7 +5357,15 @@ func TestJetStreamDeleteMsg(t *testing.T) {
expectedState.Msgs--
expectedState.Bytes -= bytesPerMsg
expectedState.FirstSeq = expectedFirstSeq
sm, err := mset.GetMsg(expectedFirstSeq)
if err != nil {
t.Fatalf("Error fetching message for seq: %d - %v", expectedFirstSeq, err)
}
expectedState.FirstTime = sm.Time
afterState := mset.State()
// Ignore first time in this test.
if afterState != expectedState {
t.Fatalf("Stats not what we expected. Expected %+v, got %+v\n", expectedState, afterState)
}
@@ -5360,6 +5408,7 @@ func TestJetStreamDeleteMsg(t *testing.T) {
expected := server.StreamState{Msgs: 6, Bytes: 6 * bytesPerMsg, FirstSeq: 12, LastSeq: 20}
state = mset.State()
state.FirstTime, state.LastTime = time.Time{}, time.Time{}
if state != expected {
t.Fatalf("State not what we expected. Expected %+v, got %+v\n", expected, state)
}