Make pull consumers FIFO per message, not per request.

This effectively means that requests with batch > 1 will process a message and go to the end of the line.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-07-04 13:05:57 -07:00
parent 7cf814de8e
commit 1b580c67f3
2 changed files with 68 additions and 1 deletions

View File

@@ -2467,12 +2467,19 @@ func (wq *waitQueue) peek() *waitingRequest {
}
// pop will return the next request and move the read cursor.
// This will now place a request that still has pending items at the ends of the list.
func (wq *waitQueue) pop() *waitingRequest {
wr := wq.peek()
if wr != nil {
wr.d++
wr.n--
if wr.n <= 0 {
// Always remove current now on a pop, and move to end if still valid.
// If we were the only one don't need to remove since this can be a no-op.
if wr.n > 0 && wq.n > 1 {
wq.removeCurrent()
wq.add(wr)
} else if wr.n <= 0 {
wq.removeCurrent()
}
}

View File

@@ -18033,6 +18033,66 @@ func TestJetStreamConsumerUpdateFilterSubject(t *testing.T) {
}
}
// Originally pull consumers were FIFO with respect to the request, not delivery of messages.
// We have changed to have the behavior be FIFO but on an individual message basis.
// So after a message is delivered, the request, if still outstanding, effectively
// goes to the end of the queue of requests pending.
func TestJetStreamConsumerPullConsumerFIFO(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "T"})
require_NoError(t, err)
// Create pull consumer.
_, err = js.AddConsumer("T", &nats.ConsumerConfig{Durable: "d", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
// Simulate 10 pull requests each asking for 10 messages.
var subs []*nats.Subscription
for i := 0; i < 10; i++ {
inbox := nats.NewInbox()
sub := natsSubSync(t, nc, inbox)
subs = append(subs, sub)
req := &JSApiConsumerGetNextRequest{Batch: 10, Expires: 60 * time.Second}
jreq, err := json.Marshal(req)
require_NoError(t, err)
err = nc.PublishRequest(fmt.Sprintf(JSApiRequestNextT, "T", "d"), inbox, jreq)
require_NoError(t, err)
}
// Now send 100 messages.
for i := 0; i < 100; i++ {
js.PublishAsync("T", []byte("FIFO FTW!"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
// Wait for messages
for index, sub := range subs {
checkSubsPending(t, sub, 10)
for i := 0; i < 10; i++ {
m, err := sub.NextMsg(time.Second)
require_NoError(t, err)
meta, err := m.Metadata()
require_NoError(t, err)
// We expect these to be FIFO per message. E.g. sub #1 = [1, 11, 21, 31, ..]
if sseq := meta.Sequence.Stream; sseq != uint64(index+1+(10*i)) {
t.Fatalf("Expected message #%d for sub #%d to be %d, but got %d", i+1, index+1, index+1+(10*i), sseq)
}
}
}
}
///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////