[FIXE] JetStream: Pull requests closed due to max_bytes were silent

If the client pull requests has a max_bytes value and the server
cannot deliver a single message (because size is too big), it
is sending a 409 to signal that to the client library. However,
if it sends at least a message then it would close the request
without notifying the client with a 409, which would cause the
client library to have to wait for its expiration/timeout.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2022-09-15 16:55:41 -06:00
parent dc2e4b714a
commit ff0bda415b
2 changed files with 14 additions and 7 deletions

View File

@@ -2619,11 +2619,10 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
wr.n = 1
}
} else {
// If we have not delivered anything to the requestor let them know.
if wr.d == 0 {
hdr := []byte("NATS/1.0 409 Message Size Exceeds MaxBytes\r\n\r\n")
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
// Since we can't send that message to the requestor, we need to
// notify that we are closing the request.
hdr := []byte("NATS/1.0 409 Message Size Exceeds MaxBytes\r\n\r\n")
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
// Remove the current one, no longer valid due to max bytes limit.
o.waiting.removeCurrent()
if o.node != nil {

View File

@@ -17443,23 +17443,31 @@ func TestJetStreamPullMaxBytes(t *testing.T) {
req = &JSApiConsumerGetNextRequest{Batch: 1_000, MaxBytes: msz * 4, NoWait: true}
jreq, _ = json.Marshal(req)
nc.PublishRequest(subj, reply, jreq)
checkSubsPending(t, sub, 4)
// Receive 4 messages + the 409
checkSubsPending(t, sub, 5)
for i := 0; i < 4; i++ {
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
}
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
checkHeader(m, badReq)
checkSubsPending(t, sub, 0)
req = &JSApiConsumerGetNextRequest{Batch: 1_000, MaxBytes: msz, NoWait: true}
jreq, _ = json.Marshal(req)
nc.PublishRequest(subj, reply, jreq)
checkSubsPending(t, sub, 1)
// Receive 1 message + 409
checkSubsPending(t, sub, 2)
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
checkHeader(m, badReq)
checkSubsPending(t, sub, 0)
}