Merge pull request #3132 from nats-io/raft-stable

With use cases bringing us more data I wanted to suggest these changes.
This commit is contained in:
Derek Collison
2022-05-18 09:30:04 -07:00
committed by GitHub
3 changed files with 18 additions and 12 deletions

View File

@@ -3996,6 +3996,13 @@ func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
sl := c.streamLeader("$G", "TEST")
if s == sl {
nc.Close()
nc, js = jsClientConnect(t, s)
defer nc.Close()
}
sub, err := js.SubscribeSync("foo", nats.Durable("dlc"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -4008,14 +4015,14 @@ func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) {
m.AckSync()
}
sl := c.streamLeader("$G", "TEST")
sl.Shutdown()
c.restartServer(sl)
c.waitOnStreamLeader("$G", "TEST")
c.waitOnConsumerLeader("$G", "TEST", "dlc")
// Send second msg
js.Publish("foo", []byte("msg2"))
msg, err := sub.NextMsg(time.Second)
msg, err := sub.NextMsg(5 * time.Second)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}

View File

@@ -424,7 +424,7 @@ func (sc *supercluster) leader() *Server {
func (sc *supercluster) waitOnLeader() {
sc.t.Helper()
expires := time.Now().Add(5 * time.Second)
expires := time.Now().Add(10 * time.Second)
for time.Now().Before(expires) {
for _, c := range sc.clusters {
if leader := c.leader(); leader != nil {

View File

@@ -228,11 +228,11 @@ type lps struct {
}
const (
minElectionTimeoutDefault = 2 * time.Second
maxElectionTimeoutDefault = 5 * time.Second
minElectionTimeoutDefault = 4 * time.Second
maxElectionTimeoutDefault = 9 * time.Second
minCampaignTimeoutDefault = 100 * time.Millisecond
maxCampaignTimeoutDefault = 8 * minCampaignTimeoutDefault
hbIntervalDefault = 500 * time.Millisecond
hbIntervalDefault = 1 * time.Second
lostQuorumIntervalDefault = hbIntervalDefault * 20 // 10 seconds
lostQuorumCheckIntervalDefault = hbIntervalDefault * 20 // 10 seconds
@@ -2606,12 +2606,6 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subje
} else {
n.warn("AppendEntry failed to be placed on internal channel: corrupt entry")
}
n.Lock()
// Don't reset here if we have been asked to assume leader position.
if !n.lxfer {
n.resetElectionTimeout()
}
n.Unlock()
}
// Lock should be held.
@@ -2680,6 +2674,11 @@ func (n *raft) updateLeader(newLeader string) {
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.Lock()
// Don't reset here if we have been asked to assume leader position.
if !n.lxfer {
n.resetElectionTimeout()
}
// Just return if closed or we had previous write error.
if n.state == Closed || n.werr != nil {
n.Unlock()