diff --git a/server/const.go b/server/const.go index c9bb52b8..a438ff87 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.1-RC11" + VERSION = "2.2.1-RC12" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/consumer.go b/server/consumer.go index c712b07b..c4f19477 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -249,6 +249,8 @@ const ( // JsFlowControlMaxPending specifies default pending bytes during flow control that can be // outstanding. JsFlowControlMaxPending = 1 * 1024 * 1024 + // JsDefaultMaxAckPending is set for consumers with explicit ack that do not set the max ack pending. + JsDefaultMaxAckPending = 20_000 ) func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { @@ -338,6 +340,10 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if config.MaxDeliver == 0 { config.MaxDeliver = -1 } + // Set proper default for max ack pending if we are ack explicit and none has been set. + if config.AckPolicy == AckExplicit && config.MaxAckPending == 0 { + config.MaxAckPending = JsDefaultMaxAckPending + } // Make sure any partition subject is also a literal. if config.FilterSubject != _EMPTY_ { @@ -1789,9 +1795,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _, reply string // If we are in replay mode, defer to processReplay for delivery. if o.replay { o.waiting.add(&wr) - o.mu.Unlock() o.signalNewMessages() - o.mu.Lock() return } diff --git a/server/filestore.go b/server/filestore.go index 8d7e6039..a2dc0425 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -3419,7 +3419,7 @@ func (o *consumerFileStore) flushLoop() { for { select { case <-fch: - time.Sleep(5 * time.Millisecond) + time.Sleep(10 * time.Millisecond) select { case <-qch: return diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 8176b6d4..67ccd9cf 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2666,7 +2666,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { ne, nb := n.Applied(ce.Index) // If we have at least min entries to compact, go ahead and snapshot/compact. - if ne >= compactNumMin || nb > compactNumMin { + if nb > 0 && ne >= compactNumMin || nb > compactSizeMin { doSnapshot() } } else { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 92d4abb8..856f56df 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -1403,6 +1403,33 @@ func TestJetStreamClusterDoubleAdd(t *testing.T) { } } +func TestJetStreamClusterDefaultMaxAckPending(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R32", 2) + defer c.shutdown() + + s := c.randomServer() + + // Client based API + nc, js := jsClientConnect(t, s) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 2}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Do Consumers too. + cfg := &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy} + ci, err := js.AddConsumer("TEST", cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Check that we have a default set now for the max ack pending. + if ci.Config.MaxAckPending != JsDefaultMaxAckPending { + t.Fatalf("Expected a default for max ack pending of %d, got %d", JsDefaultMaxAckPending, ci.Config.MaxAckPending) + } +} + func TestJetStreamClusterStreamNormalCatchup(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown()