mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 02:30:40 -07:00
A true no wait pull request was not considering redeliveries.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2374,7 +2374,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
|
||||
|
||||
// If the request is for noWait and we have pending requests already, check if we have room.
|
||||
if noWait {
|
||||
msgsPending := o.adjustedPending()
|
||||
msgsPending := o.adjustedPending() + uint64(len(o.rdq))
|
||||
// If no pending at all, decide what to do with request.
|
||||
// If no expires was set then fail.
|
||||
if msgsPending == 0 && expires.IsZero() {
|
||||
|
||||
@@ -14545,6 +14545,75 @@ func TestJetStreamConsumerUpdateSurvival(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamNakRedeliveryWithNoWait(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo"},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = js.Publish("foo", []byte("NAK"))
|
||||
require_NoError(t, err)
|
||||
|
||||
ccReq := &CreateConsumerRequest{
|
||||
Stream: "TEST",
|
||||
Config: ConsumerConfig{
|
||||
Durable: "dlc",
|
||||
AckPolicy: AckExplicit,
|
||||
MaxDeliver: 3,
|
||||
AckWait: time.Minute,
|
||||
BackOff: []time.Duration{5 * time.Second, 10 * time.Second},
|
||||
},
|
||||
}
|
||||
// Do by hand for now until Go client catches up.
|
||||
req, err := json.Marshal(ccReq)
|
||||
require_NoError(t, err)
|
||||
resp, err := nc.Request(fmt.Sprintf(JSApiDurableCreateT, "TEST", "dlc"), req, time.Second)
|
||||
require_NoError(t, err)
|
||||
var ccResp JSApiConsumerCreateResponse
|
||||
err = json.Unmarshal(resp.Data, &ccResp)
|
||||
require_NoError(t, err)
|
||||
if ccResp.Error != nil {
|
||||
t.Fatalf("Unexpected error: %+v", ccResp.Error)
|
||||
}
|
||||
|
||||
rsubj := fmt.Sprintf(JSApiRequestNextT, "TEST", "dlc")
|
||||
m, err := nc.Request(rsubj, nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
|
||||
// NAK this message.
|
||||
delay, err := json.Marshal(&ConsumerNakOptions{Delay: 500 * time.Millisecond})
|
||||
require_NoError(t, err)
|
||||
dnak := []byte(fmt.Sprintf("%s %s", AckNak, delay))
|
||||
m.Respond(dnak)
|
||||
|
||||
// This message should come back to us after 500ms. If we do a one-shot request, with NoWait and Expires
|
||||
// this will do the right thing and we get the message.
|
||||
// What we want to test here is a true NoWait request with Expires==0 and eventually seeing the message be redelivered.
|
||||
expires := time.Now().Add(time.Second)
|
||||
for time.Now().Before(expires) {
|
||||
m, err = nc.Request(rsubj, []byte(`{"batch":1, "no_wait": true}`), time.Second)
|
||||
require_NoError(t, err)
|
||||
if len(m.Data) > 0 {
|
||||
// We got our message, so we are good.
|
||||
return
|
||||
}
|
||||
// So we do not spin.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("Did not get the message in time")
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Simple JetStream Benchmarks
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Reference in New Issue
Block a user