diff --git a/server/consumer.go b/server/consumer.go index 2f90a7c8..dc6513fd 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3794,7 +3794,9 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { sz int wrn, wrb int ) + o.mu.Lock() + // consumer is closed when mset is set to nil. if o.mset == nil { o.mu.Unlock() @@ -3817,6 +3819,13 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { // Grab our next msg. pmsg, dc, err = o.getNextMsg() + // We can release the lock now under getNextMsg so need to check this condition again here. + // consumer is closed when mset is set to nil. + if o.mset == nil { + o.mu.Unlock() + return + } + // On error either wait or return. if err != nil || pmsg == nil { // On EOF we can optionally fast sync num pending state. diff --git a/server/filestore_test.go b/server/filestore_test.go index 2be7eb0b..72c10246 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5104,7 +5104,7 @@ func TestFileStoreNumPendingLargeNumBlks(t *testing.T) { start = time.Now() total, _ = fs.NumPending(6000, "zzz", false) - require_LessThan(t, time.Since(start), 15*time.Millisecond) + require_LessThan(t, time.Since(start), 25*time.Millisecond) require_Equal(t, total, 4001) // Now delete a message in first half and second half.