From 9eeffbcf561393237583079e916877830037b240 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 8 Jun 2023 18:32:39 -0700 Subject: [PATCH] Fix performance issues with checkAckFloor. Bail early if new consumer, meaning stream sequence floor is 0. Decide which linear space to scan. Do no work if no pending and we just need to adjust which we do at the end. Also realized some tests were named wrong and were not being run, or were in wrong file. Signed-off-by: Derek Collison --- server/consumer.go | 68 +++++---- server/jetstream_cluster_3_test.go | 141 +------------------ server/norace_test.go | 212 +++++++++++++++++++++++++++++ 3 files changed, 259 insertions(+), 162 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 61589e96..daa921b0 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3256,10 +3256,10 @@ func (o *consumer) hbTimer() (time.Duration, *time.Timer) { // Should only be called from consumer leader. func (o *consumer) checkAckFloor() { o.mu.RLock() - mset, closed, asflr := o.mset, o.closed, o.asflr + mset, closed, asflr, numPending := o.mset, o.closed, o.asflr, len(o.pending) o.mu.RUnlock() - if closed || mset == nil { + if asflr == 0 || closed || mset == nil { return } @@ -3271,19 +3271,46 @@ func (o *consumer) checkAckFloor() { return } - // Process all messages that no longer exist. - for seq := asflr + 1; seq < ss.FirstSeq; seq++ { - // Check if this message was pending. + // Check which linear space is less to walk. + if ss.FirstSeq-asflr-1 < uint64(numPending) { + // Process all messages that no longer exist. + for seq := asflr + 1; seq < ss.FirstSeq; seq++ { + // Check if this message was pending. + o.mu.RLock() + p, isPending := o.pending[seq] + var rdc uint64 = 1 + if o.rdc != nil { + rdc = o.rdc[seq] + } + o.mu.RUnlock() + // If it was pending for us, get rid of it. + if isPending { + o.processTerm(seq, p.Sequence, rdc) + } + } + } else if numPending > 0 { + // here it shorter to walk pending. 
+ // toTerm is seq, dseq, rdc for each entry. + toTerm := make([]uint64, 0, numPending*3) o.mu.RLock() - p, isPending := o.pending[seq] - var rdc uint64 = 1 - if o.rdc != nil { - rdc = o.rdc[seq] + for seq, p := range o.pending { + if seq < ss.FirstSeq { + var dseq uint64 = 1 + if p != nil { + dseq = p.Sequence + } + var rdc uint64 = 1 + if o.rdc != nil { + rdc = o.rdc[seq] + } + toTerm = append(toTerm, seq, dseq, rdc) + } } o.mu.RUnlock() - // If it was pending for us, get rid of it. - if isPending { - o.processTerm(seq, p.Sequence, rdc) + + for i := 0; i < len(toTerm); i += 3 { + seq, dseq, rdc := toTerm[i], toTerm[i+1], toTerm[i+2] + o.processTerm(seq, dseq, rdc) } } @@ -3324,20 +3351,13 @@ func (o *consumer) processInboundAcks(qch chan struct{}) { return } - // Track if we are interest retention policy, if not we can skip the ack floor check. - isInterestRetention := mset.isInterestRetention() - - checkAckFloor := func() { - if isInterestRetention { - o.checkAckFloor() - } - } - // We will check this on entry and periodically. - checkAckFloor() + o.checkAckFloor() // How often we will check for ack floor drift. - var ackFloorCheck = 30 * time.Second + // Spread these out for large numbers on a server restart. 
+ delta := time.Duration(rand.Int63n(int64(time.Minute))) + var ackFloorCheck = time.Minute + delta for { select { @@ -3353,7 +3373,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) { o.suppressDeletion() } case <-time.After(ackFloorCheck): - checkAckFloor() + o.checkAckFloor() case <-qch: return case <-s.quitCh: diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 13d39a41..540cc152 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3236,142 +3236,7 @@ func TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug(t *testing.T) { checkAllLeaders() } -func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.T) { - tmpl := ` - listen: 127.0.0.1:-1 - server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} - - cluster { - name: "F3" - listen: 127.0.0.1:%d - routes = [%s] - } - - accounts { - $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } - } - ` - - // Route Ports - // "S1": 14622, - // "S2": 15622, - // "S3": 16622, - - // S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path. - - // Do these in order, S1, S2 (proxy) then S3. - c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"} - - // S1 - conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622") - c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf))) - - // S2 - // Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT. 
- np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true) - routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL()) - conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes) - c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf))) - - // S3 - conf = fmt.Sprintf(tmpl, "S3", t.TempDir(), 16622, "route://127.0.0.1:14622, route://127.0.0.1:15622") - c.servers[2], c.opts[2] = RunServerWithConfig(createConfFile(t, []byte(conf))) - - c.checkClusterFormed() - c.waitOnClusterReady() - defer c.shutdown() - defer np.stop() - - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - // Now create the stream. - _, err := js.AddStream(&nats.StreamConfig{ - Name: "EVENTS", - Subjects: []string{"EV.>"}, - Replicas: 3, - Retention: nats.InterestPolicy, - }) - require_NoError(t, err) - - // Make sure it's leader is on S2. - sl := c.servers[1] - checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { - c.waitOnStreamLeader(globalAccountName, "EVENTS") - if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl { - s.JetStreamStepdownStream(globalAccountName, "EVENTS") - return fmt.Errorf("Server %s is not stream leader yet", sl) - } - return nil - }) - - // Now create the consumer. - _, err = js.AddConsumer("EVENTS", &nats.ConsumerConfig{ - Durable: "C", - AckPolicy: nats.AckExplicitPolicy, - DeliverSubject: "dx", - }) - require_NoError(t, err) - - // Make sure the consumer leader is on S3. - cl := c.servers[2] - checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { - c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C") - if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl { - s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C") - return fmt.Errorf("Server %s is not consumer leader yet", sl) - } - return nil - }) - - // Create the real consumer on the consumer leader to make it efficient. 
- nc, js = jsClientConnect(t, cl) - defer nc.Close() - - _, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) { - msg.Ack() - }, nats.BindStream("EVENTS"), nats.Durable("C"), nats.ManualAck()) - require_NoError(t, err) - - for i := 0; i < 1_000; i++ { - _, err := js.PublishAsync("EVENTS.PAID", []byte("ok")) - require_NoError(t, err) - } - select { - case <-js.PublishAsyncComplete(): - case <-time.After(5 * time.Second): - t.Fatalf("Did not receive completion signal") - } - - slow := c.servers[0] - mset, err := slow.GlobalAccount().lookupStream("EVENTS") - require_NoError(t, err) - - // Make sure preAck is non-nil, so we know the logic has kicked in. - mset.mu.RLock() - preAcks := mset.preAcks - mset.mu.RUnlock() - require_NotNil(t, preAcks) - - checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { - state := mset.state() - if state.Msgs == 0 { - mset.mu.RLock() - lp := len(mset.preAcks) - mset.mu.RUnlock() - if lp == 0 { - return nil - } else { - t.Fatalf("Expected no preAcks with no msgs, but got %d", lp) - } - } - return fmt.Errorf("Still have %d msgs left", state.Msgs) - }) - -} - -func TestJetStreamInterestLeakOnDisableJetStream(t *testing.T) { +func TestJetStreamClusterInterestLeakOnDisableJetStream(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -3410,7 +3275,7 @@ func TestJetStreamInterestLeakOnDisableJetStream(t *testing.T) { } } -func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) { +func TestJetStreamClusterNoLeadersDuringLameDuck(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -3517,7 +3382,7 @@ func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) { // it could miss the signal of a message going away. If that message was pending and expires the // ack floor could fall below the stream first sequence. This test will force that condition and // make sure the system resolves itself. 
-func TestJetStreamConsumerAckFloorDrift(t *testing.T) { +func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() diff --git a/server/norace_test.go b/server/norace_test.go index ffcfaf84..51005773 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -7926,3 +7926,215 @@ func TestNoRaceClientOutboundQueueMemory(t *testing.T) { t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc) } } + +func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.T) { + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + cluster { + name: "F3" + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } + ` + + // Route Ports + // "S1": 14622, + // "S2": 15622, + // "S3": 16622, + + // S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path. + + // Do these in order, S1, S2 (proxy) then S3. + c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"} + + // S1 + conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622") + c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf))) + + // S2 + // Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT. 
+ np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true) + routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL()) + conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes) + c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf))) + + // S3 + conf = fmt.Sprintf(tmpl, "S3", t.TempDir(), 16622, "route://127.0.0.1:14622, route://127.0.0.1:15622") + c.servers[2], c.opts[2] = RunServerWithConfig(createConfFile(t, []byte(conf))) + + c.checkClusterFormed() + c.waitOnClusterReady() + defer c.shutdown() + defer np.stop() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Now create the stream. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "EVENTS", + Subjects: []string{"EV.>"}, + Replicas: 3, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + // Make sure it's leader is on S2. + sl := c.servers[1] + checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { + c.waitOnStreamLeader(globalAccountName, "EVENTS") + if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl { + s.JetStreamStepdownStream(globalAccountName, "EVENTS") + return fmt.Errorf("Server %s is not stream leader yet", sl) + } + return nil + }) + + // Now create the consumer. + _, err = js.AddConsumer("EVENTS", &nats.ConsumerConfig{ + Durable: "C", + AckPolicy: nats.AckExplicitPolicy, + DeliverSubject: "dx", + }) + require_NoError(t, err) + + // Make sure the consumer leader is on S3. + cl := c.servers[2] + checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { + c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C") + if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl { + s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C") + return fmt.Errorf("Server %s is not consumer leader yet", sl) + } + return nil + }) + + // Create the real consumer on the consumer leader to make it efficient. 
+ nc, js = jsClientConnect(t, cl) + defer nc.Close() + + _, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) { + msg.Ack() + }, nats.BindStream("EVENTS"), nats.Durable("C"), nats.ManualAck()) + require_NoError(t, err) + + for i := 0; i < 1_000; i++ { + _, err := js.PublishAsync("EVENTS.PAID", []byte("ok")) + require_NoError(t, err) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + slow := c.servers[0] + mset, err := slow.GlobalAccount().lookupStream("EVENTS") + require_NoError(t, err) + + // Make sure preAck is non-nil, so we know the logic has kicked in. + mset.mu.RLock() + preAcks := mset.preAcks + mset.mu.RUnlock() + require_NotNil(t, preAcks) + + checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + state := mset.state() + if state.Msgs == 0 { + mset.mu.RLock() + lp := len(mset.preAcks) + mset.mu.RUnlock() + if lp == 0 { + return nil + } else { + t.Fatalf("Expected no preAcks with no msgs, but got %d", lp) + } + } + return fmt.Errorf("Still have %d msgs left", state.Msgs) + }) + +} + +func TestNoRaceCheckAckFloorWithVeryLargeFirstSeqAndNewConsumers(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + + // Make sure to time bound here for the acksync call below. + js, err := nc.JetStream(nats.MaxWait(200 * time.Millisecond)) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"wq-req"}, + Retention: nats.WorkQueuePolicy, + }) + require_NoError(t, err) + + largeFirstSeq := uint64(1_200_000_000) + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: largeFirstSeq}) + require_NoError(t, err) + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_True(t, si.State.FirstSeq == largeFirstSeq) + + // Add a simple request to the stream. 
+ sendStreamMsg(t, nc, "wq-req", "HELP") + + sub, err := js.PullSubscribe("wq-req", "dlc") + require_NoError(t, err) + + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_True(t, len(msgs) == 1) + + // The bug is around the checkAckFloor walking the sequences from current ackfloor + // to the first sequence of the stream. We time bound the max wait with the js context + // to 200ms. Since checkAckFloor is spinning and holding up processing of acks this will fail. + // We will short circuit new consumers to fix this one. + require_NoError(t, msgs[0].AckSync()) + + // Now do again so we move past the new consumer with no ack floor situation. + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 2 * largeFirstSeq}) + require_NoError(t, err) + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_True(t, si.State.FirstSeq == 2*largeFirstSeq) + + sendStreamMsg(t, nc, "wq-req", "MORE HELP") + + // We check this one directly for this use case. + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("dlc") + require_True(t, o != nil) + + // Purge will move the stream floor by default, so force into the situation where it is back to largeFirstSeq. + // This will not trigger the new consumer logic, but will trigger a walk of the sequence space. + // Fix will be to walk the lesser of the two linear spaces. + o.mu.Lock() + o.asflr = largeFirstSeq + o.mu.Unlock() + + done := make(chan bool) + go func() { + o.checkAckFloor() + done <- true + }() + + select { + case <-done: + return + case <-time.After(time.Second): + t.Fatalf("Check ack floor taking too long!") + } +}