Introduced a default max ack pending value for consumers that use explicit acknowledgement.

Fixed a bug that introduced performance degradation for durable consumers with replication factor R>1.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-30 11:47:24 -07:00
parent 14a826fb60
commit bb7a8a5f79
5 changed files with 36 additions and 5 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.1-RC11"
VERSION = "2.2.1-RC12"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -249,6 +249,8 @@ const (
// JsFlowControlMaxPending specifies default pending bytes during flow control that can be
// outstanding.
JsFlowControlMaxPending = 1 * 1024 * 1024
// JsDefaultMaxAckPending is set for consumers with explicit ack that do not set the max ack pending.
JsDefaultMaxAckPending = 20_000
)
func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
@@ -338,6 +340,10 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if config.MaxDeliver == 0 {
config.MaxDeliver = -1
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
if config.AckPolicy == AckExplicit && config.MaxAckPending == 0 {
config.MaxAckPending = JsDefaultMaxAckPending
}
// Make sure any partition subject is also a literal.
if config.FilterSubject != _EMPTY_ {
@@ -1789,9 +1795,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _, reply string
// If we are in replay mode, defer to processReplay for delivery.
if o.replay {
o.waiting.add(&wr)
o.mu.Unlock()
o.signalNewMessages()
o.mu.Lock()
return
}

View File

@@ -3419,7 +3419,7 @@ func (o *consumerFileStore) flushLoop() {
for {
select {
case <-fch:
time.Sleep(5 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
select {
case <-qch:
return

View File

@@ -2666,7 +2666,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
ne, nb := n.Applied(ce.Index)
// If we have at least min entries to compact, go ahead and snapshot/compact.
if ne >= compactNumMin || nb > compactNumMin {
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
doSnapshot()
}
} else {

View File

@@ -1403,6 +1403,33 @@ func TestJetStreamClusterDoubleAdd(t *testing.T) {
}
}
// TestJetStreamClusterDefaultMaxAckPending verifies that an ack-explicit
// consumer created without an explicit MaxAckPending gets the server
// default (JsDefaultMaxAckPending) applied to its configuration.
func TestJetStreamClusterDefaultMaxAckPending(t *testing.T) {
	cluster := createJetStreamClusterExplicit(t, "R32", 2)
	defer cluster.shutdown()

	srv := cluster.randomServer()

	// Client based API
	nc, js := jsClientConnect(t, srv)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 2})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Do Consumers too.
	consumerCfg := &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}
	info, err := js.AddConsumer("TEST", consumerCfg)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Check that we have a default set now for the max ack pending.
	if got := info.Config.MaxAckPending; got != JsDefaultMaxAckPending {
		t.Fatalf("Expected a default for max ack pending of %d, got %d", JsDefaultMaxAckPending, got)
	}
}
func TestJetStreamClusterStreamNormalCatchup(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()