Merge pull request #2538 from nats-io/batch-expires

Make large batch requests expire more efficiently.
This commit is contained in:
Derek Collison
2021-09-16 15:19:17 -07:00
committed by GitHub
3 changed files with 50 additions and 1 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.5.1-beta.2"
VERSION = "2.5.1-beta.3"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -2089,6 +2089,7 @@ func (o *consumer) expireWaiting() int {
now := time.Now()
for wr := o.waiting.peek(); wr != nil; wr = o.waiting.peek() {
if !wr.expires.IsZero() && now.After(wr.expires) {
wr.n = 0 // Force removal by setting requests left to 0.
o.waiting.pop()
expired++
continue
@@ -2103,6 +2104,7 @@ func (o *consumer) expireWaiting() int {
break
}
// No more interest so go ahead and remove this one from our list.
wr.n = 0 // Force removal by setting requests left to 0.
o.waiting.pop()
expired++
}

View File

@@ -13047,6 +13047,53 @@ func TestJetStreamPublishExpectNoMsg(t *testing.T) {
}
}
func TestJetStreamPullLargeBatchExpired(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
config := s.JetStreamConfig()
if config != nil {
defer removeDir(t, config.StoreDir)
}
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("add stream failed: %s", err)
}
sub, err := js.PullSubscribe("foo", "dlc", nats.PullMaxWaiting(10), nats.MaxAckPending(10*50_000_000))
if err != nil {
t.Fatalf("Error creating pull subscriber: %v", err)
}
// Queue up 10 batch requests with timeout.
rsubj := fmt.Sprintf(JSApiRequestNextT, "TEST", "dlc")
req := &JSApiConsumerGetNextRequest{Batch: 50_000_000, Expires: 100 * time.Millisecond}
jreq, _ := json.Marshal(req)
for i := 0; i < 10; i++ {
nc.PublishRequest(rsubj, "bar", jreq)
}
nc.Flush()
// Let them all expire.
time.Sleep(150 * time.Millisecond)
// Now do another and measure how long to timeout and shutdown the server.
start := time.Now()
sub.Fetch(1, nats.MaxWait(100*time.Millisecond))
s.Shutdown()
if delta := time.Since(start); delta > 200*time.Millisecond {
t.Fatalf("Took too long to expire: %v", delta)
}
}
///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////