From ccd2290355fb7c50b8cf4b320f6a7832b2baa230 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Thu, 12 May 2022 14:38:12 -0700
Subject: [PATCH] Adjust raft election and heartbeat timing based on data from
 recent use cases

With use cases bringing us more data, this adjusts our raft timing.
Inlining the election timeout reset into handleAppendEntry doubled the
lock contention and most likely introduced head-of-line blocking for
routes under heavy load, so the reset now happens in
processAppendEntry, under the lock it already takes.

We also slow down heartbeats, given how many assets are now being
deployed in our user ecosystem, and move the normal
follower-to-candidate timing further out, similar to the lost-quorum
interval. Note that the happy-path leadership transfer will still be
very quick.

Signed-off-by: Derek Collison
---
 server/jetstream_cluster_test.go | 11 +++++++++--
 server/jetstream_helpers_test.go |  2 +-
 server/raft.go                   | 17 ++++++++---------
 3 files changed, 18 insertions(+), 12 deletions(-)
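Notes for reviewers (not part of the commit): a rough feel for what the
new follower timing means in practice. The helper below is a
hypothetical stand-in, not the server's actual code; it assumes the
election timeout is drawn uniformly between the min and max defaults,
the usual raft trick to keep followers from campaigning in lockstep,
and that a received heartbeat resets it.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Illustrative stand-ins for the new defaults in server/raft.go.
const (
	minElectionTimeout = 4 * time.Second
	maxElectionTimeout = 9 * time.Second
	hbInterval         = 1 * time.Second
)

// randElectionTimeout picks a timeout uniformly in [min, max) so
// followers don't all time out at the same instant.
func randElectionTimeout() time.Duration {
	delta := rand.Int63n(int64(maxElectionTimeout - minElectionTimeout))
	return minElectionTimeout + time.Duration(delta)
}

func main() {
	et := randElectionTimeout()
	// The quietest follower now waits 4-9s of silence before
	// campaigning, up from 2-5s, with heartbeats expected every 1s.
	fmt.Printf("election timeout %v = ~%d heartbeat intervals\n",
		et, int(et/hbInterval))
}

One thing to double-check: with hbIntervalDefault now 1s,
lostQuorumIntervalDefault and lostQuorumCheckIntervalDefault
(hbIntervalDefault * 20) evaluate to 20 seconds, so the trailing
"// 10 seconds" comments on those context lines are now stale.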
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index 521b85d2..826100c9 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -3996,6 +3996,13 @@ func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) {
 		t.Fatalf("Unexpected error: %v", err)
 	}
 
+	sl := c.streamLeader("$G", "TEST")
+	if s == sl {
+		nc.Close()
+		nc, js = jsClientConnect(t, s)
+		defer nc.Close()
+	}
+
 	sub, err := js.SubscribeSync("foo", nats.Durable("dlc"))
 	if err != nil {
 		t.Fatalf("Unexpected error: %v", err)
@@ -4008,14 +4015,14 @@ func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) {
 		m.AckSync()
 	}
 
-	sl := c.streamLeader("$G", "TEST")
 	sl.Shutdown()
 	c.restartServer(sl)
 	c.waitOnStreamLeader("$G", "TEST")
+	c.waitOnConsumerLeader("$G", "TEST", "dlc")
 
 	// Send second msg
 	js.Publish("foo", []byte("msg2"))
-	msg, err := sub.NextMsg(time.Second)
+	msg, err := sub.NextMsg(5 * time.Second)
 	if err != nil {
 		t.Fatalf("Error getting message: %v", err)
 	}
diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go
index 21129d5e..ef168b99 100644
--- a/server/jetstream_helpers_test.go
+++ b/server/jetstream_helpers_test.go
@@ -424,7 +424,7 @@ func (sc *supercluster) leader() *Server {
 
 func (sc *supercluster) waitOnLeader() {
 	sc.t.Helper()
-	expires := time.Now().Add(5 * time.Second)
+	expires := time.Now().Add(10 * time.Second)
 	for time.Now().Before(expires) {
 		for _, c := range sc.clusters {
 			if leader := c.leader(); leader != nil {
diff --git a/server/raft.go b/server/raft.go
index e78c15cf..39163a43 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -228,11 +228,11 @@ type lps struct {
 }
 
 const (
-	minElectionTimeoutDefault = 2 * time.Second
-	maxElectionTimeoutDefault = 5 * time.Second
+	minElectionTimeoutDefault = 4 * time.Second
+	maxElectionTimeoutDefault = 9 * time.Second
 	minCampaignTimeoutDefault = 100 * time.Millisecond
 	maxCampaignTimeoutDefault = 8 * minCampaignTimeoutDefault
-	hbIntervalDefault         = 500 * time.Millisecond
+	hbIntervalDefault         = 1 * time.Second
 	lostQuorumIntervalDefault = hbIntervalDefault * 20 // 10 seconds
 	lostQuorumCheckIntervalDefault = hbIntervalDefault * 20 // 10 seconds
 )
@@ -2606,12 +2606,6 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subje
 	} else {
 		n.warn("AppendEntry failed to be placed on internal channel: corrupt entry")
 	}
-	n.Lock()
-	// Don't reset here if we have been asked to assume leader position.
-	if !n.lxfer {
-		n.resetElectionTimeout()
-	}
-	n.Unlock()
 }
 
 // Lock should be held.
@@ -2680,6 +2674,11 @@ func (n *raft) updateLeader(newLeader string) {
 func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
 	n.Lock()
 
+	// Don't reset here if we have been asked to assume leader position.
+	if !n.lxfer {
+		n.resetElectionTimeout()
+	}
+
 	// Just return if closed or we had previous write error.
 	if n.state == Closed || n.werr != nil {
 		n.Unlock()
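A sketch of why the raft.go hunks above cut the locking on the hot path
in half: previously every delivered AppendEntry paid for two
acquisitions of the raft lock, one in handleAppendEntry just to reset
the election timer and one in processAppendEntry to apply the entry.
Now the reset rides along in processAppendEntry's existing critical
section. The types and names below are hypothetical stand-ins for
illustration, not the server's actual ones:

package main

import (
	"sync"
	"time"
)

// node is a toy stand-in for the server's raft node.
type node struct {
	sync.Mutex
	lxfer     bool      // a leadership transfer has been requested
	lastReset time.Time // stand-in for the election timer
}

// Old shape: the subscription callback took the lock a second time,
// once per entry, just to reset the timer.
func (n *node) handleAppendEntryOld(process func()) {
	process() // hands the entry to the processing path; locks internally
	n.Lock()
	if !n.lxfer {
		n.lastReset = time.Now()
	}
	n.Unlock()
}

// New shape: the reset is folded into the critical section the
// processing path already needs, so one acquisition per entry.
func (n *node) processAppendEntryNew() {
	n.Lock()
	defer n.Unlock()
	if !n.lxfer {
		n.lastReset = time.Now()
	}
	// ... apply the entry under the same lock ...
}

func main() {
	n := &node{}
	n.handleAppendEntryOld(n.processAppendEntryNew) // two acquisitions
	n.processAppendEntryNew()                       // one acquisition
}

Either way, the !n.lxfer guard keeps the existing behavior of not
resetting the timer when this node has been asked to assume the leader
position.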