mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Fix for writing messages after restart and delivery count suppression on max
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019 The NATS Authors
|
||||
// Copyright 2019-2020 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -281,7 +281,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) *msgBlock {
|
||||
mb.ifn = path.Join(fs.fcfg.StoreDir, msgDir, fmt.Sprintf(indexScan, index))
|
||||
|
||||
// Open up the message file, but we will try to recover from the index file.
|
||||
// We will check that the last checksufs match.
|
||||
// We will check that the last checksums match.
|
||||
file, err := os.Open(mb.mfn)
|
||||
if err != nil {
|
||||
return nil
|
||||
@@ -373,6 +373,8 @@ func (fs *fileStore) recoverMsgs() error {
|
||||
|
||||
if len(fs.blks) == 0 {
|
||||
_, err = fs.newMsgBlockForWrite()
|
||||
} else {
|
||||
err = fs.enableLastMsgBlockForWriting()
|
||||
}
|
||||
|
||||
return err
|
||||
@@ -429,6 +431,24 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
|
||||
return mb, nil
|
||||
}
|
||||
|
||||
// Make sure we can write to the last message block.
|
||||
// Lock should be held.
|
||||
func (fs *fileStore) enableLastMsgBlockForWriting() error {
|
||||
mb := fs.lmb
|
||||
if mb == nil {
|
||||
return fmt.Errorf("No last message block assigned, can not enable for writing")
|
||||
}
|
||||
if mb.mfd != nil {
|
||||
return nil
|
||||
}
|
||||
mfd, err := os.OpenFile(mb.mfn, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error opening msg block file [%q]: %v", mb.mfn, err)
|
||||
}
|
||||
mb.mfd = mfd
|
||||
return nil
|
||||
}
|
||||
|
||||
// Store stores a message.
|
||||
func (fs *fileStore) StoreMsg(subj string, msg []byte) (uint64, error) {
|
||||
fs.mu.Lock()
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019 The NATS Authors
|
||||
// Copyright 2019-2020 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -94,7 +94,6 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
|
||||
|
||||
// Write 100 msgs
|
||||
toStore := uint64(100)
|
||||
|
||||
for i := uint64(1); i <= toStore; i++ {
|
||||
msg := []byte(fmt.Sprintf("[%08d] Hello World!", i))
|
||||
if seq, err := fs.StoreMsg(subj, msg); err != nil {
|
||||
@@ -116,6 +115,7 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
|
||||
// Stop will flush to disk.
|
||||
fs.Stop()
|
||||
|
||||
// Restart
|
||||
fs, err = newFileStore(fcfg, MsgSetConfig{Name: "dlc", Storage: FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
@@ -129,6 +129,39 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
|
||||
if stats.Bytes != expectedSize {
|
||||
t.Fatalf("Expected %d bytes, got %d", expectedSize, stats.Bytes)
|
||||
}
|
||||
|
||||
// Now write 100 more msgs
|
||||
for i := uint64(101); i <= toStore*2; i++ {
|
||||
msg := []byte(fmt.Sprintf("[%08d] Hello World!", i))
|
||||
if seq, err := fs.StoreMsg(subj, msg); err != nil {
|
||||
t.Fatalf("Error storing msg: %v", err)
|
||||
} else if seq != uint64(i) {
|
||||
t.Fatalf("Expected sequence to be %d, got %d", i, seq)
|
||||
}
|
||||
}
|
||||
stats = fs.Stats()
|
||||
if stats.Msgs != toStore*2 {
|
||||
t.Fatalf("Expected %d msgs, got %d", toStore*2, stats.Msgs)
|
||||
}
|
||||
|
||||
// Now cycle again and make sure that last batch was stored.
|
||||
// Stop will flush to disk.
|
||||
fs.Stop()
|
||||
|
||||
// Restart
|
||||
fs, err = newFileStore(fcfg, MsgSetConfig{Name: "dlc", Storage: FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer fs.Stop()
|
||||
|
||||
stats = fs.Stats()
|
||||
if stats.Msgs != toStore*2 {
|
||||
t.Fatalf("Expected %d msgs, got %d", toStore*2, stats.Msgs)
|
||||
}
|
||||
if stats.Bytes != expectedSize*2 {
|
||||
t.Fatalf("Expected %d bytes, got %d", expectedSize*2, stats.Bytes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileStoreMsgLimit(t *testing.T) {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019 The NATS Authors
|
||||
// Copyright 2019-2020 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
|
||||
@@ -821,6 +821,8 @@ func (o *Observable) getNextMsg() (string, []byte, uint64, uint64, error) {
|
||||
o.rdq = append(o.rdq[:0], o.rdq[1:]...)
|
||||
dcount = o.incDeliveryCount(seq)
|
||||
if o.maxdc > 0 && dcount > o.maxdc {
|
||||
// Make sure to remove from pending.
|
||||
delete(o.pending, seq)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
@@ -82,8 +83,15 @@ func RunBasicJetStreamServer() *server.Server {
|
||||
return RunServer(&opts)
|
||||
}
|
||||
|
||||
func RunJetStreamServerOnPort(port int) *server.Server {
|
||||
opts := DefaultTestOptions
|
||||
opts.Port = port
|
||||
opts.JetStream = true
|
||||
return RunServer(&opts)
|
||||
}
|
||||
|
||||
func clientConnectToServer(t *testing.T, s *server.Server) *nats.Conn {
|
||||
nc, err := nats.Connect(s.ClientURL())
|
||||
nc, err := nats.Connect(s.ClientURL(), nats.ReconnectWait(5*time.Millisecond), nats.MaxReconnects(-1))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
@@ -1408,6 +1416,121 @@ func TestJetStreamWorkQueueWorkingIndicator(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamObservableMaxDeliveryAndServerRestart(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer os.RemoveAll(config.StoreDir)
|
||||
}
|
||||
|
||||
mname := "MYS"
|
||||
mset, err := s.GlobalAccount().AddMsgSet(&server.MsgSetConfig{Name: mname, Storage: server.FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding message set: %v", err)
|
||||
}
|
||||
defer mset.Delete()
|
||||
|
||||
dsubj := "D.TO"
|
||||
max := 4
|
||||
|
||||
o, err := mset.AddObservable(&server.ObservableConfig{
|
||||
Durable: "TO",
|
||||
Delivery: dsubj,
|
||||
DeliverAll: true,
|
||||
AckPolicy: server.AckExplicit,
|
||||
AckWait: 25 * time.Millisecond,
|
||||
MaxDeliver: max,
|
||||
})
|
||||
defer o.Delete()
|
||||
|
||||
nc := clientConnectToServer(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
sub, _ := nc.SubscribeSync(dsubj)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
// Send one message.
|
||||
resp, _ := nc.Request(mname, []byte("order-1"), 50*time.Millisecond)
|
||||
expectOKResponse(t, resp)
|
||||
|
||||
checkSubPending := func(numExpected int) {
|
||||
t.Helper()
|
||||
checkFor(t, 150*time.Millisecond, 10*time.Millisecond, func() error {
|
||||
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != numExpected {
|
||||
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
checkNumMsgs := func(numExpected uint64) {
|
||||
t.Helper()
|
||||
mset, err = s.GlobalAccount().LookupMsgSet(mname)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to find a msgset for %q", mname)
|
||||
}
|
||||
stats := mset.Stats()
|
||||
if stats.Msgs != numExpected {
|
||||
t.Fatalf("Expected %d msgs, got %d", numExpected, stats.Msgs)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait til we know we have max queue up.
|
||||
checkSubPending(max)
|
||||
|
||||
// Once here we have gone over the limit for the 1st message for max deliveries.
|
||||
// Send second
|
||||
resp, _ = nc.Request(mname, []byte("order-2"), 50*time.Millisecond)
|
||||
expectOKResponse(t, resp)
|
||||
|
||||
// Just wait for first delivery + one redelivery.
|
||||
checkSubPending(max + 2)
|
||||
|
||||
// Capture port since it was dynamic.
|
||||
u, _ := url.Parse(s.ClientURL())
|
||||
port, _ := strconv.Atoi(u.Port())
|
||||
|
||||
restartServer := func() {
|
||||
t.Helper()
|
||||
// Stop current server.
|
||||
s.Shutdown()
|
||||
// Restart.
|
||||
s = RunJetStreamServerOnPort(port)
|
||||
}
|
||||
|
||||
// Restart.
|
||||
restartServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
checkNumMsgs(2)
|
||||
|
||||
// Wait for client to be reconnected.
|
||||
checkFor(t, 2500*time.Millisecond, 5*time.Millisecond, func() error {
|
||||
if !nc.IsConnected() {
|
||||
return fmt.Errorf("Not connected")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Once we are here send third order.
|
||||
// Send third
|
||||
resp, _ = nc.Request(mname, []byte("order-3"), 50*time.Millisecond)
|
||||
expectOKResponse(t, resp)
|
||||
|
||||
checkNumMsgs(3)
|
||||
|
||||
// Restart.
|
||||
restartServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
checkNumMsgs(3)
|
||||
|
||||
// Now we should have max times three on our sub.
|
||||
checkSubPending(max * 3)
|
||||
}
|
||||
|
||||
func TestJetStreamDeleteObservableAndServerRestart(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
Reference in New Issue
Block a user