From a7d7b6ff56fac2c694ad67ea8d5eccb451a4ef68 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 11 Jan 2020 09:23:21 -0800 Subject: [PATCH] Fix for writing messages after restart and delivery count suppression on max Signed-off-by: Derek Collison --- server/filestore.go | 24 +++++++- server/filestore_test.go | 37 +++++++++++- server/msgset.go | 2 +- server/observable.go | 2 + test/jetstream_test.go | 125 ++++++++++++++++++++++++++++++++++++++- 5 files changed, 184 insertions(+), 6 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index ba236af4..b4dbb7dd 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1,4 +1,4 @@ -// Copyright 2019 The NATS Authors +// Copyright 2019-2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -281,7 +281,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) *msgBlock { mb.ifn = path.Join(fs.fcfg.StoreDir, msgDir, fmt.Sprintf(indexScan, index)) // Open up the message file, but we will try to recover from the index file. - // We will check that the last checksufs match. + // We will check that the last checksums match. file, err := os.Open(mb.mfn) if err != nil { return nil @@ -373,6 +373,8 @@ func (fs *fileStore) recoverMsgs() error { if len(fs.blks) == 0 { _, err = fs.newMsgBlockForWrite() + } else { + err = fs.enableLastMsgBlockForWriting() } return err @@ -429,6 +431,24 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { return mb, nil } +// Make sure we can write to the last message block. +// Lock should be held. +func (fs *fileStore) enableLastMsgBlockForWriting() error { + mb := fs.lmb + if mb == nil { + return fmt.Errorf("No last message block assigned, can not enable for writing") + } + if mb.mfd != nil { + return nil + } + mfd, err := os.OpenFile(mb.mfn, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("Error opening msg block file [%q]: %v", mb.mfn, err) + } + mb.mfd = mfd + return nil +} + // Store stores a message. func (fs *fileStore) StoreMsg(subj string, msg []byte) (uint64, error) { fs.mu.Lock() diff --git a/server/filestore_test.go b/server/filestore_test.go index 8c2f4444..40ac3660 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1,4 +1,4 @@ -// Copyright 2019 The NATS Authors +// Copyright 2019-2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -94,7 +94,6 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) { // Write 100 msgs toStore := uint64(100) - for i := uint64(1); i <= toStore; i++ { msg := []byte(fmt.Sprintf("[%08d] Hello World!", i)) if seq, err := fs.StoreMsg(subj, msg); err != nil { @@ -116,6 +115,7 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) { // Stop will flush to disk. fs.Stop() + // Restart fs, err = newFileStore(fcfg, MsgSetConfig{Name: "dlc", Storage: FileStorage}) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -129,6 +129,39 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) { if stats.Bytes != expectedSize { t.Fatalf("Expected %d bytes, got %d", expectedSize, stats.Bytes) } + + // Now write 100 more msgs + for i := uint64(101); i <= toStore*2; i++ { + msg := []byte(fmt.Sprintf("[%08d] Hello World!", i)) + if seq, err := fs.StoreMsg(subj, msg); err != nil { + t.Fatalf("Error storing msg: %v", err) + } else if seq != uint64(i) { + t.Fatalf("Expected sequence to be %d, got %d", i, seq) + } + } + stats = fs.Stats() + if stats.Msgs != toStore*2 { + t.Fatalf("Expected %d msgs, got %d", toStore*2, stats.Msgs) + } + + // Now cycle again and make sure that last batch was stored. + // Stop will flush to disk. + fs.Stop() + + // Restart + fs, err = newFileStore(fcfg, MsgSetConfig{Name: "dlc", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + stats = fs.Stats() + if stats.Msgs != toStore*2 { + t.Fatalf("Expected %d msgs, got %d", toStore*2, stats.Msgs) + } + if stats.Bytes != expectedSize*2 { + t.Fatalf("Expected %d bytes, got %d", expectedSize*2, stats.Bytes) + } } func TestFileStoreMsgLimit(t *testing.T) { diff --git a/server/msgset.go b/server/msgset.go index e3a90e58..02ef5f30 100644 --- a/server/msgset.go +++ b/server/msgset.go @@ -1,4 +1,4 @@ -// Copyright 2019 The NATS Authors +// Copyright 2019-2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/observable.go b/server/observable.go index 85d34f4f..0f7bff68 100644 --- a/server/observable.go +++ b/server/observable.go @@ -821,6 +821,8 @@ func (o *Observable) getNextMsg() (string, []byte, uint64, uint64, error) { o.rdq = append(o.rdq[:0], o.rdq[1:]...) dcount = o.incDeliveryCount(seq) if o.maxdc > 0 && dcount > o.maxdc { + // Make sure to remove from pending. + delete(o.pending, seq) continue } } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index e2be751e..dbdfcec6 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -18,6 +18,7 @@ import ( "fmt" "log" "math/rand" + "net/url" "os" "path/filepath" "reflect" @@ -82,8 +83,15 @@ func RunBasicJetStreamServer() *server.Server { return RunServer(&opts) } +func RunJetStreamServerOnPort(port int) *server.Server { + opts := DefaultTestOptions + opts.Port = port + opts.JetStream = true + return RunServer(&opts) +} + func clientConnectToServer(t *testing.T, s *server.Server) *nats.Conn { - nc, err := nats.Connect(s.ClientURL()) + nc, err := nats.Connect(s.ClientURL(), nats.ReconnectWait(5*time.Millisecond), nats.MaxReconnects(-1)) if err != nil { t.Fatalf("Failed to create client: %v", err) } @@ -1408,6 +1416,121 @@ func TestJetStreamWorkQueueWorkingIndicator(t *testing.T) { }) } } + +func TestJetStreamObservableMaxDeliveryAndServerRestart(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mname := "MYS" + mset, err := s.GlobalAccount().AddMsgSet(&server.MsgSetConfig{Name: mname, Storage: server.FileStorage}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer mset.Delete() + + dsubj := "D.TO" + max := 4 + + o, err := mset.AddObservable(&server.ObservableConfig{ + Durable: "TO", + Delivery: dsubj, + DeliverAll: true, + AckPolicy: server.AckExplicit, + AckWait: 25 * time.Millisecond, + MaxDeliver: max, + }) + defer o.Delete() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + sub, _ := nc.SubscribeSync(dsubj) + defer sub.Unsubscribe() + + // Send one message. + resp, _ := nc.Request(mname, []byte("order-1"), 50*time.Millisecond) + expectOKResponse(t, resp) + + checkSubPending := func(numExpected int) { + t.Helper() + checkFor(t, 150*time.Millisecond, 10*time.Millisecond, func() error { + if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != numExpected { + return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected) + } + return nil + }) + } + + checkNumMsgs := func(numExpected uint64) { + t.Helper() + mset, err = s.GlobalAccount().LookupMsgSet(mname) + if err != nil { + t.Fatalf("Expected to find a msgset for %q", mname) + } + stats := mset.Stats() + if stats.Msgs != numExpected { + t.Fatalf("Expected %d msgs, got %d", numExpected, stats.Msgs) + } + } + + // Wait til we know we have max queue up. + checkSubPending(max) + + // Once here we have gone over the limit for the 1st message for max deliveries. + // Send second + resp, _ = nc.Request(mname, []byte("order-2"), 50*time.Millisecond) + expectOKResponse(t, resp) + + // Just wait for first delivery + one redelivery. + checkSubPending(max + 2) + + // Capture port since it was dynamic. + u, _ := url.Parse(s.ClientURL()) + port, _ := strconv.Atoi(u.Port()) + + restartServer := func() { + t.Helper() + // Stop current server. + s.Shutdown() + // Restart. + s = RunJetStreamServerOnPort(port) + } + + // Restart. + restartServer() + defer s.Shutdown() + + checkNumMsgs(2) + + // Wait for client to be reconnected. + checkFor(t, 2500*time.Millisecond, 5*time.Millisecond, func() error { + if !nc.IsConnected() { + return fmt.Errorf("Not connected") + } + return nil + }) + + // Once we are here send third order. + // Send third + resp, _ = nc.Request(mname, []byte("order-3"), 50*time.Millisecond) + expectOKResponse(t, resp) + + checkNumMsgs(3) + + // Restart. + restartServer() + defer s.Shutdown() + + checkNumMsgs(3) + + // Now we should have max times three on our sub. + checkSubPending(max * 3) +} + func TestJetStreamDeleteObservableAndServerRestart(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown()