Add pending messages/bytes info to request errors and statuses

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
This commit is contained in:
Tomasz Pietrek
2022-10-21 13:32:55 +02:00
parent d8ac76450c
commit ef764598ee
2 changed files with 89 additions and 11 deletions

View File

@@ -32,6 +32,12 @@ import (
"golang.org/x/time/rate"
)
// Headers sent with Request Timeout
const (
JSPullRequestPendingMsgs = "Nats-Pending-Messages"
JSPullRequestPendingBytes = "Nats-Pending-Bytes"
)
type ConsumerInfo struct {
Stream string `json:"stream_name"`
Name string `json:"name"`
@@ -2626,7 +2632,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
} else {
// Since we can't send that message to the requestor, we need to
// notify that we are closing the request.
hdr := []byte("NATS/1.0 409 Message Size Exceeds MaxBytes\r\n\r\n")
hdr := []byte(fmt.Sprintf("NATS/1.0 409 Message Size Exceeds MaxBytes\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
// Remove the current one, no longer valid due to max bytes limit.
o.waiting.removeCurrent()
@@ -2649,7 +2655,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}
}
if wr.interest != wr.reply {
hdr := []byte("NATS/1.0 408 Interest Expired\r\n\r\n")
hdr := []byte(fmt.Sprintf("NATS/1.0 408 Interest Expired\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
// Remove the current one, no longer valid.
@@ -2955,7 +2961,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
wr := wq.reqs[rp]
// Check expiration.
if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) {
hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n")
hdr := []byte(fmt.Sprintf("NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
remove(wr, rp)
i++

View File

@@ -14924,7 +14924,7 @@ func TestJetStreamPullConsumersOneShotBehavior(t *testing.T) {
}
noMsgs := &nats.Header{"Status": []string{"404"}, "Description": []string{"No Messages"}}
reqTimeout := &nats.Header{"Status": []string{"408"}, "Description": []string{"Request Timeout"}}
reqTimeout := &nats.Header{"Status": []string{"408"}, "Description": []string{"Request Timeout"}, "Nats-Pending-Bytes": []string{"0"}, "Nats-Pending-Messages": []string{"1"}}
// We are empty here, meaning no messages available.
// Do not wait, should get noMsgs.
@@ -15628,7 +15628,6 @@ func TestJetStreamPullConsumerHeartBeats(t *testing.T) {
// If HB larger than 50% of expires..
expectErr(doReq(1, 75*time.Millisecond, 100*time.Millisecond, 1))
reqTimeout := nats.Header{"Status": []string{"408"}, "Description": []string{"Request Timeout"}}
expectHBs := func(start time.Time, msgs []*tsMsg, expected int, hbi time.Duration) {
t.Helper()
if len(msgs) != expected {
@@ -15650,8 +15649,8 @@ func TestJetStreamPullConsumerHeartBeats(t *testing.T) {
}
// Last msg should be timeout.
lm := msgs[len(msgs)-1].msg
if !reflect.DeepEqual(lm.Header, reqTimeout) {
t.Fatalf("Expected %+v hdr, got %+v", reqTimeout, lm.Header)
if key := lm.Header.Get("Status"); key != "408" {
t.Fatalf("Expected 408 Request Timeout, got %s", key)
}
}
@@ -15676,8 +15675,8 @@ func TestJetStreamPullConsumerHeartBeats(t *testing.T) {
}
// Last should be timeout.
lm := msgs[len(msgs)-1].msg
if !reflect.DeepEqual(lm.Header, reqTimeout) {
t.Fatalf("Expected %+v hdr, got %+v", reqTimeout, lm.Header)
if key := lm.Header.Get("Status"); key != "408" {
t.Fatalf("Expected 408 Request Timeout, got %s", key)
}
}
@@ -17400,8 +17399,10 @@ func TestJetStreamPullMaxBytes(t *testing.T) {
if len(m.Data) != 0 {
t.Fatalf("Did not expect data, got %d bytes", len(m.Data))
}
if !reflect.DeepEqual(&m.Header, expected) {
t.Fatalf("Expected %+v hdr, got %+v", expected, m.Header)
expectedStatus, givenStatus := expected.Get("Status"), m.Header.Get("Status")
expectedDesc, givenDesc := expected.Get("Description"), m.Header.Get("Description")
if expectedStatus != givenStatus || expectedDesc != givenDesc {
t.Fatalf("expected %s %s, got %s %s", expectedStatus, expectedDesc, givenStatus, givenDesc)
}
}
@@ -19644,3 +19645,74 @@ func TestJetStreamPullConsumerLastPerSubjectRedeliveries(t *testing.T) {
m.Ack()
}
}
func TestJetStreamPullConsumersTimeoutHeaders(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>"},
})
require_NoError(t, err)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dlc",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
nc.Publish("foo.foo", []byte("foo"))
nc.Publish("foo.bar", []byte("bar"))
nc.Publish("foo.else", []byte("baz"))
nc.Flush()
// We will do low level requests by hand for this test as to not depend on any client impl.
rsubj := fmt.Sprintf(JSApiRequestNextT, "TEST", "dlc")
maxBytes := 1024
batch := 50
req := &JSApiConsumerGetNextRequest{Batch: batch, Expires: 100 * time.Millisecond, NoWait: false, MaxBytes: maxBytes}
jreq, err := json.Marshal(req)
require_NoError(t, err)
// Create listener.
reply, msgs := nats.NewInbox(), make(chan *nats.Msg, batch)
sub, err := nc.ChanSubscribe(reply, msgs)
require_NoError(t, err)
defer sub.Unsubscribe()
// Send request.
err = nc.PublishRequest(rsubj, reply, jreq)
require_NoError(t, err)
bytesReceived := 0
messagesReceived := 0
for {
select {
case m := <-msgs:
if len(m.Data) == 0 && m.Header != nil {
if value := m.Header.Get(JSPullRequestPendingMsgs); value != fmt.Sprint(batch-messagesReceived) {
t.Fatalf("Expected %d messages, got %s", batch-messagesReceived, value)
}
if value := m.Header.Get(JSPullRequestPendingBytes); value != fmt.Sprint(maxBytes-bytesReceived) {
t.Fatalf("Expected %d bytes, got %s", maxBytes-bytesReceived, value)
}
return
} else {
messagesReceived += 1
bytesReceived += (len(m.Data) + len(m.Header) + len(m.Reply) + len(m.Subject))
}
case <-time.After(100 + 250*time.Millisecond):
t.Fatalf("Did not receive all the msgs in time")
}
}
}