Merge pull request #3229 from nats-io/pull_inactive_threshold

[FIXED] Pull consumer may be incorrectly removed after InactiveThreshold
This commit is contained in:
Ivan Kozlovic
2022-06-29 16:40:42 -06:00
committed by GitHub
2 changed files with 37 additions and 0 deletions

View File

@@ -2669,12 +2669,14 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
// If no pending at all, decide what to do with request.
// If no expires was set then fail.
if msgsPending == 0 && expires.IsZero() {
o.waiting.last = time.Now()
sendErr(404, "No Messages")
return
}
if msgsPending > 0 {
_, _, batchPending, _ := o.processWaiting(false)
if msgsPending < uint64(batchPending) {
o.waiting.last = time.Now()
sendErr(408, "Requests Pending")
return
}

View File

@@ -14539,6 +14539,41 @@ func TestJetStreamEphemeralPullConsumers(t *testing.T) {
})
}
func TestJetStreamEphemeralPullConsumersInactiveThresholdAndNoWait(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "ECIT", Storage: nats.MemoryStorage})
require_NoError(t, err)
ci, err := js.AddConsumer("ECIT", &nats.ConsumerConfig{
AckPolicy: nats.AckExplicitPolicy,
InactiveThreshold: 100 * time.Millisecond,
})
require_NoError(t, err)
// Send 10 no_wait requests every 25ms and consumer should still be present.
req := &JSApiConsumerGetNextRequest{Batch: 10, NoWait: true}
jreq, err := json.Marshal(req)
require_NoError(t, err)
rsubj := fmt.Sprintf(JSApiRequestNextT, "ECIT", ci.Name)
for i := 0; i < 10; i++ {
err = nc.PublishRequest(rsubj, "xx", jreq)
require_NoError(t, err)
nc.Flush()
time.Sleep(25 * time.Millisecond)
}
_, err = js.ConsumerInfo("ECIT", ci.Name)
require_NoError(t, err)
}
func TestJetStreamPullConsumerCrossAccountExpires(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1