diff --git a/server/consumer.go b/server/consumer.go
index 5e658c42..338a6d54 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -3454,10 +3454,10 @@ func (o *consumer) hbTimer() (time.Duration, *time.Timer) {
 // Should only be called from consumer leader.
 func (o *consumer) checkAckFloor() {
 	o.mu.RLock()
-	mset, closed, asflr := o.mset, o.closed, o.asflr
+	mset, closed, asflr, numPending := o.mset, o.closed, o.asflr, len(o.pending)
 	o.mu.RUnlock()
 
-	if closed || mset == nil {
+	if asflr == 0 || closed || mset == nil {
 		return
 	}
 
@@ -3469,19 +3469,46 @@ func (o *consumer) checkAckFloor() {
 		return
 	}
 
-	// Process all messages that no longer exist.
-	for seq := asflr + 1; seq < ss.FirstSeq; seq++ {
-		// Check if this message was pending.
+	// Check which linear space is smaller to walk.
+	if ss.FirstSeq-asflr-1 < uint64(numPending) {
+		// Process all messages that no longer exist.
+		for seq := asflr + 1; seq < ss.FirstSeq; seq++ {
+			// Check if this message was pending.
+			o.mu.RLock()
+			p, isPending := o.pending[seq]
+			var rdc uint64 = 1
+			if o.rdc != nil {
+				rdc = o.rdc[seq]
+			}
+			o.mu.RUnlock()
+			// If it was pending for us, get rid of it.
+			if isPending {
+				o.processTerm(seq, p.Sequence, rdc)
+			}
+		}
+	} else if numPending > 0 {
+		// Here it is shorter to walk pending.
+		// toTerm is seq, dseq, rdc for each entry.
+		toTerm := make([]uint64, 0, numPending*3)
 		o.mu.RLock()
-		p, isPending := o.pending[seq]
-		var rdc uint64 = 1
-		if o.rdc != nil {
-			rdc = o.rdc[seq]
+		for seq, p := range o.pending {
+			if seq < ss.FirstSeq {
+				var dseq uint64 = 1
+				if p != nil {
+					dseq = p.Sequence
+				}
+				var rdc uint64 = 1
+				if o.rdc != nil {
+					rdc = o.rdc[seq]
+				}
+				toTerm = append(toTerm, seq, dseq, rdc)
+			}
 		}
 		o.mu.RUnlock()
-		// If it was pending for us, get rid of it.
-		if isPending {
-			o.processTerm(seq, p.Sequence, rdc)
+
+		for i := 0; i < len(toTerm); i += 3 {
+			seq, dseq, rdc := toTerm[i], toTerm[i+1], toTerm[i+2]
+			o.processTerm(seq, dseq, rdc)
 		}
 	}
 
@@ -3522,20 +3549,13 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
 		return
 	}
 
-	// Track if we are interest retention policy, if not we can skip the ack floor check.
-	isInterestRetention := mset.isInterestRetention()
-
-	checkAckFloor := func() {
-		if isInterestRetention {
-			o.checkAckFloor()
-		}
-	}
-
 	// We will check this on entry and periodically.
-	checkAckFloor()
+	o.checkAckFloor()
 
 	// How often we will check for ack floor drift.
-	var ackFloorCheck = 30 * time.Second
+	// Spread these out for large numbers on a server restart.
+	delta := time.Duration(rand.Int63n(int64(time.Minute)))
+	var ackFloorCheck = time.Minute + delta
 
 	for {
 		select {
@@ -3551,7 +3571,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
 				o.suppressDeletion()
 			}
 		case <-time.After(ackFloorCheck):
-			checkAckFloor()
+			o.checkAckFloor()
 		case <-qch:
 			return
 		case <-s.quitCh:
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 35833565..aaf395a4 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -3236,142 +3236,7 @@ func TestJetStreamClusterConsumerFollowerStoreStateAckFloorBug(t *testing.T) {
 	checkAllLeaders()
 }
 
-func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.T) {
-	tmpl := `
-	listen: 127.0.0.1:-1
-	server_name: %s
-	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
-
-	cluster {
-		name: "F3"
-		listen: 127.0.0.1:%d
-		routes = [%s]
-	}
-
-	accounts {
-		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
-	}
-	`
-
-	// Route Ports
-	// "S1": 14622,
-	// "S2": 15622,
-	// "S3": 16622,
-
-	// S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.
-
-	// Do these in order, S1, S2 (proxy) then S3.
-	c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}
-
-	// S1
-	conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622")
-	c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf)))
-
-	// S2
-	// Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT.
-	np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
-	routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL())
-	conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes)
-	c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf)))
-
-	// S3
-	conf = fmt.Sprintf(tmpl, "S3", t.TempDir(), 16622, "route://127.0.0.1:14622, route://127.0.0.1:15622")
-	c.servers[2], c.opts[2] = RunServerWithConfig(createConfFile(t, []byte(conf)))
-
-	c.checkClusterFormed()
-	c.waitOnClusterReady()
-	defer c.shutdown()
-	defer np.stop()
-
-	nc, js := jsClientConnect(t, c.randomServer())
-	defer nc.Close()
-
-	// Now create the stream.
-	_, err := js.AddStream(&nats.StreamConfig{
-		Name:      "EVENTS",
-		Subjects:  []string{"EV.>"},
-		Replicas:  3,
-		Retention: nats.InterestPolicy,
-	})
-	require_NoError(t, err)
-
-	// Make sure it's leader is on S2.
-	sl := c.servers[1]
-	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
-		c.waitOnStreamLeader(globalAccountName, "EVENTS")
-		if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
-			s.JetStreamStepdownStream(globalAccountName, "EVENTS")
-			return fmt.Errorf("Server %s is not stream leader yet", sl)
-		}
-		return nil
-	})
-
-	// Now create the consumer.
-	_, err = js.AddConsumer("EVENTS", &nats.ConsumerConfig{
-		Durable:        "C",
-		AckPolicy:      nats.AckExplicitPolicy,
-		DeliverSubject: "dx",
-	})
-	require_NoError(t, err)
-
-	// Make sure the consumer leader is on S3.
-	cl := c.servers[2]
-	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
-		c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
-		if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
-			s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
-			return fmt.Errorf("Server %s is not consumer leader yet", sl)
-		}
-		return nil
-	})
-
-	// Create the real consumer on the consumer leader to make it efficient.
-	nc, js = jsClientConnect(t, cl)
-	defer nc.Close()
-
-	_, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
-		msg.Ack()
-	}, nats.BindStream("EVENTS"), nats.Durable("C"), nats.ManualAck())
-	require_NoError(t, err)
-
-	for i := 0; i < 1_000; i++ {
-		_, err := js.PublishAsync("EVENTS.PAID", []byte("ok"))
-		require_NoError(t, err)
-	}
-	select {
-	case <-js.PublishAsyncComplete():
-	case <-time.After(5 * time.Second):
-		t.Fatalf("Did not receive completion signal")
-	}
-
-	slow := c.servers[0]
-	mset, err := slow.GlobalAccount().lookupStream("EVENTS")
-	require_NoError(t, err)
-
-	// Make sure preAck is non-nil, so we know the logic has kicked in.
-	mset.mu.RLock()
-	preAcks := mset.preAcks
-	mset.mu.RUnlock()
-	require_NotNil(t, preAcks)
-
-	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
-		state := mset.state()
-		if state.Msgs == 0 {
-			mset.mu.RLock()
-			lp := len(mset.preAcks)
-			mset.mu.RUnlock()
-			if lp == 0 {
-				return nil
-			} else {
-				t.Fatalf("Expected no preAcks with no msgs, but got %d", lp)
-			}
-		}
-		return fmt.Errorf("Still have %d msgs left", state.Msgs)
-	})
-
-}
-
-func TestJetStreamInterestLeakOnDisableJetStream(t *testing.T) {
+func TestJetStreamClusterInterestLeakOnDisableJetStream(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
 
@@ -3410,7 +3275,7 @@ func TestJetStreamInterestLeakOnDisableJetStream(t *testing.T) {
 	}
 }
 
-func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) {
+func TestJetStreamClusterNoLeadersDuringLameDuck(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
 
@@ -3517,7 +3382,7 @@ func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) {
 // it could miss the signal of a message going away. If that message was pending and expires the
 // ack floor could fall below the stream first sequence. This test will force that condition and
 // make sure the system resolves itself.
-func TestJetStreamConsumerAckFloorDrift(t *testing.T) {
+func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
 
diff --git a/server/norace_test.go b/server/norace_test.go
index ea0b34b0..f6f01aca 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -8377,3 +8377,215 @@ func TestNoRaceJetStreamClusterLeafnodeConnectPerf(t *testing.T) {
 		nc.Close()
 	}
 }
+
+func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.T) {
+	tmpl := `
+	listen: 127.0.0.1:-1
+	server_name: %s
+	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
+
+	cluster {
+		name: "F3"
+		listen: 127.0.0.1:%d
+		routes = [%s]
+	}
+
+	accounts {
+		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
+	}
+	`
+
+	// Route Ports
+	// "S1": 14622,
+	// "S2": 15622,
+	// "S3": 16622,
+
+	// S2 (stream leader) will have a slow path to S1 (via proxy) and S3 (consumer leader) will have a fast path.
+
+	// Do these in order, S1, S2 (proxy) then S3.
+	c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}
+
+	// S1
+	conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622")
+	c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf)))
+
+	// S2
+	// Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT.
+	np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
+	routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL())
+	conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes)
+	c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf)))
+
+	// S3
+	conf = fmt.Sprintf(tmpl, "S3", t.TempDir(), 16622, "route://127.0.0.1:14622, route://127.0.0.1:15622")
+	c.servers[2], c.opts[2] = RunServerWithConfig(createConfFile(t, []byte(conf)))
+
+	c.checkClusterFormed()
+	c.waitOnClusterReady()
+	defer c.shutdown()
+	defer np.stop()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Now create the stream.
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "EVENTS",
+		Subjects:  []string{"EV.>"},
+		Replicas:  3,
+		Retention: nats.InterestPolicy,
+	})
+	require_NoError(t, err)
+
+	// Make sure its leader is on S2.
+	sl := c.servers[1]
+	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+		c.waitOnStreamLeader(globalAccountName, "EVENTS")
+		if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
+			s.JetStreamStepdownStream(globalAccountName, "EVENTS")
+			return fmt.Errorf("Server %s is not stream leader yet", sl)
+		}
+		return nil
+	})
+
+	// Now create the consumer.
+	_, err = js.AddConsumer("EVENTS", &nats.ConsumerConfig{
+		Durable:        "C",
+		AckPolicy:      nats.AckExplicitPolicy,
+		DeliverSubject: "dx",
+	})
+	require_NoError(t, err)
+
+	// Make sure the consumer leader is on S3.
+	cl := c.servers[2]
+	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+		c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
+		if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
+			s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
+			return fmt.Errorf("Server %s is not consumer leader yet", cl)
+		}
+		return nil
+	})
+
+	// Create the real consumer on the consumer leader to make it efficient.
+	nc, js = jsClientConnect(t, cl)
+	defer nc.Close()
+
+	_, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
+		msg.Ack()
+	}, nats.BindStream("EVENTS"), nats.Durable("C"), nats.ManualAck())
+	require_NoError(t, err)
+
+	for i := 0; i < 1_000; i++ {
+		_, err := js.PublishAsync("EVENTS.PAID", []byte("ok"))
+		require_NoError(t, err)
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	slow := c.servers[0]
+	mset, err := slow.GlobalAccount().lookupStream("EVENTS")
+	require_NoError(t, err)
+
+	// Make sure preAck is non-nil, so we know the logic has kicked in.
+	mset.mu.RLock()
+	preAcks := mset.preAcks
+	mset.mu.RUnlock()
+	require_NotNil(t, preAcks)
+
+	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
+		state := mset.state()
+		if state.Msgs == 0 {
+			mset.mu.RLock()
+			lp := len(mset.preAcks)
+			mset.mu.RUnlock()
+			if lp == 0 {
+				return nil
+			} else {
+				t.Fatalf("Expected no preAcks with no msgs, but got %d", lp)
+			}
+		}
+		return fmt.Errorf("Still have %d msgs left", state.Msgs)
+	})
+
+}
+
+func TestNoRaceCheckAckFloorWithVeryLargeFirstSeqAndNewConsumers(t *testing.T) {
+	s := RunBasicJetStreamServer(t)
+	defer s.Shutdown()
+
+	nc, _ := jsClientConnect(t, s)
+	defer nc.Close()
+
+	// Make sure to time bound here for the acksync call below.
+	js, err := nc.JetStream(nats.MaxWait(200 * time.Millisecond))
+	require_NoError(t, err)
+
+	_, err = js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"wq-req"},
+		Retention: nats.WorkQueuePolicy,
+	})
+	require_NoError(t, err)
+
+	largeFirstSeq := uint64(1_200_000_000)
+	err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: largeFirstSeq})
+	require_NoError(t, err)
+	si, err := js.StreamInfo("TEST")
+	require_NoError(t, err)
+	require_True(t, si.State.FirstSeq == largeFirstSeq)
+
+	// Add a simple request to the stream.
+	sendStreamMsg(t, nc, "wq-req", "HELP")
+
+	sub, err := js.PullSubscribe("wq-req", "dlc")
+	require_NoError(t, err)
+
+	msgs, err := sub.Fetch(1)
+	require_NoError(t, err)
+	require_True(t, len(msgs) == 1)
+
+	// The bug is around the checkAckFloor walking the sequences from the current ack floor
+	// to the first sequence of the stream. We time bound the max wait with the js context
+	// to 200ms. Since checkAckFloor is spinning and holding up processing of acks, this will fail.
+	// We will short circuit new consumers to fix this one.
+	require_NoError(t, msgs[0].AckSync())
+
+	// Now do this again so we move past the new consumer with no ack floor situation.
+	err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 2 * largeFirstSeq})
+	require_NoError(t, err)
+	si, err = js.StreamInfo("TEST")
+	require_NoError(t, err)
+	require_True(t, si.State.FirstSeq == 2*largeFirstSeq)
+
+	sendStreamMsg(t, nc, "wq-req", "MORE HELP")
+
+	// We check this one directly for this use case.
+	mset, err := s.GlobalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+	o := mset.lookupConsumer("dlc")
+	require_True(t, o != nil)
+
+	// Purge will move the stream floor by default, so force it back into the situation where the ack floor is at largeFirstSeq.
+	// This will not trigger the new consumer logic, but will trigger a walk of the sequence space.
+	// The fix will be to walk the lesser of the two linear spaces.
+	o.mu.Lock()
+	o.asflr = largeFirstSeq
+	o.mu.Unlock()
+
+	done := make(chan bool)
+	go func() {
+		o.checkAckFloor()
+		done <- true
+	}()
+
+	select {
+	case <-done:
+		return
+	case <-time.After(time.Second):
+		t.Fatalf("Check ack floor taking too long!")
+	}
+}