From 44c57a5702ca155866388851e576ea70a122616d Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 1 Sep 2021 16:57:13 -0600 Subject: [PATCH 1/2] [FIXED] Pull requests: don't send 408 when request expires When expiring requests, the server would send 408 if interest was still present, which can happen for pull subscribe implementations that maintain interest for the duration of the pull subscription. Let's keep the 408 for when a request is "force expired", that is, a request was removed from the queue because it queue was full but interest is still found. Signed-off-by: Ivan Kozlovic --- server/consumer.go | 4 ++-- server/jetstream_test.go | 14 ++++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index ec69f404..481d8774 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2089,7 +2089,7 @@ func (o *consumer) expireWaiting() int { now := time.Now() for wr := o.waiting.peek(); wr != nil; wr = o.waiting.peek() { if !wr.expires.IsZero() && now.After(wr.expires) { - o.forceExpireFirstWaiting() + o.waiting.pop() expired++ continue } @@ -2103,7 +2103,7 @@ func (o *consumer) expireWaiting() int { break } // No more interest so go ahead and remove this one from our list. - o.forceExpireFirstWaiting() + o.waiting.pop() expired++ } return expired diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 471ba207..9a808eec 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -1853,13 +1853,15 @@ func TestJetStreamWorkQueueRequest(t *testing.T) { // Send a few more messages. These should not be delivered to the sub. sendStreamMsg(t, nc, "foo", "Hello World!") sendStreamMsg(t, nc, "bar", "Hello World!") - // We will have an alert here. + time.Sleep(10 * time.Millisecond) + checkSubPending(0) + + // Send a new request, we should not get the 408 because our previous request + // should have expired. + nc.PublishRequest(getSubj, reply, jreq) checkSubPending(1) - m, _ := sub.NextMsg(0) - // Make sure this is an alert that tells us our request is now stale. - if m.Header.Get("Status") != "408" { - t.Fatalf("Expected a 408 status code, got %q", m.Header.Get("Status")) - } + sub.NextMsg(time.Millisecond) + checkSubPending(0) }) } } From ba36aa452b47cb7db6cd02e557aaef7dfa2881dd Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 1 Sep 2021 18:03:14 -0600 Subject: [PATCH 2/2] Fix some timing and bump server version Signed-off-by: Ivan Kozlovic --- server/const.go | 2 +- server/jetstream_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/const.go b/server/const.go index e8ab6eb6..39d08a42 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.4.0" + VERSION = "2.4.1-beta.1" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 9a808eec..db42390b 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -1843,24 +1843,24 @@ func TestJetStreamWorkQueueRequest(t *testing.T) { // Now do expiration req.Batch = 1 req.NoWait = false - req.Expires = 10 * time.Millisecond + req.Expires = 100 * time.Millisecond jreq, _ = json.Marshal(req) nc.PublishRequest(getSubj, reply, jreq) // Let it expire - time.Sleep(20 * time.Millisecond) + time.Sleep(200 * time.Millisecond) // Send a few more messages. These should not be delivered to the sub. sendStreamMsg(t, nc, "foo", "Hello World!") sendStreamMsg(t, nc, "bar", "Hello World!") - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) checkSubPending(0) // Send a new request, we should not get the 408 because our previous request // should have expired. nc.PublishRequest(getSubj, reply, jreq) checkSubPending(1) - sub.NextMsg(time.Millisecond) + sub.NextMsg(time.Second) checkSubPending(0) }) }