diff --git a/server/filestore.go b/server/filestore.go index f5e91100..fd1520fa 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -606,6 +606,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { ts := time.Now().UnixNano() mb.llts, mb.lrts, mb.lwts = ts, ts, ts + // Remember our last sequence number. + mb.first.seq = fs.state.LastSeq + 1 + mb.last.seq = fs.state.LastSeq + // We know we will need this so go ahead and spin up. mb.spinUpFlushLoop() @@ -1148,6 +1152,7 @@ func (mb *msgBlock) selectNextFirst() { } // Set new first sequence. mb.first.seq = seq + // Check if we are empty.. if mb.isEmpty() { mb.first.ts = 0 @@ -1381,8 +1386,8 @@ func (fs *fileStore) checkAndFlushAllBlocks() { for _, mb := range fs.blks { if mb.pendingWriteSize() > 0 { mb.flushPendingMsgsAndWait() - mb.writeIndexInfo() } + mb.writeIndexInfo() } } @@ -1562,7 +1567,7 @@ func (mb *msgBlock) numBytes() uint64 { // Update accounting on a write msg. // Lock should be held. func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) { - if mb.first.seq == 0 { + if mb.first.seq == 0 || mb.first.ts == 0 { mb.first.seq = seq mb.first.ts = ts } @@ -2416,7 +2421,7 @@ func (mb *msgBlock) dirtyClose() { // Should be called with lock held. func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { - if mb == nil || mb.qch == nil { + if mb == nil { return } // Close cache diff --git a/server/filestore_test.go b/server/filestore_test.go index ff5570b5..e9bcb935 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -224,6 +224,41 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) { if state.Bytes != expectedSize*2 { t.Fatalf("Expected %d bytes, got %d", expectedSize*2, state.Bytes) } + + fs.Purge() + fs.Stop() + + fs, err = newFileStore(fcfg, StreamConfig{Name: "dlc", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + state = fs.State() + if state.Msgs != 0 { + t.Fatalf("Expected %d msgs, got %d", 0, state.Msgs) + } + if state.Bytes != 0 { + t.Fatalf("Expected %d bytes, got %d", 0, state.Bytes) + } + + seq, _, err := fs.StoreMsg(subj, nil, []byte("Hello")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + fs.RemoveMsg(seq) + + fs.Stop() + fs, err = newFileStore(fcfg, StreamConfig{Name: "dlc", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + state = fs.State() + if state.FirstSeq != seq+1 { + t.Fatalf("Expected first seq to be %d, got %d", seq+1, state.FirstSeq) + } } func TestFileStoreSelectNextFirst(t *testing.T) { @@ -803,7 +838,7 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) { state2 := fs.State() if state != state2 { - t.Fatalf("Expected receovered states to be the same, got %+v vs %+v\n", state, state2) + t.Fatalf("Expected recovered states to be the same, got %+v vs %+v\n", state, state2) } if _, _, _, _, err := fs.LoadMsg(1); err != nil { diff --git a/test/jetstream_test.go b/test/jetstream_test.go index f974558f..cea5fc77 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -9696,3 +9696,107 @@ func TestJetStreamPullConsumerMaxAckPendingRedeliveries(t *testing.T) { }) } } + +func TestJetStreamDeliveryAfterServerRestart(t *testing.T) { + opts := DefaultTestOptions + opts.Port = -1 + opts.JetStream = true + s := RunServer(&opts) + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{ + Name: "MY_STREAM", + Storage: server.FileStorage, + Subjects: []string{"foo.>"}, + Retention: server.InterestPolicy, + }) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + inbox := nats.NewInbox() + o, err := mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur", + DeliverSubject: inbox, + DeliverPolicy: server.DeliverNew, + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + defer o.Delete() + + sub, err := nc.SubscribeSync(inbox) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + nc.Flush() + + // Send 1 message + sendStreamMsg(t, nc, "foo.bar", "msg1") + + // Make sure we receive it and ack it. + msg, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + t.Fatalf("Did not get message: %v", err) + } + // Ack it! + msg.Respond(nil) + nc.Flush() + + // Shutdown client and server + nc.Close() + + dir := strings.TrimSuffix(s.JetStreamConfig().StoreDir, server.JetStreamStoreDir) + s.Shutdown() + + opts.Port = -1 + opts.StoreDir = dir + s = RunServer(&opts) + defer s.Shutdown() + + // Lookup stream. + mset, err = s.GlobalAccount().LookupStream("MY_STREAM") + if err != nil { + t.Fatalf("Error looking up stream: %v", err) + } + + // Update consumer's deliver subject with new inbox + inbox = nats.NewInbox() + o, err = mset.AddConsumer(&server.ConsumerConfig{ + Durable: "dur", + DeliverSubject: inbox, + DeliverPolicy: server.DeliverNew, + AckPolicy: server.AckExplicit, + }) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + defer o.Delete() + + nc = clientConnectToServer(t, s) + defer nc.Close() + + // Send 2nd message + sendStreamMsg(t, nc, "foo.bar", "msg2") + + // Start sub on new inbox + sub, err = nc.SubscribeSync(inbox) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + nc.Flush() + + // Should receive message 2. + if _, err := sub.NextMsg(500 * time.Millisecond); err != nil { + t.Fatalf("Did not get message: %v", err) + } +}