diff --git a/server/consumer.go b/server/consumer.go index 890e25b6..46431797 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2467,12 +2467,19 @@ func (wq *waitQueue) peek() *waitingRequest { } // pop will return the next request and move the read cursor. +// This will now place a request that still has pending items at the ends of the list. func (wq *waitQueue) pop() *waitingRequest { wr := wq.peek() if wr != nil { wr.d++ wr.n-- - if wr.n <= 0 { + + // Always remove current now on a pop, and move to end if still valid. + // If we were the only one don't need to remove since this can be a no-op. + if wr.n > 0 && wq.n > 1 { + wq.removeCurrent() + wq.add(wr) + } else if wr.n <= 0 { wq.removeCurrent() } } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index f0190fa0..a023e2e0 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -18033,6 +18033,66 @@ func TestJetStreamConsumerUpdateFilterSubject(t *testing.T) { } } +// Originally pull consumers were FIFO with respect to the request, not delivery of messages. +// We have changed to have the behavior be FIFO but on an individual message basis. +// So after a message is delivered, the request, if still outstanding, effectively +// goes to the end of the queue of requests pending. +func TestJetStreamConsumerPullConsumerFIFO(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "T"}) + require_NoError(t, err) + + // Create pull consumer. + _, err = js.AddConsumer("T", &nats.ConsumerConfig{Durable: "d", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + // Simulate 10 pull requests each asking for 10 messages. + var subs []*nats.Subscription + for i := 0; i < 10; i++ { + inbox := nats.NewInbox() + sub := natsSubSync(t, nc, inbox) + subs = append(subs, sub) + req := &JSApiConsumerGetNextRequest{Batch: 10, Expires: 60 * time.Second} + jreq, err := json.Marshal(req) + require_NoError(t, err) + err = nc.PublishRequest(fmt.Sprintf(JSApiRequestNextT, "T", "d"), inbox, jreq) + require_NoError(t, err) + } + + // Now send 100 messages. + for i := 0; i < 100; i++ { + js.PublishAsync("T", []byte("FIFO FTW!")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // Wait for messages + for index, sub := range subs { + checkSubsPending(t, sub, 10) + for i := 0; i < 10; i++ { + m, err := sub.NextMsg(time.Second) + require_NoError(t, err) + meta, err := m.Metadata() + require_NoError(t, err) + // We expect these to be FIFO per message. E.g. sub #1 = [1, 11, 21, 31, ..] + if sseq := meta.Sequence.Stream; sseq != uint64(index+1+(10*i)) { + t.Fatalf("Expected message #%d for sub #%d to be %d, but got %d", i+1, index+1, index+1+(10*i), sseq) + } + } + } +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks ///////////////////////////////////////////////////////////////////////////