mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Under certain scenarios the pending for a consumer could appear to get stuck.
Under the covers we were calculating pending per msg block incorrectly when a single message existed beyond the requested sequence. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1115,7 +1115,7 @@ func (mb *msgBlock) filteredPendingLocked(subj string, wc bool, seq uint64) (tot
|
||||
for i, subj := range subs {
|
||||
// If the starting seq is less then or equal that means we want all and we do not need to load any messages.
|
||||
ss := mb.fss[subj]
|
||||
if ss == nil {
|
||||
if ss == nil || seq > ss.Last {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -3480,6 +3480,33 @@ func TestFileStoreRemoveLastWriteIndex(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileStoreFilteredPendingBug(t *testing.T) {
|
||||
storeDir := createDir(t, JetStreamStoreDir)
|
||||
defer removeDir(t, storeDir)
|
||||
|
||||
fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "TEST", Storage: FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer fs.Stop()
|
||||
|
||||
fs.StoreMsg("foo", nil, []byte("msg"))
|
||||
fs.StoreMsg("bar", nil, []byte("msg"))
|
||||
fs.StoreMsg("baz", nil, []byte("msg"))
|
||||
|
||||
fs.mu.Lock()
|
||||
mb := fs.lmb
|
||||
fs.mu.Unlock()
|
||||
|
||||
total, f, l := mb.filteredPending("foo", false, 3)
|
||||
if total != 0 {
|
||||
t.Fatalf("Expected total of 0 but got %d", total)
|
||||
}
|
||||
if f != 0 || l != 0 {
|
||||
t.Fatalf("Expected first and last to be 0 as well, but got %d %d", f, l)
|
||||
}
|
||||
}
|
||||
|
||||
// Test to optimize the selectMsgBlock with lots of blocks.
|
||||
func TestFileStoreFetchPerf(t *testing.T) {
|
||||
// Comment out to run.
|
||||
|
||||
Reference in New Issue
Block a user