diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 521b85d2..826100c9 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3996,6 +3996,13 @@ func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } + sl := c.streamLeader("$G", "TEST") + if s == sl { + nc.Close() + nc, js = jsClientConnect(t, s) + defer nc.Close() + } + sub, err := js.SubscribeSync("foo", nats.Durable("dlc")) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -4008,14 +4015,14 @@ func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) { m.AckSync() } - sl := c.streamLeader("$G", "TEST") sl.Shutdown() c.restartServer(sl) c.waitOnStreamLeader("$G", "TEST") + c.waitOnConsumerLeader("$G", "TEST", "dlc") // Send second msg js.Publish("foo", []byte("msg2")) - msg, err := sub.NextMsg(time.Second) + msg, err := sub.NextMsg(5 * time.Second) if err != nil { t.Fatalf("Error getting message: %v", err) } diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 21129d5e..ef168b99 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -424,7 +424,7 @@ func (sc *supercluster) leader() *Server { func (sc *supercluster) waitOnLeader() { sc.t.Helper() - expires := time.Now().Add(5 * time.Second) + expires := time.Now().Add(10 * time.Second) for time.Now().Before(expires) { for _, c := range sc.clusters { if leader := c.leader(); leader != nil { diff --git a/server/raft.go b/server/raft.go index e78c15cf..39163a43 100644 --- a/server/raft.go +++ b/server/raft.go @@ -228,11 +228,11 @@ type lps struct { } const ( - minElectionTimeoutDefault = 2 * time.Second - maxElectionTimeoutDefault = 5 * time.Second + minElectionTimeoutDefault = 4 * time.Second + maxElectionTimeoutDefault = 9 * time.Second minCampaignTimeoutDefault = 100 * time.Millisecond maxCampaignTimeoutDefault = 8 * minCampaignTimeoutDefault - hbIntervalDefault = 500 * time.Millisecond + hbIntervalDefault = 1 * time.Second lostQuorumIntervalDefault = hbIntervalDefault * 20 // 10 seconds lostQuorumCheckIntervalDefault = hbIntervalDefault * 20 // 10 seconds @@ -2606,12 +2606,6 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subje } else { n.warn("AppendEntry failed to be placed on internal channel: corrupt entry") } - n.Lock() - // Don't reset here if we have been asked to assume leader position. - if !n.lxfer { - n.resetElectionTimeout() - } - n.Unlock() } // Lock should be held. @@ -2680,6 +2674,11 @@ func (n *raft) updateLeader(newLeader string) { func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.Lock() + // Don't reset here if we have been asked to assume leader position. + if !n.lxfer { + n.resetElectionTimeout() + } + // Just return if closed or we had previous write error. if n.state == Closed || n.werr != nil { n.Unlock()