mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
@@ -1550,7 +1550,8 @@ func (c *client) flushSignal() {
|
||||
func (c *client) traceMsg(msg []byte) {
|
||||
maxTrace := c.srv.getOpts().MaxTracedMsgLen
|
||||
if maxTrace > 0 && (len(msg)-LEN_CR_LF) > maxTrace {
|
||||
c.Tracef("<<- MSG_PAYLOAD: [\"%s...\"]", msg[:maxTrace])
|
||||
tm := fmt.Sprintf("%q", msg[:maxTrace])
|
||||
c.Tracef("<<- MSG_PAYLOAD: [\"%s...\"]", tm[1:maxTrace+1])
|
||||
} else {
|
||||
c.Tracef("<<- MSG_PAYLOAD: [%q]", msg[:len(msg)-LEN_CR_LF])
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.6.2"
|
||||
VERSION = "2.6.3-beta"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -3718,6 +3718,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
|
||||
if sequence > 0 && sequence <= l {
|
||||
l = sequence - 1
|
||||
}
|
||||
|
||||
for seq := f; seq <= l; seq++ {
|
||||
if sm, _ := mb.cacheLookup(seq); sm != nil && eq(sm.subj, subject) {
|
||||
rl := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
|
||||
@@ -3760,6 +3761,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
|
||||
mb.mu.Unlock()
|
||||
// Update our index info on disk.
|
||||
mb.writeIndexInfo()
|
||||
|
||||
// Check if we should break out of top level too.
|
||||
if maxp > 0 && purged >= maxp {
|
||||
break
|
||||
}
|
||||
}
|
||||
if firstSeqNeedsUpdate {
|
||||
fs.selectNextFirst()
|
||||
|
||||
@@ -3355,3 +3355,39 @@ func TestFileStoreSparseCompactionWithInteriorDeletes(t *testing.T) {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// When messages span multiple blocks and we want to purge but keep some amount, say 1, we would remove all.
|
||||
// This is because we would not break out of iterator across more message blocks.
|
||||
// Issue #2622
|
||||
func TestFileStorePurgeExKeepOneBug(t *testing.T) {
|
||||
storeDir := createDir(t, JetStreamStoreDir)
|
||||
defer removeDir(t, storeDir)
|
||||
|
||||
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 128}, StreamConfig{Name: "zzz", Storage: FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer fs.Stop()
|
||||
|
||||
fill := bytes.Repeat([]byte("X"), 128)
|
||||
|
||||
fs.StoreMsg("A", nil, []byte("META"))
|
||||
fs.StoreMsg("B", nil, fill)
|
||||
fs.StoreMsg("A", nil, []byte("META"))
|
||||
fs.StoreMsg("B", nil, fill)
|
||||
|
||||
if fss := fs.FilteredState(1, "A"); fss.Msgs != 2 {
|
||||
t.Fatalf("Expected to find 2 `A` msgs, got %d", fss.Msgs)
|
||||
}
|
||||
|
||||
n, err := fs.PurgeEx("A", 0, 1)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if n != 1 {
|
||||
t.Fatalf("Expected PurgeEx to remove 1 `A` msgs, got %d", n)
|
||||
}
|
||||
if fss := fs.FilteredState(1, "A"); fss.Msgs != 1 {
|
||||
t.Fatalf("Expected to find 1 `A` msgs, got %d", fss.Msgs)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user