Filestore read perf, secure delete for stores

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-27 14:31:54 -07:00
parent 4afdfafd89
commit e8f465eaf3
7 changed files with 350 additions and 61 deletions

View File

@@ -22,8 +22,10 @@ import (
"hash"
"io"
"io/ioutil"
"math/rand"
"os"
"path"
"strings"
"sync"
"time"
@@ -33,10 +35,8 @@ import (
type FileStoreConfig struct {
// Where the parent directory for all storage will be located.
StoreDir string
// BlockSize is th essage data file block size. This also represents the maximum overhead size.
// BlockSize is the file block size. This also represents the maximum overhead size.
BlockSize uint64
// ReadBufferSize is how much we read from a block during lookups.
ReadBufferSize int
}
type fileStore struct {
@@ -58,15 +58,16 @@ type fileStore struct {
// Represents a message store block and its data.
type msgBlock struct {
mfd *os.File
mfn string
mfd *os.File
ifn string
ifd *os.File
index uint64
bytes uint64
msgs uint64
first msgId
last msgId
cache map[uint64]*storedMsg
cache map[uint64]*fileStoredMsg
dmap map[uint64]struct{}
lchk [8]byte
}
@@ -76,6 +77,14 @@ type msgId struct {
ts int64
}
type fileStoredMsg struct {
subj string
msg []byte
seq uint64
ts int64 // nanoseconds
off int64 // offset into block file
}
const (
// This is where we keep the message store blocks.
msgDir = "msgs"
@@ -91,11 +100,11 @@ const (
const (
// Default stream block size.
defaultStreamBlockSize = 512 * 1024 * 1024 // 128MB
defaultStreamBlockSize = 128 * 1024 * 1024 // 128MB
// Default for workqueue or interest based.
defaultOtherBlockSize = 32 * 1024 * 1024 // 32MB
// Default ReadBuffer size
defaultReadBufferSize = 4 * 1024 * 1024 // 4MB
// max block size for now.
maxBlockSize = defaultStreamBlockSize
)
func newFileStore(fcfg FileStoreConfig, cfg MsgSetConfig) (*fileStore, error) {
@@ -108,11 +117,8 @@ func newFileStore(fcfg FileStoreConfig, cfg MsgSetConfig) (*fileStore, error) {
if fcfg.BlockSize == 0 {
fcfg.BlockSize = dynBlkSize(cfg.Retention, cfg.MaxBytes)
}
if fcfg.ReadBufferSize == 0 {
fcfg.ReadBufferSize = defaultReadBufferSize
}
if fcfg.ReadBufferSize >= int(fcfg.BlockSize) {
fcfg.ReadBufferSize = int(fcfg.BlockSize)
if fcfg.BlockSize > maxBlockSize {
return nil, fmt.Errorf("filestore max block size is %s", FriendlyBytes(maxBlockSize))
}
// Check the directory
@@ -288,7 +294,7 @@ func (ms *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
if ms.lmb != nil {
index = ms.lmb.index + 1
ms.flushToFileLocked()
ms.closeLastMsgBlock()
ms.closeLastMsgBlock(false)
} else {
index = 1
}
@@ -305,6 +311,11 @@ func (ms *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
mb.mfd = mfd
mb.ifn = path.Join(ms.fcfg.StoreDir, msgDir, fmt.Sprintf(indexScan, mb.index))
ifd, err := os.OpenFile(mb.ifn, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, fmt.Errorf("Error creating msg index file [%q]: %v", mb.mfn, err)
}
mb.ifd = ifd
return mb, nil
}
@@ -371,7 +382,7 @@ func (ms *fileStore) enforceBytesLimit() {
}
func (ms *fileStore) deleteFirstMsg() bool {
return ms.removeMsg(ms.stats.FirstSeq)
return ms.removeMsg(ms.stats.FirstSeq, false)
}
// RemoveMsg will remove the message from this store.
@@ -379,15 +390,21 @@ func (ms *fileStore) deleteFirstMsg() bool {
func (ms *fileStore) RemoveMsg(seq uint64) bool {
ms.mu.Lock()
defer ms.mu.Unlock()
return ms.removeMsg(seq)
return ms.removeMsg(seq, false)
}
func (ms *fileStore) removeMsg(seq uint64) bool {
func (ms *fileStore) EraseMsg(seq uint64) bool {
ms.mu.Lock()
defer ms.mu.Unlock()
return ms.removeMsg(seq, true)
}
func (ms *fileStore) removeMsg(seq uint64, secure bool) bool {
mb := ms.selectMsgBlock(seq)
if mb == nil {
return false
}
var sm *storedMsg
var sm *fileStoredMsg
if mb.cache != nil {
sm = mb.cache[seq]
}
@@ -400,12 +417,12 @@ func (ms *fileStore) removeMsg(seq uint64) bool {
return false
}
// We have the message here, so we can delete it.
ms.deleteMsgFromBlock(mb, seq, sm)
ms.deleteMsgFromBlock(mb, seq, sm, secure)
return true
}
// Lock should be held.
func (ms *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *storedMsg) {
func (ms *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStoredMsg, secure bool) {
// Update global accounting.
msz := fileStoreMsgSize(sm.subj, sm.msg)
ms.stats.Msgs--
@@ -437,6 +454,9 @@ func (ms *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *storedMsg)
mb.dmap[seq] = struct{}{}
mb.writeIndexInfo()
}
if secure {
ms.eraseMsg(mb, sm)
}
}
func (ms *fileStore) startAgeChk() {
@@ -620,6 +640,59 @@ func (ms *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64
return uint64(rl), nil
}
// Will rewrite the message in the underlying store.
func (ms *fileStore) eraseMsg(mb *msgBlock, sm *fileStoredMsg) error {
if sm == nil || sm.off < 0 || uint64(sm.off) > mb.bytes {
return fmt.Errorf("bad stored message")
}
// erase contents and rewrite with new hash.
rand.Read(sm.msg)
sm.seq, sm.ts = 0, 0
chars := []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
var b strings.Builder
for i := 0; i < len(sm.subj); i++ {
b.WriteRune(chars[rand.Intn(len(chars))])
}
sm.subj = b.String()
var le = binary.LittleEndian
var hdr [msgHdrSize]byte
rl := fileStoreMsgSize(sm.subj, sm.msg)
le.PutUint32(hdr[0:], uint32(rl))
le.PutUint64(hdr[4:], 0)
le.PutUint64(hdr[12:], 0)
le.PutUint16(hdr[20:], uint16(len(sm.subj)))
// Now write to underlying buffer.
var wmb bytes.Buffer
wmb.Write(hdr[:])
wmb.WriteString(sm.subj)
wmb.Write(sm.msg)
// Calculate hash.
ms.hh.Reset()
ms.hh.Write(hdr[4:20])
ms.hh.Write([]byte(sm.subj))
ms.hh.Write(sm.msg)
checksum := ms.hh.Sum(nil)
// Write to msg record.
wmb.Write(checksum)
mfd, err := os.OpenFile(mb.mfn, os.O_RDWR, 0644)
if err != nil {
return err
}
_, err = mfd.WriteAt(wmb.Bytes(), sm.off)
mfd.Sync()
mfd.Close()
return err
}
// Select the message block where this message should be found.
// Return nil if not in the set.
// Read lock should be held.
@@ -636,7 +709,7 @@ func (ms *fileStore) selectMsgBlock(seq uint64) *msgBlock {
}
// Read and cache message from the underlying block.
func (ms *fileStore) readAndCacheMsgs(mb *msgBlock, seq uint64) *storedMsg {
func (ms *fileStore) readAndCacheMsgs(mb *msgBlock, seq uint64) *fileStoredMsg {
// TODO(dlc) - Could reuse if already open fd. Also release locks for
// load in parallel. For now opt for simple approach.
msgFile := path.Join(ms.fcfg.StoreDir, msgDir, fmt.Sprintf(blkScan, mb.index))
@@ -651,51 +724,55 @@ func (ms *fileStore) readAndCacheMsgs(mb *msgBlock, seq uint64) *storedMsg {
ms.flushToFileLocked()
}
if mb.cache == nil {
mb.cache = make(map[uint64]*storedMsg)
mb.cache = make(map[uint64]*fileStoredMsg)
}
r := bufio.NewReaderSize(fd, ms.fcfg.ReadBufferSize)
buf, err := ioutil.ReadFile(msgFile)
if err != nil {
// FIXME(dlc) - complain somehow.
return nil
}
var le = binary.LittleEndian
var cachedSize int
var sm *storedMsg
var sm *fileStoredMsg
// Read until we get our message, or see a message that has higher sequence.
for {
var hdr [msgHdrSize]byte
if _, err := io.ReadFull(r, hdr[:]); err != nil {
break
}
for index, skip := 0, 0; index < len(buf); {
hdr := buf[index : index+msgHdrSize]
rl := le.Uint32(hdr[0:])
dlen := int(rl) - msgHdrSize
mseq := le.Uint64(hdr[4:])
if seq > mseq || mb.cache[mseq] != nil {
// Skip over
io.CopyN(ioutil.Discard, r, int64(dlen))
index += int(rl)
skip += int(rl)
continue
}
// If we have a delete map check it.
if mb.dmap != nil {
if _, ok := mb.dmap[mseq]; ok {
// Skip over
io.CopyN(ioutil.Discard, r, int64(dlen))
index += int(rl)
skip += int(rl)
continue
}
}
// Read in the message regardless.
// Read in the message.
ts := int64(le.Uint64(hdr[12:]))
slen := le.Uint16(hdr[20:])
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) {
// This means something is off.
ms.bad = append(ms.bad, seq)
index += int(rl)
skip += int(rl)
fmt.Printf("ZZZZZ\n\n")
continue
}
data := make([]byte, dlen)
if _, err := io.ReadFull(r, data); err != nil {
break
}
index += msgHdrSize
data := buf[index : index+dlen]
// Check the checksum here.
ms.hh.Reset()
ms.hh.Write(hdr[4:20])
@@ -703,29 +780,29 @@ func (ms *fileStore) readAndCacheMsgs(mb *msgBlock, seq uint64) *storedMsg {
ms.hh.Write(data[slen : dlen-8])
checksum := ms.hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-8:]) {
index += dlen
ms.bad = append(ms.bad, seq)
fmt.Printf("ZZZZZ\n\n")
continue
}
msg := &storedMsg{
msg := &fileStoredMsg{
subj: string(data[:slen]),
msg: data[slen : dlen-8],
seq: mseq,
ts: ts,
off: int64(index - msgHdrSize),
}
mb.cache[mseq] = msg
if mseq == seq {
sm = msg
}
cachedSize += dlen
if cachedSize > ms.fcfg.ReadBufferSize {
break
}
index += dlen
}
return sm
}
// Will return message for the given sequence number.
func (ms *fileStore) msgForSeq(seq uint64) *storedMsg {
func (ms *fileStore) msgForSeq(seq uint64) *fileStoredMsg {
ms.mu.RLock()
// seq == 0 indidcates we want first msg.
if seq == 0 {
@@ -751,6 +828,25 @@ func (ms *fileStore) msgForSeq(seq uint64) *storedMsg {
return sm
}
// Internal function to return msg parts from a raw buffer.
func msgFromBuf(buf []byte) (string, []byte, uint64, int64, error) {
if len(buf) < msgHdrSize {
return "", nil, 0, 0, fmt.Errorf("buf too small for msg")
}
var le = binary.LittleEndian
hdr := buf[:msgHdrSize]
rl := le.Uint32(hdr[0:])
dlen := int(rl) - msgHdrSize
seq := le.Uint64(hdr[4:])
ts := int64(le.Uint64(hdr[12:]))
slen := le.Uint16(hdr[20:])
if dlen < 0 || int(slen) > dlen || dlen > int(rl) {
return "", nil, 0, 0, fmt.Errorf("malformed or corrupt msg")
}
data := buf[msgHdrSize:]
return string(data[:slen]), data[slen : dlen-8], seq, ts, nil
}
// Lookup will lookup the message by sequence number.
func (ms *fileStore) Lookup(seq uint64) (string, []byte, int64, error) {
if sm := ms.msgForSeq(seq); sm != nil {
@@ -779,14 +875,13 @@ func (ms *fileStore) flushToFile() {
// Lock should be held.
func (ms *fileStore) flushToFileLocked() {
lbb := ms.wmb.Len()
mb := ms.lmb
if mb == nil {
return
}
// Append new data to the message block file.
if lbb > 0 && mb.mfd != nil {
if lbb := ms.wmb.Len(); lbb > 0 && mb.mfd != nil {
n, _ := ms.wmb.WriteTo(mb.mfd)
if int(n) != lbb {
ms.wmb.Truncate(int(n))
@@ -820,7 +915,13 @@ func (mb *msgBlock) writeIndexInfo() error {
if len(mb.dmap) > 0 {
buf = append(buf, mb.genDeleteMap()...)
}
return ioutil.WriteFile(mb.ifn, buf, 0644)
var err error
if mb.ifd != nil {
_, err = mb.ifd.WriteAt(buf, 0)
} else {
err = ioutil.WriteFile(mb.ifn, buf, 0644)
}
return err
}
func (mb *msgBlock) readIndexInfo() error {
@@ -878,11 +979,15 @@ func (mb *msgBlock) genDeleteMap() []byte {
return buf[:n]
}
func syncAndClose(mfd *os.File) {
func syncAndClose(mfd, ifd *os.File) {
if mfd != nil {
mfd.Sync()
mfd.Close()
}
if ifd != nil {
ifd.Sync()
ifd.Close()
}
}
// Purge will remove all messages from this store.
@@ -932,12 +1037,17 @@ func (ms *fileStore) numMsgBlocks() int {
// Removes the msgBlock
// Lock should be held.
func (ms *fileStore) removeMsgBlock(mb *msgBlock) {
if mb.ifd != nil {
mb.ifd.Close()
mb.ifd = nil
}
os.Remove(mb.ifn)
if mb.mfd != nil {
mb.mfd.Close()
mb.mfd = nil
}
os.Remove(mb.mfn)
for i, omb := range ms.blks {
if mb == omb {
ms.blks = append(ms.blks[:i], ms.blks[i+1:]...)
@@ -954,12 +1064,17 @@ func (ms *fileStore) removeMsgBlock(mb *msgBlock) {
}
}
func (ms *fileStore) closeLastMsgBlock() {
func (ms *fileStore) closeLastMsgBlock(sync bool) {
if ms.lmb == nil || ms.lmb.mfd == nil {
return
}
go syncAndClose(ms.lmb.mfd)
if sync {
syncAndClose(ms.lmb.mfd, ms.lmb.ifd)
} else {
go syncAndClose(ms.lmb.mfd, ms.lmb.ifd)
}
ms.lmb.mfd = nil
ms.lmb.ifd = nil
ms.lmb = nil
}
@@ -972,7 +1087,7 @@ func (ms *fileStore) Stop() {
ms.closed = true
close(ms.qch)
ms.flushToFileLocked()
ms.closeLastMsgBlock()
ms.closeLastMsgBlock(true)
ms.wmb = &bytes.Buffer{}
if ms.ageChk != nil {
ms.ageChk.Stop()

View File

@@ -20,6 +20,7 @@ import (
"math/bits"
"math/rand"
"os"
"path"
"path/filepath"
"reflect"
"testing"
@@ -409,7 +410,7 @@ func TestFileStoreRemovePartialRecovery(t *testing.T) {
stats2 := ms.Stats()
if stats != stats2 {
t.Fatalf("Expected receovered stats to be the same, got %+v vs %+v\n", stats, stats2)
t.Fatalf("Expected recovered stats to be the same, got %+v vs %+v\n", stats, stats2)
}
}
@@ -581,3 +582,130 @@ func TestFileStoreBitRot(t *testing.T) {
t.Fatalf("Different reporting on bad msgs: %+v vs %+v", bseqs, ms.checkMsgs())
}
}
func TestFileStoreEraseMsg(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)
defer os.RemoveAll(storeDir)
ms, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, MsgSetConfig{Name: "zzz", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer ms.Stop()
subj, msg := "foo", []byte("Hello World")
ms.StoreMsg(subj, msg)
_, smsg, _, err := ms.Lookup(1)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
if !bytes.Equal(msg, smsg) {
t.Fatalf("Expected same msg, got %q vs %q", smsg, msg)
}
sm := ms.msgForSeq(1)
if !ms.EraseMsg(1) {
t.Fatalf("Expected erase msg to return success")
}
if bytes.Equal(msg, smsg) {
t.Fatalf("Expected msg to be erased")
}
// Now look on disk as well.
rl := fileStoreMsgSize(subj, msg)
buf := make([]byte, rl)
fp, err := os.Open(path.Join(storeDir, msgDir, fmt.Sprintf(blkScan, 1)))
if err != nil {
t.Fatalf("Error opening msgs file: %v", err)
}
defer fp.Close()
fp.ReadAt(buf, sm.off)
nsubj, nmsg, seq, ts, err := msgFromBuf(buf)
if err != nil {
t.Fatalf("error reading message from block: %v", err)
}
if nsubj == subj {
t.Fatalf("Expected the subjects to be different")
}
if seq != 0 {
t.Fatalf("Expected seq to be 0, marking as deleted, got %d", seq)
}
if ts != 0 {
t.Fatalf("Expected timestamp to be 0, got %d", ts)
}
if bytes.Equal(nmsg, msg) {
t.Fatalf("Expected message body to be randomized")
}
}
func TestFileStorePerf(t *testing.T) {
// Uncomment to run, holding place for now.
t.SkipNow()
subj, msg := "foo", make([]byte, 4*1024)
for i := 0; i < len(msg); i++ {
msg[i] = 'D'
}
storedMsgSize := fileStoreMsgSize(subj, msg)
// 10GB
toStore := 10 * 1024 * 1024 * 1024 / storedMsgSize
fmt.Printf("storing %d msgs of %s each, totalling %s\n",
toStore,
FriendlyBytes(int64(storedMsgSize)),
FriendlyBytes(int64(toStore*storedMsgSize)),
)
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)
defer os.RemoveAll(storeDir)
fmt.Printf("StoreDir is %q\n", storeDir)
ms, err := newFileStore(
FileStoreConfig{StoreDir: storeDir},
MsgSetConfig{Name: "zzz", Storage: FileStorage},
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
start := time.Now()
for i := 0; i < int(toStore); i++ {
ms.StoreMsg(subj, msg)
}
ms.Stop()
tt := time.Since(start)
fmt.Printf("time to store is %v\n", tt)
fmt.Printf("%.0f msgs/sec\n", float64(toStore)/tt.Seconds())
fmt.Printf("%s per sec\n", FriendlyBytes(int64(float64(toStore*storedMsgSize)/tt.Seconds())))
fmt.Printf("Filesystem cache flush, paused 5 seconds.\n\n")
time.Sleep(5 * time.Second)
fmt.Printf("reading %d msgs of %s each, totalling %s\n",
toStore,
FriendlyBytes(int64(storedMsgSize)),
FriendlyBytes(int64(toStore*storedMsgSize)),
)
ms, err = newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 128 * 1024 * 1024},
MsgSetConfig{Name: "zzz", Storage: FileStorage},
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
start = time.Now()
for i := uint64(1); i <= toStore; i++ {
ms.Lookup(i)
}
ms.Stop()
tt = time.Since(start)
fmt.Printf("time to read all back messages is %v\n", tt)
fmt.Printf("%.0f msgs/sec\n", float64(toStore)/tt.Seconds())
fmt.Printf("%s per sec\n", FriendlyBytes(int64(float64(toStore*storedMsgSize)/tt.Seconds())))
}

View File

@@ -15,6 +15,7 @@ package server
import (
"fmt"
"math/rand"
"sort"
"sync"
"time"
@@ -199,7 +200,7 @@ func (ms *memStore) deleteFirstMsgOrPanic() {
}
func (ms *memStore) deleteFirstMsg() bool {
return ms.removeMsg(ms.stats.FirstSeq)
return ms.removeMsg(ms.stats.FirstSeq, false)
}
// Lookup will lookup the message by sequence number.
@@ -218,13 +219,21 @@ func (ms *memStore) Lookup(seq uint64) (string, []byte, int64, error) {
// Will return the number of bytes removed.
func (ms *memStore) RemoveMsg(seq uint64) bool {
ms.mu.Lock()
removed := ms.removeMsg(seq)
removed := ms.removeMsg(seq, false)
ms.mu.Unlock()
return removed
}
// EraseMsg will remove the message and rewrite its contents.
func (ms *memStore) EraseMsg(seq uint64) bool {
ms.mu.Lock()
removed := ms.removeMsg(seq, true)
ms.mu.Unlock()
return removed
}
// Removes the message referenced by seq.
func (ms *memStore) removeMsg(seq uint64) bool {
func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
var ss uint64
sm, ok := ms.msgs[seq]
if ok {
@@ -235,6 +244,10 @@ func (ms *memStore) removeMsg(seq uint64) bool {
if seq == ms.stats.FirstSeq {
ms.stats.FirstSeq++
}
if secure {
rand.Read(sm.msg)
sm.seq = 0
}
}
if ms.scb != nil {
delta := int64(ss)

View File

@@ -192,3 +192,25 @@ func TestMemStoreTimeStamps(t *testing.T) {
last = ts
}
}
func TestMemStoreEraseMsg(t *testing.T) {
ms, err := newMemStore(&MsgSetConfig{Storage: MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error creating store: %v", err)
}
subj, msg := "foo", []byte("Hello World")
ms.StoreMsg(subj, msg)
_, smsg, _, err := ms.Lookup(1)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
if !bytes.Equal(msg, smsg) {
t.Fatalf("Expected same msg, got %q vs %q", smsg, msg)
}
if !ms.EraseMsg(1) {
t.Fatalf("Expected erase msg to return success")
}
if bytes.Equal(msg, smsg) {
t.Fatalf("Expected msg to be erased")
}
}

View File

@@ -181,6 +181,16 @@ func (mset *MsgSet) Purge() uint64 {
return mset.store.Purge()
}
// RemoveMsg will remove a message from a message set.
func (mset *MsgSet) RemoveMsg(seq uint64) bool {
return mset.store.RemoveMsg(seq)
}
// EraseMsg will securely remove a message and rewrite the data with random data.
func (mset *MsgSet) EraseMsg(seq uint64) bool {
return mset.store.EraseMsg(seq)
}
// Will create internal subscriptions for the msgSet.
// Lock should be held.
func (mset *MsgSet) subscribeToMsgSet() error {

View File

@@ -32,6 +32,7 @@ type MsgSetStore interface {
StoreMsg(subj string, msg []byte) (uint64, error)
Lookup(seq uint64) (subj string, msg []byte, ts int64, err error)
RemoveMsg(seq uint64) bool
EraseMsg(seq uint64) bool
Purge() uint64
GetSeqFromTime(t time.Time) uint64
StorageBytesUpdate(func(int64))

View File

@@ -1148,7 +1148,7 @@ func TestJetStreamEphemeralObservables(t *testing.T) {
t.Fatalf("Expected the observable to be considered active")
}
if numo := mset.NumObservables(); numo != 1 {
t.Fatalf("Expected number of observables to be 1, go %d", numo)
t.Fatalf("Expected number of observables to be 1, got %d", numo)
}
// Make sure works now.
@@ -1171,13 +1171,13 @@ func TestJetStreamEphemeralObservables(t *testing.T) {
// The reason for this still being 1 is that we give some time in case of a reconnect scenario.
// We detect right away on the publish but we wait for interest to be re-established.
if numo := mset.NumObservables(); numo != 1 {
t.Fatalf("Expected number of observables to be 1, go %d", numo)
t.Fatalf("Expected number of observables to be 1, got %d", numo)
}
// We should delete this one after the check interval.
checkFor(t, time.Second, 100*time.Millisecond, func() error {
if numo := mset.NumObservables(); numo != 0 {
return fmt.Errorf("Expected number of observables to be 0, go %d", numo)
return fmt.Errorf("Expected number of observables to be 0, got %d", numo)
}
return nil
})
@@ -1198,7 +1198,7 @@ func TestJetStreamEphemeralObservables(t *testing.T) {
t.Fatalf("Expected the observable to be considered active")
}
if numo := mset.NumObservables(); numo != 1 {
t.Fatalf("Expected number of observables to be 1, go %d", numo)
t.Fatalf("Expected number of observables to be 1, got %d", numo)
}
sub.Unsubscribe()
nc.Flush()
@@ -1210,7 +1210,7 @@ func TestJetStreamEphemeralObservables(t *testing.T) {
t.Fatalf("Expected the ephemeral observable to be considered inactive")
}
if numo := mset.NumObservables(); numo != 0 {
t.Fatalf("Expected number of observables to be 0, go %d", numo)
t.Fatalf("Expected number of observables to be 0, got %d", numo)
}
}
@@ -1245,7 +1245,7 @@ func TestJetStreamObservableReconnect(t *testing.T) {
t.Fatalf("Expected the observable to be considered active")
}
if numo := mset.NumObservables(); numo != 1 {
t.Fatalf("Expected number of observables to be 1, go %d", numo)
t.Fatalf("Expected number of observables to be 1, got %d", numo)
}
// We will simulate reconnect by unsubscribing on one connection and forming