Merge pull request #2482 from nats-io/js_expire_pull_reqs

[FIXED] Pull requests: don't send 408 when request expires
This commit is contained in:
Ivan Kozlovic
2021-09-02 09:02:28 -06:00
committed by GitHub
3 changed files with 13 additions and 11 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.4.0"
VERSION = "2.4.1-beta.1"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -2089,7 +2089,7 @@ func (o *consumer) expireWaiting() int {
now := time.Now()
for wr := o.waiting.peek(); wr != nil; wr = o.waiting.peek() {
if !wr.expires.IsZero() && now.After(wr.expires) {
o.forceExpireFirstWaiting()
o.waiting.pop()
expired++
continue
}
@@ -2103,7 +2103,7 @@ func (o *consumer) expireWaiting() int {
break
}
// No more interest so go ahead and remove this one from our list.
o.forceExpireFirstWaiting()
o.waiting.pop()
expired++
}
return expired

View File

@@ -1843,23 +1843,25 @@ func TestJetStreamWorkQueueRequest(t *testing.T) {
// Now do expiration
req.Batch = 1
req.NoWait = false
req.Expires = 10 * time.Millisecond
req.Expires = 100 * time.Millisecond
jreq, _ = json.Marshal(req)
nc.PublishRequest(getSubj, reply, jreq)
// Let it expire
time.Sleep(20 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
// Send a few more messages. These should not be delivered to the sub.
sendStreamMsg(t, nc, "foo", "Hello World!")
sendStreamMsg(t, nc, "bar", "Hello World!")
// We will have an alert here.
time.Sleep(100 * time.Millisecond)
checkSubPending(0)
// Send a new request, we should not get the 408 because our previous request
// should have expired.
nc.PublishRequest(getSubj, reply, jreq)
checkSubPending(1)
m, _ := sub.NextMsg(0)
// Make sure this is an alert that tells us our request is now stale.
if m.Header.Get("Status") != "408" {
t.Fatalf("Expected a 408 status code, got %q", m.Header.Get("Status"))
}
sub.NextMsg(time.Second)
checkSubPending(0)
})
}
}