From 5e9bad5b26b962cbb7c7028ad156a8284bed34e8 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 17 Nov 2020 09:01:36 -0700 Subject: [PATCH 1/2] Test showing issue with filestore after a restart I have pin-pointed that the issue started at this commit: d737ccef55336aedc474e83df51a13959c8ca7be This was PR: https://github.com/nats-io/nats-server/pull/1685 Prior to this PR, the test would pass. Signed-off-by: Ivan Kozlovic --- test/jetstream_test.go | 107 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 2941260b..91e1e68b 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -9680,3 +9680,110 @@ func TestJetStreamPullConsumerMaxAckPendingRedeliveries(t *testing.T) { }) } } + +func TestJetStreamDeliveryAfterServerRestart(t *testing.T) { + t.Skip("Enable this test to show the delivery problem after restart") + + opts := DefaultTestOptions + opts.Port = -1 + opts.JetStream = true + s := RunServer(&opts) + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{ + Name: "MY_STREAM", + Storage: server.FileStorage, + Subjects: []string{"foo.>"}, + Retention: server.InterestPolicy, + }) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + inbox := nats.NewInbox() + o, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur", + DeliverSubject: inbox, + DeliverPolicy: server.DeliverNew, + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + defer o.Delete() + + sub, err := nc.SubscribeSync(inbox) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + nc.Flush() + + // Send 1 message + sendStreamMsg(t, nc, "foo.bar", "msg1") + + // Make sure we receive it and ack it. + msg, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + t.Fatalf("Did not get message: %v", err) + } + // Ack it! + msg.Respond(nil) + nc.Flush() + // Give chance for server to process this ack + time.Sleep(100 * time.Millisecond) + + // Shutdown client and server + nc.Close() + + dir := strings.TrimSuffix(s.JetStreamConfig().StoreDir, server.JetStreamStoreDir) + s.Shutdown() + + opts.Port = -1 + opts.StoreDir = dir + s = RunServer(&opts) + defer s.Shutdown() + + nc = clientConnectToServer(t, s) + defer nc.Close() + + // Send 2nd message + sendStreamMsg(t, nc, "foo.bar", "msg2") + + // Lookup stream. + mset, err = s.GlobalAccount().LookupStream("MY_STREAM") + if err != nil { + t.Fatalf("Error looking up stream: %v", err) + } + // Update consumer's deliver subject with new inbox + inbox = nats.NewInbox() + o, err = mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur", + DeliverSubject: inbox, + DeliverPolicy: server.DeliverNew, + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + defer o.Delete() + + // Start sub on new inbox + sub, err = nc.SubscribeSync(inbox) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + nc.Flush() + + // Should receive message 2. + if _, err := sub.NextMsg(250 * time.Millisecond); err != nil { + t.Fatalf("Did not get message: %v", err) + } +} From d358aaddf6ddbfbbb8663f0061e113e360f467b4 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 17 Nov 2020 15:04:57 -0800 Subject: [PATCH 2/2] Fixes for filestore not remember first sequence when all messages deleted. Thsi fixed a few minor bugs as well as the one where we did not remember our sequence numbers. Signed-off-by: Derek Collison --- server/filestore.go | 11 ++++++++--- server/filestore_test.go | 37 ++++++++++++++++++++++++++++++++++++- test/jetstream_test.go | 19 ++++++++----------- 3 files changed, 52 insertions(+), 15 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index f5e91100..fd1520fa 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -606,6 +606,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { ts := time.Now().UnixNano() mb.llts, mb.lrts, mb.lwts = ts, ts, ts + // Remember our last sequence number. + mb.first.seq = fs.state.LastSeq + 1 + mb.last.seq = fs.state.LastSeq + // We know we will need this so go ahead and spin up. mb.spinUpFlushLoop() @@ -1148,6 +1152,7 @@ func (mb *msgBlock) selectNextFirst() { } // Set new first sequence. mb.first.seq = seq + // Check if we are empty.. if mb.isEmpty() { mb.first.ts = 0 @@ -1381,8 +1386,8 @@ func (fs *fileStore) checkAndFlushAllBlocks() { for _, mb := range fs.blks { if mb.pendingWriteSize() > 0 { mb.flushPendingMsgsAndWait() - mb.writeIndexInfo() } + mb.writeIndexInfo() } } @@ -1562,7 +1567,7 @@ func (mb *msgBlock) numBytes() uint64 { // Update accounting on a write msg. // Lock should be held. func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) { - if mb.first.seq == 0 { + if mb.first.seq == 0 || mb.first.ts == 0 { mb.first.seq = seq mb.first.ts = ts } @@ -2416,7 +2421,7 @@ func (mb *msgBlock) dirtyClose() { // Should be called with lock held. func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { - if mb == nil || mb.qch == nil { + if mb == nil { return } // Close cache diff --git a/server/filestore_test.go b/server/filestore_test.go index ff5570b5..e9bcb935 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -224,6 +224,41 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) { if state.Bytes != expectedSize*2 { t.Fatalf("Expected %d bytes, got %d", expectedSize*2, state.Bytes) } + + fs.Purge() + fs.Stop() + + fs, err = newFileStore(fcfg, StreamConfig{Name: "dlc", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + state = fs.State() + if state.Msgs != 0 { + t.Fatalf("Expected %d msgs, got %d", 0, state.Msgs) + } + if state.Bytes != 0 { + t.Fatalf("Expected %d bytes, got %d", 0, state.Bytes) + } + + seq, _, err := fs.StoreMsg(subj, nil, []byte("Hello")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + fs.RemoveMsg(seq) + + fs.Stop() + fs, err = newFileStore(fcfg, StreamConfig{Name: "dlc", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + state = fs.State() + if state.FirstSeq != seq+1 { + t.Fatalf("Expected first seq to be %d, got %d", seq+1, state.FirstSeq) + } } func TestFileStoreSelectNextFirst(t *testing.T) { @@ -803,7 +838,7 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) { state2 := fs.State() if state != state2 { - t.Fatalf("Expected receovered states to be the same, got %+v vs %+v\n", state, state2) + t.Fatalf("Expected recovered states to be the same, got %+v vs %+v\n", state, state2) } if _, _, _, _, err := fs.LoadMsg(1); err != nil { diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 91e1e68b..90d78278 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -9682,8 +9682,6 @@ func TestJetStreamPullConsumerMaxAckPendingRedeliveries(t *testing.T) { } func TestJetStreamDeliveryAfterServerRestart(t *testing.T) { - t.Skip("Enable this test to show the delivery problem after restart") - opts := DefaultTestOptions opts.Port = -1 opts.JetStream = true @@ -9737,8 +9735,6 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) { // Ack it! msg.Respond(nil) nc.Flush() - // Give chance for server to process this ack - time.Sleep(100 * time.Millisecond) // Shutdown client and server nc.Close() @@ -9751,17 +9747,12 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) { s = RunServer(&opts) defer s.Shutdown() - nc = clientConnectToServer(t, s) - defer nc.Close() - - // Send 2nd message - sendStreamMsg(t, nc, "foo.bar", "msg2") - // Lookup stream. mset, err = s.GlobalAccount().LookupStream("MY_STREAM") if err != nil { t.Fatalf("Error looking up stream: %v", err) } + // Update consumer's deliver subject with new inbox inbox = nats.NewInbox() o, err = mset.AddConsumer(&server.ConsumerConfig{ @@ -9775,6 +9766,12 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) { } defer o.Delete() + nc = clientConnectToServer(t, s) + defer nc.Close() + + // Send 2nd message + sendStreamMsg(t, nc, "foo.bar", "msg2") + // Start sub on new inbox sub, err = nc.SubscribeSync(inbox) if err != nil { @@ -9783,7 +9780,7 @@ func TestJetStreamDeliveryAfterServerRestart(t *testing.T) { nc.Flush() // Should receive message 2. - if _, err := sub.NextMsg(250 * time.Millisecond); err != nil { + if _, err := sub.NextMsg(500 * time.Millisecond); err != nil { t.Fatalf("Did not get message: %v", err) } }