mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Merge pull request #3241 from nats-io/fifo_pull
Make pull consumers FIFO per message, not per request.
This commit is contained in:
@@ -2467,12 +2467,19 @@ func (wq *waitQueue) peek() *waitingRequest {
|
||||
}
|
||||
|
||||
// pop will return the next request and move the read cursor.
|
||||
// This will now place a request that still has pending items at the ends of the list.
|
||||
func (wq *waitQueue) pop() *waitingRequest {
|
||||
wr := wq.peek()
|
||||
if wr != nil {
|
||||
wr.d++
|
||||
wr.n--
|
||||
if wr.n <= 0 {
|
||||
|
||||
// Always remove current now on a pop, and move to end if still valid.
|
||||
// If we were the only one don't need to remove since this can be a no-op.
|
||||
if wr.n > 0 && wq.n > 1 {
|
||||
wq.removeCurrent()
|
||||
wq.add(wr)
|
||||
} else if wr.n <= 0 {
|
||||
wq.removeCurrent()
|
||||
}
|
||||
}
|
||||
@@ -3045,6 +3052,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
|
||||
|
||||
// On error either wait or return.
|
||||
if err != nil || pmsg == nil {
|
||||
// If we are stalled here in pull mode, invalidate all requests that have had deliveries.
|
||||
if err == errMaxAckPending && o.isPullMode() {
|
||||
o.processWaiting(true)
|
||||
}
|
||||
if err == ErrStoreMsgNotFound || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache {
|
||||
goto waitForMsgs
|
||||
} else {
|
||||
|
||||
@@ -18033,6 +18033,100 @@ func TestJetStreamConsumerUpdateFilterSubject(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Originally pull consumers were FIFO with respect to the request, not delivery of messages.
|
||||
// We have changed to have the behavior be FIFO but on an individual message basis.
|
||||
// So after a message is delivered, the request, if still outstanding, effectively
|
||||
// goes to the end of the queue of requests pending.
|
||||
func TestJetStreamConsumerPullConsumerFIFO(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{Name: "T"})
|
||||
require_NoError(t, err)
|
||||
|
||||
// Create pull consumer.
|
||||
_, err = js.AddConsumer("T", &nats.ConsumerConfig{Durable: "d", AckPolicy: nats.AckExplicitPolicy})
|
||||
require_NoError(t, err)
|
||||
|
||||
// Simulate 10 pull requests each asking for 10 messages.
|
||||
var subs []*nats.Subscription
|
||||
for i := 0; i < 10; i++ {
|
||||
inbox := nats.NewInbox()
|
||||
sub := natsSubSync(t, nc, inbox)
|
||||
subs = append(subs, sub)
|
||||
req := &JSApiConsumerGetNextRequest{Batch: 10, Expires: 60 * time.Second}
|
||||
jreq, err := json.Marshal(req)
|
||||
require_NoError(t, err)
|
||||
err = nc.PublishRequest(fmt.Sprintf(JSApiRequestNextT, "T", "d"), inbox, jreq)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
// Now send 100 messages.
|
||||
for i := 0; i < 100; i++ {
|
||||
js.PublishAsync("T", []byte("FIFO FTW!"))
|
||||
}
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Did not receive completion signal")
|
||||
}
|
||||
|
||||
// Wait for messages
|
||||
for index, sub := range subs {
|
||||
checkSubsPending(t, sub, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
m, err := sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
meta, err := m.Metadata()
|
||||
require_NoError(t, err)
|
||||
// We expect these to be FIFO per message. E.g. sub #1 = [1, 11, 21, 31, ..]
|
||||
if sseq := meta.Sequence.Stream; sseq != uint64(index+1+(10*i)) {
|
||||
t.Fatalf("Expected message #%d for sub #%d to be %d, but got %d", i+1, index+1, index+1+(10*i), sseq)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure that when we reach an ack limit that we follow one shot semantics.
|
||||
func TestJetStreamConsumerPullConsumerOneShotOnMaxAckLimit(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{Name: "T"})
|
||||
require_NoError(t, err)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
js.Publish("T", []byte("OK"))
|
||||
}
|
||||
|
||||
sub, err := js.PullSubscribe("T", "d", nats.MaxAckPending(5))
|
||||
require_NoError(t, err)
|
||||
|
||||
start := time.Now()
|
||||
msgs, err := sub.Fetch(10, nats.MaxWait(2*time.Second))
|
||||
require_NoError(t, err)
|
||||
|
||||
if elapsed := time.Since(start); elapsed >= 2*time.Second {
|
||||
t.Fatalf("Took too long, not one shot behavior: %v", elapsed)
|
||||
}
|
||||
|
||||
if len(msgs) != 5 {
|
||||
t.Fatalf("Expected 5 msgs, got %d", len(msgs))
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Simple JetStream Benchmarks
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Reference in New Issue
Block a user