Merge pull request #3928 from nats-io/js-pub-inline

Revert behavior for JetStream messages published directly from client.
This commit is contained in:
Derek Collison
2023-03-01 05:05:14 -08:00
committed by GitHub
2 changed files with 34 additions and 16 deletions

View File

@@ -5124,21 +5124,26 @@ func TestNoRaceJetStreamClusterInterestPullConsumerStreamLimitBug(t *testing.T)
time.Sleep(5 * time.Second)
close(qch)
wg.Wait()
time.Sleep(time.Second)
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
checkFor(t, 20*time.Second, 500*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST")
if err != nil {
return err
}
ci, err := js.ConsumerInfo("TEST", "dur")
if err != nil {
return err
}
ci, err := js.ConsumerInfo("TEST", "dur")
require_NoError(t, err)
ld := ci.Delivered.Stream
if si.State.FirstSeq > ld {
ld = si.State.FirstSeq - 1
}
if si.State.LastSeq-ld != ci.NumPending {
t.Fatalf("Expected NumPending to be %d got %d", si.State.LastSeq-ld, ci.NumPending)
}
ld := ci.Delivered.Stream
if si.State.FirstSeq > ld {
ld = si.State.FirstSeq - 1
}
if si.State.LastSeq-ld != ci.NumPending {
return fmt.Errorf("Expected NumPending to be %d got %d", si.State.LastSeq-ld, ci.NumPending)
}
return nil
})
}
// Test that all peers have the direct access subs that participate in a queue group,

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -3646,9 +3646,22 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
// processInboundJetStreamMsg handles processing messages bound for a stream.
func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
// Always move this to another Go routine.
hdr, msg := c.msgParts(rmsg)
mset.queueInboundMsg(subject, reply, hdr, msg)
// If we are not receiving directly from a client we should move this to another Go routine.
// Make sure to grab no stream or js locks.
if c.kind != CLIENT {
mset.queueInboundMsg(subject, reply, hdr, msg)
return
}
// This is directly from a client so process inline.
// If we are clustered we need to propose this message to the underlying raft group.
if mset.IsClustered() {
mset.processClusteredInboundMsg(subject, reply, hdr, msg)
} else {
mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0)
}
}
var (