Improve consumer error handling

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
This commit is contained in:
Tomasz Pietrek
2023-05-26 23:22:54 +02:00
parent 9a792482e9
commit a463dfe5c4

View File

@@ -3267,7 +3267,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
return pmsg, 1, err
}
var lastErr error
// if we have filters, iterate over filters and optimize by buffering found messages.
for _, filter := range o.subjf {
if filter.nextSeq < o.sseq {
@@ -3323,15 +3322,16 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
// Grab next message applicable to us.
// Sort sequences first, to grab the first message.
filter := o.subjf[0]
err := filter.err
// This means we got a message in this subject fetched.
if filter.pmsg != nil {
filter.currentSeq = filter.nextSeq
o.sseq = filter.currentSeq
returned := filter.pmsg
filter.pmsg = nil
return returned, 1, filter.err
return returned, 1, err
}
if filter.err == ErrStoreEOF {
if err == ErrStoreEOF {
o.updateSkipped(filter.nextSeq)
}
@@ -3339,7 +3339,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
if filter.nextSeq > o.sseq {
o.sseq = filter.nextSeq
}
return nil, 0, lastErr
return nil, 0, err
}
// Will check for expiration and lack of interest on waiting requests.