From b38ced51b29007b07cd300cfa3e39b45d0892b4f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 30 Jan 2022 11:52:35 -0800 Subject: [PATCH] A true no wait pull request was not considering redeliveries. Signed-off-by: Derek Collison --- server/consumer.go | 2 +- server/jetstream_test.go | 69 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/server/consumer.go b/server/consumer.go index 70bc1745..932c7a16 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2374,7 +2374,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, // If the request is for noWait and we have pending requests already, check if we have room. if noWait { - msgsPending := o.adjustedPending() + msgsPending := o.adjustedPending() + uint64(len(o.rdq)) // If no pending at all, decide what to do with request. // If no expires was set then fail. if msgsPending == 0 && expires.IsZero() { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 8b034d0d..e9e275ca 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -14545,6 +14545,75 @@ func TestJetStreamConsumerUpdateSurvival(t *testing.T) { } } +func TestJetStreamNakRedeliveryWithNoWait(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", []byte("NAK")) + require_NoError(t, err) + + ccReq := &CreateConsumerRequest{ + Stream: "TEST", + Config: ConsumerConfig{ + Durable: "dlc", + AckPolicy: AckExplicit, + MaxDeliver: 3, + AckWait: time.Minute, + BackOff: []time.Duration{5 * time.Second, 10 * time.Second}, + }, + } + // Do by hand for now until Go client catches up. + req, err := json.Marshal(ccReq) + require_NoError(t, err) + resp, err := nc.Request(fmt.Sprintf(JSApiDurableCreateT, "TEST", "dlc"), req, time.Second) + require_NoError(t, err) + var ccResp JSApiConsumerCreateResponse + err = json.Unmarshal(resp.Data, &ccResp) + require_NoError(t, err) + if ccResp.Error != nil { + t.Fatalf("Unexpected error: %+v", ccResp.Error) + } + + rsubj := fmt.Sprintf(JSApiRequestNextT, "TEST", "dlc") + m, err := nc.Request(rsubj, nil, time.Second) + require_NoError(t, err) + + // NAK this message. + delay, err := json.Marshal(&ConsumerNakOptions{Delay: 500 * time.Millisecond}) + require_NoError(t, err) + dnak := []byte(fmt.Sprintf("%s %s", AckNak, delay)) + m.Respond(dnak) + + // This message should come back to us after 500ms. If we do a one-shot request, with NoWait and Expires + // this will do the right thing and we get the message. + // What we want to test here is a true NoWait request with Expires==0 and eventually seeing the message be redelivered. + expires := time.Now().Add(time.Second) + for time.Now().Before(expires) { + m, err = nc.Request(rsubj, []byte(`{"batch":1, "no_wait": true}`), time.Second) + require_NoError(t, err) + if len(m.Data) > 0 { + // We got our message, so we are good. + return + } + // So we do not spin. + time.Sleep(100 * time.Millisecond) + } + t.Fatalf("Did not get the message in time") +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks ///////////////////////////////////////////////////////////////////////////