Merge pull request #1721 from nats-io/fsfix2

Fix for not properly recovering first sequence number on recovery.
This commit is contained in:
Derek Collison
2020-11-17 15:22:02 -08:00
committed by GitHub
3 changed files with 148 additions and 4 deletions

View File

@@ -606,6 +606,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
ts := time.Now().UnixNano()
mb.llts, mb.lrts, mb.lwts = ts, ts, ts
// Remember our last sequence number.
mb.first.seq = fs.state.LastSeq + 1
mb.last.seq = fs.state.LastSeq
// We know we will need this so go ahead and spin up.
mb.spinUpFlushLoop()
@@ -1148,6 +1152,7 @@ func (mb *msgBlock) selectNextFirst() {
}
// Set new first sequence.
mb.first.seq = seq
// Check if we are empty..
if mb.isEmpty() {
mb.first.ts = 0
@@ -1381,8 +1386,8 @@ func (fs *fileStore) checkAndFlushAllBlocks() {
for _, mb := range fs.blks {
if mb.pendingWriteSize() > 0 {
mb.flushPendingMsgsAndWait()
mb.writeIndexInfo()
}
mb.writeIndexInfo()
}
}
@@ -1562,7 +1567,7 @@ func (mb *msgBlock) numBytes() uint64 {
// Update accounting on a write msg.
// Lock should be held.
func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) {
if mb.first.seq == 0 {
if mb.first.seq == 0 || mb.first.ts == 0 {
mb.first.seq = seq
mb.first.ts = ts
}
@@ -2416,7 +2421,7 @@ func (mb *msgBlock) dirtyClose() {
// Should be called with lock held.
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
if mb == nil || mb.qch == nil {
if mb == nil {
return
}
// Close cache

View File

@@ -224,6 +224,41 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
if state.Bytes != expectedSize*2 {
t.Fatalf("Expected %d bytes, got %d", expectedSize*2, state.Bytes)
}
fs.Purge()
fs.Stop()
fs, err = newFileStore(fcfg, StreamConfig{Name: "dlc", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer fs.Stop()
state = fs.State()
if state.Msgs != 0 {
t.Fatalf("Expected %d msgs, got %d", 0, state.Msgs)
}
if state.Bytes != 0 {
t.Fatalf("Expected %d bytes, got %d", 0, state.Bytes)
}
seq, _, err := fs.StoreMsg(subj, nil, []byte("Hello"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fs.RemoveMsg(seq)
fs.Stop()
fs, err = newFileStore(fcfg, StreamConfig{Name: "dlc", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer fs.Stop()
state = fs.State()
if state.FirstSeq != seq+1 {
t.Fatalf("Expected first seq to be %d, got %d", seq+1, state.FirstSeq)
}
}
func TestFileStoreSelectNextFirst(t *testing.T) {
@@ -803,7 +838,7 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) {
state2 := fs.State()
if state != state2 {
t.Fatalf("Expected receovered states to be the same, got %+v vs %+v\n", state, state2)
t.Fatalf("Expected recovered states to be the same, got %+v vs %+v\n", state, state2)
}
if _, _, _, _, err := fs.LoadMsg(1); err != nil {

View File

@@ -9696,3 +9696,107 @@ func TestJetStreamPullConsumerMaxAckPendingRedeliveries(t *testing.T) {
})
}
}
func TestJetStreamDeliveryAfterServerRestart(t *testing.T) {
opts := DefaultTestOptions
opts.Port = -1
opts.JetStream = true
s := RunServer(&opts)
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{
Name: "MY_STREAM",
Storage: server.FileStorage,
Subjects: []string{"foo.>"},
Retention: server.InterestPolicy,
})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
inbox := nats.NewInbox()
o, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur",
DeliverSubject: inbox,
DeliverPolicy: server.DeliverNew,
AckPolicy: server.AckExplicit,
})
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
defer o.Delete()
sub, err := nc.SubscribeSync(inbox)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
nc.Flush()
// Send 1 message
sendStreamMsg(t, nc, "foo.bar", "msg1")
// Make sure we receive it and ack it.
msg, err := sub.NextMsg(250 * time.Millisecond)
if err != nil {
t.Fatalf("Did not get message: %v", err)
}
// Ack it!
msg.Respond(nil)
nc.Flush()
// Shutdown client and server
nc.Close()
dir := strings.TrimSuffix(s.JetStreamConfig().StoreDir, server.JetStreamStoreDir)
s.Shutdown()
opts.Port = -1
opts.StoreDir = dir
s = RunServer(&opts)
defer s.Shutdown()
// Lookup stream.
mset, err = s.GlobalAccount().LookupStream("MY_STREAM")
if err != nil {
t.Fatalf("Error looking up stream: %v", err)
}
// Update consumer's deliver subject with new inbox
inbox = nats.NewInbox()
o, err = mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur",
DeliverSubject: inbox,
DeliverPolicy: server.DeliverNew,
AckPolicy: server.AckExplicit,
})
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
defer o.Delete()
nc = clientConnectToServer(t, s)
defer nc.Close()
// Send 2nd message
sendStreamMsg(t, nc, "foo.bar", "msg2")
// Start sub on new inbox
sub, err = nc.SubscribeSync(inbox)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
nc.Flush()
// Should receive message 2.
if _, err := sub.NextMsg(500 * time.Millisecond); err != nil {
t.Fatalf("Did not get message: %v", err)
}
}