Fixes for filestore not remember first sequence when all messages deleted.

Thsi fixed a few minor bugs as well as the one where we did not remember our sequence numbers.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-11-17 15:04:57 -08:00
parent 5e9bad5b26
commit d358aaddf6
3 changed files with 52 additions and 15 deletions

View File

@@ -606,6 +606,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
ts := time.Now().UnixNano()
mb.llts, mb.lrts, mb.lwts = ts, ts, ts
// Remember our last sequence number.
mb.first.seq = fs.state.LastSeq + 1
mb.last.seq = fs.state.LastSeq
// We know we will need this so go ahead and spin up.
mb.spinUpFlushLoop()
@@ -1148,6 +1152,7 @@ func (mb *msgBlock) selectNextFirst() {
}
// Set new first sequence.
mb.first.seq = seq
// Check if we are empty..
if mb.isEmpty() {
mb.first.ts = 0
@@ -1381,8 +1386,8 @@ func (fs *fileStore) checkAndFlushAllBlocks() {
for _, mb := range fs.blks {
if mb.pendingWriteSize() > 0 {
mb.flushPendingMsgsAndWait()
mb.writeIndexInfo()
}
mb.writeIndexInfo()
}
}
@@ -1562,7 +1567,7 @@ func (mb *msgBlock) numBytes() uint64 {
// Update accounting on a write msg.
// Lock should be held.
func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) {
if mb.first.seq == 0 {
if mb.first.seq == 0 || mb.first.ts == 0 {
mb.first.seq = seq
mb.first.ts = ts
}
@@ -2416,7 +2421,7 @@ func (mb *msgBlock) dirtyClose() {
// Should be called with lock held.
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
if mb == nil || mb.qch == nil {
if mb == nil {
return
}
// Close cache

View File

@@ -224,6 +224,41 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
if state.Bytes != expectedSize*2 {
t.Fatalf("Expected %d bytes, got %d", expectedSize*2, state.Bytes)
}
fs.Purge()
fs.Stop()
fs, err = newFileStore(fcfg, StreamConfig{Name: "dlc", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer fs.Stop()
state = fs.State()
if state.Msgs != 0 {
t.Fatalf("Expected %d msgs, got %d", 0, state.Msgs)
}
if state.Bytes != 0 {
t.Fatalf("Expected %d bytes, got %d", 0, state.Bytes)
}
seq, _, err := fs.StoreMsg(subj, nil, []byte("Hello"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fs.RemoveMsg(seq)
fs.Stop()
fs, err = newFileStore(fcfg, StreamConfig{Name: "dlc", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer fs.Stop()
state = fs.State()
if state.FirstSeq != seq+1 {
t.Fatalf("Expected first seq to be %d, got %d", seq+1, state.FirstSeq)
}
}
func TestFileStoreSelectNextFirst(t *testing.T) {
@@ -803,7 +838,7 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) {
state2 := fs.State()
if state != state2 {
t.Fatalf("Expected receovered states to be the same, got %+v vs %+v\n", state, state2)
t.Fatalf("Expected recovered states to be the same, got %+v vs %+v\n", state, state2)
}
if _, _, _, _, err := fs.LoadMsg(1); err != nil {

View File

@@ -9682,8 +9682,6 @@ func TestJetStreamPullConsumerMaxAckPendingRedeliveries(t *testing.T) {
}
func TestJetStreamDeliveryAfterServerRestart(t *testing.T) {
t.Skip("Enable this test to show the delivery problem after restart")
opts := DefaultTestOptions
opts.Port = -1
opts.JetStream = true
@@ -9737,8 +9735,6 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) {
// Ack it!
msg.Respond(nil)
nc.Flush()
// Give chance for server to process this ack
time.Sleep(100 * time.Millisecond)
// Shutdown client and server
nc.Close()
@@ -9751,17 +9747,12 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) {
s = RunServer(&opts)
defer s.Shutdown()
nc = clientConnectToServer(t, s)
defer nc.Close()
// Send 2nd message
sendStreamMsg(t, nc, "foo.bar", "msg2")
// Lookup stream.
mset, err = s.GlobalAccount().LookupStream("MY_STREAM")
if err != nil {
t.Fatalf("Error looking up stream: %v", err)
}
// Update consumer's deliver subject with new inbox
inbox = nats.NewInbox()
o, err = mset.AddConsumer(&server.ConsumerConfig{
@@ -9775,6 +9766,12 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) {
}
defer o.Delete()
nc = clientConnectToServer(t, s)
defer nc.Close()
// Send 2nd message
sendStreamMsg(t, nc, "foo.bar", "msg2")
// Start sub on new inbox
sub, err = nc.SubscribeSync(inbox)
if err != nil {
@@ -9783,7 +9780,7 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) {
nc.Flush()
// Should receive message 2.
if _, err := sub.NextMsg(250 * time.Millisecond); err != nil {
if _, err := sub.NextMsg(500 * time.Millisecond); err != nil {
t.Fatalf("Did not get message: %v", err)
}
}