diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 00000000..f77be596 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @nats-io/server diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 2dd0085a..c55a8777 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -13,5 +13,3 @@ Resolves # - - - - -/cc @nats-io/core diff --git a/server/filestore.go b/server/filestore.go index 53cede18..f5af9eec 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -104,26 +104,27 @@ type psi struct { } type fileStore struct { - mu sync.RWMutex - state StreamState - ld *LostStreamData - scb StorageUpdateHandler - ageChk *time.Timer - syncTmr *time.Timer - cfg FileStreamInfo - fcfg FileStoreConfig - prf keyGen - aek cipher.AEAD - lmb *msgBlock - blks []*msgBlock - bim map[uint32]*msgBlock - psim map[string]*psi - hh hash.Hash64 - qch chan struct{} - cfs []ConsumerStore - sips int - closed bool - fip bool + mu sync.RWMutex + state StreamState + ld *LostStreamData + scb StorageUpdateHandler + ageChk *time.Timer + syncTmr *time.Timer + cfg FileStreamInfo + fcfg FileStoreConfig + prf keyGen + aek cipher.AEAD + lmb *msgBlock + blks []*msgBlock + bim map[uint32]*msgBlock + psim map[string]*psi + hh hash.Hash64 + qch chan struct{} + cfs []ConsumerStore + sips int + closed bool + fip bool + receivedAny bool } // Represents a message store block and its data. @@ -2100,6 +2101,14 @@ func (fs *fileStore) StoreRawMsg(subj string, hdr, msg []byte, seq uint64, ts in fs.mu.Lock() err := fs.storeRawMsg(subj, hdr, msg, seq, ts) cb := fs.scb + // Check if first message timestamp requires expiry + // sooner than initial replica expiry timer set to MaxAge when initializing. + if !fs.receivedAny && fs.cfg.MaxAge != 0 && ts > 0 { + fs.receivedAny = true + // don't block here by calling expireMsgs directly. + // Instead, set short timeout. + fs.resetAgeChk(int64(time.Millisecond * 50)) + } fs.mu.Unlock() if err == nil && cb != nil { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 153fab1c..ed2e6d5f 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -127,9 +127,10 @@ type streamAssignment struct { Reply string `json:"reply"` Restore *StreamState `json:"restore_state,omitempty"` // Internal - consumers map[string]*consumerAssignment - responded bool - err error + consumers map[string]*consumerAssignment + responded bool + recovering bool + err error } // consumerAssignment is what the meta controller uses to assign consumers to streams. @@ -144,9 +145,10 @@ type consumerAssignment struct { Reply string `json:"reply"` State *ConsumerState `json:"state,omitempty"` // Internal - responded bool - deleted bool - err error + responded bool + recovering bool + deleted bool + err error } // streamPurge is what the stream leader will replicate when purging a stream. @@ -987,7 +989,6 @@ func (js *jetStream) monitorCluster() { isLeader bool lastSnap []byte lastSnapTime time.Time - beenLeader bool ) // Highwayhash key for generating hashes. @@ -1007,6 +1008,8 @@ func (js *jetStream) monitorCluster() { if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() + } else { + s.Warnf("Error snapshotting JetStream cluster state: %v", err) } } } @@ -1059,6 +1062,15 @@ func (js *jetStream) monitorCluster() { // FIXME(dlc) - Deal with errors. if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) + // If we processed a snapshot and are recovering remove our pending state. + if didSnap && js.isMetaRecovering() { + ru = &recoveryUpdates{ + removeStreams: make(map[string]*streamAssignment), + removeConsumers: make(map[string]*consumerAssignment), + updateStreams: make(map[string]*streamAssignment), + updateConsumers: make(map[string]*consumerAssignment), + } + } if js.hasPeerEntries(ce.Entries) || didSnap || didRemoval { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. @@ -1070,20 +1082,15 @@ func (js *jetStream) monitorCluster() { } aq.recycle(&ces) case isLeader = <-lch: - // We want to make sure we are updated on statsz so ping the extended cluster. + js.processLeaderChange(isLeader) + if isLeader { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) - } - js.processLeaderChange(isLeader) - if isLeader && !beenLeader { - beenLeader = true - if n.NeedSnapshot() { - if err := n.InstallSnapshot(js.metaSnapshot()); err != nil { - s.Warnf("Error snapshotting JetStream cluster state: %v", err) - } - } + // Optionally install a snapshot as we become leader. + doSnapshot() js.checkClusterSize() } + case <-t.C: doSnapshot() // Periodically check the cluster size. @@ -1279,27 +1286,20 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error { } } } - isRecovering := js.metaRecovering js.mu.Unlock() // Do removals first. for _, sa := range saDel { - if isRecovering { - js.setStreamAssignmentRecovering(sa) - } + js.setStreamAssignmentRecovering(sa) js.processStreamRemoval(sa) } // Now do add for the streams. Also add in all consumers. for _, sa := range saAdd { - if isRecovering { - js.setStreamAssignmentRecovering(sa) - } + js.setStreamAssignmentRecovering(sa) js.processStreamAssignment(sa) // We can simply add the consumers. for _, ca := range sa.consumers { - if isRecovering { - js.setConsumerAssignmentRecovering(ca) - } + js.setConsumerAssignmentRecovering(ca) js.processConsumerAssignment(ca) } } @@ -1307,23 +1307,17 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error { // Perform updates on those in saChk. These were existing so make // sure to process any changes. for _, sa := range saChk { - if isRecovering { - js.setStreamAssignmentRecovering(sa) - } + js.setStreamAssignmentRecovering(sa) js.processUpdateStreamAssignment(sa) } // Now do the deltas for existing stream's consumers. for _, ca := range caDel { - if isRecovering { - js.setConsumerAssignmentRecovering(ca) - } + js.setConsumerAssignmentRecovering(ca) js.processConsumerRemoval(ca) } for _, ca := range caAdd { - if isRecovering { - js.setConsumerAssignmentRecovering(ca) - } + js.setConsumerAssignmentRecovering(ca) js.processConsumerAssignment(ca) } @@ -1335,6 +1329,7 @@ func (js *jetStream) setStreamAssignmentRecovering(sa *streamAssignment) { js.mu.Lock() defer js.mu.Unlock() sa.responded = true + sa.recovering = true sa.Restore = nil if sa.Group != nil { sa.Group.Preferred = _EMPTY_ @@ -1346,6 +1341,7 @@ func (js *jetStream) setConsumerAssignmentRecovering(ca *consumerAssignment) { js.mu.Lock() defer js.mu.Unlock() ca.responded = true + ca.recovering = true if ca.Group != nil { ca.Group.Preferred = _EMPTY_ } @@ -2977,6 +2973,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss storage, cfg := sa.Config.Storage, sa.Config hasResponded := sa.responded sa.responded = true + recovering := sa.recovering js.mu.Unlock() mset, err := acc.lookupStream(cfg.Name) @@ -3002,7 +2999,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss js.mu.Unlock() } // Call update. - if err = mset.update(cfg); err != nil { + if err = mset.updateWithAdvisory(cfg, !recovering); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err) } // Set the new stream assignment. @@ -3036,9 +3033,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss return } - mset.mu.RLock() - isLeader := mset.isLeader() - mset.mu.RUnlock() + isLeader := mset.IsLeader() // Check for missing syncSubject bug. if isLeader && osa != nil && osa.Sync == _EMPTY_ { @@ -3054,7 +3049,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss } // Check if we should bail. - if !isLeader || hasResponded { + if !isLeader || hasResponded || recovering { return } @@ -3114,19 +3109,21 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme if sa.Group.Name == osa.Group.Name && reflect.DeepEqual(sa.Group.Peers, osa.Group.Peers) { // Since this already exists we know it succeeded, just respond to this caller. js.mu.RLock() - client, subject, reply := sa.Client, sa.Subject, sa.Reply + client, subject, reply, recovering := sa.Client, sa.Subject, sa.Reply, sa.recovering js.mu.RUnlock() - var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} - resp.StreamInfo = &StreamInfo{ - Created: mset.createdTime(), - State: mset.state(), - Config: mset.config(), - Cluster: js.clusterInfo(mset.raftGroup()), - Sources: mset.sourcesInfo(), - Mirror: mset.mirrorInfo(), + if !recovering { + var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} + resp.StreamInfo = &StreamInfo{ + Created: mset.createdTime(), + State: mset.state(), + Config: mset.config(), + Cluster: js.clusterInfo(mset.raftGroup()), + Sources: mset.sourcesInfo(), + Mirror: mset.mirrorInfo(), + } + s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } - s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) return } else { // We had a bug where we could have multiple assignments for the same @@ -3342,6 +3339,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, if cc := js.cluster; cc != nil { isMetaLeader = cc.isLeader() } + recovering := sa.recovering js.mu.RUnlock() stopped := false @@ -3400,7 +3398,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, } // Do not respond if the account does not exist any longer - if acc == nil { + if acc == nil || recovering { return } @@ -3653,7 +3651,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state // This consumer exists. // Only update if config is really different. cfg := o.config() - if !reflect.DeepEqual(&cfg, ca.Config) { + if isConfigUpdate = !reflect.DeepEqual(&cfg, ca.Config); isConfigUpdate { // Call into update, ignore consumer exists error here since this means an old deliver subject is bound // which can happen on restart etc. if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() { @@ -3693,7 +3691,9 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state // If we look like we are scaling up, let's send our current state to the group. sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && o.IsLeader() && n != nil // Signal that this is an update - isConfigUpdate = true + if ca.Reply != _EMPTY_ { + isConfigUpdate = true + } } js.mu.RUnlock() @@ -3794,13 +3794,15 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state // For existing consumer, only send response if not recovering. if wasExisting && !js.isMetaRecovering() { if o.IsLeader() || (!didCreate && needsLocalResponse) { - // Process if existing as an update. + // Process if existing as an update. Double check that this is not recovered. js.mu.RLock() - client, subject, reply := ca.Client, ca.Subject, ca.Reply + client, subject, reply, recovering := ca.Client, ca.Subject, ca.Reply, ca.recovering js.mu.RUnlock() - var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} - resp.ConsumerInfo = o.info() - s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) + if !recovering { + var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} + resp.ConsumerInfo = o.info() + s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) + } } } } @@ -3819,6 +3821,7 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb if cc := js.cluster; cc != nil { isMetaLeader = cc.isLeader() } + recovering := ca.recovering js.mu.RUnlock() stopped := false @@ -3856,8 +3859,8 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb } } - // Do not respond if the account does not exist any longer - if acc == nil { + // Do not respond if the account does not exist any longer or this is during recovery. + if acc == nil || recovering { return } diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index eb53d5e4..250b2043 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -199,10 +199,10 @@ func TestJetStreamClusterMultiRestartBug(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 250*time.Millisecond, func() error { si, _ := js2.StreamInfo("TEST") if si == nil || si.Cluster == nil { - t.Fatalf("Did not get stream info") + return fmt.Errorf("No stream info or cluster") } for _, pi := range si.Cluster.Replicas { if !pi.Current { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 4646bc79..4b6ec601 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2835,3 +2835,93 @@ func TestJetStreamClusterWALBuildupOnNoOpPull(t *testing.T) { t.Fatalf("got %d entries, expected less than %d entries", entries, max) } } + +// Found in https://github.com/nats-io/nats-server/issues/3848 +// When Max Age was specified and stream was scaled up, new replicas +// were expiring messages much later than the leader. +func TestJetStreamClusterStreamMaxAgeScaleUp(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + for _, test := range []struct { + name string + storage nats.StorageType + stream string + purge bool + }{ + {name: "file", storage: nats.FileStorage, stream: "A", purge: false}, + {name: "memory", storage: nats.MemoryStorage, stream: "B", purge: false}, + {name: "file with purge", storage: nats.FileStorage, stream: "C", purge: true}, + {name: "memory with purge", storage: nats.MemoryStorage, stream: "D", purge: true}, + } { + + t.Run(test.name, func(t *testing.T) { + ttl := time.Second * 5 + // Add stream with one replica and short MaxAge. + _, err := js.AddStream(&nats.StreamConfig{ + Name: test.stream, + Replicas: 1, + Subjects: []string{test.stream}, + MaxAge: ttl, + Storage: test.storage, + }) + require_NoError(t, err) + + // Add some messages. + for i := 0; i < 10; i++ { + sendStreamMsg(t, nc, test.stream, "HELLO") + } + // We need to also test if we properly set expiry + // if first sequence is not 1. + if test.purge { + err = js.PurgeStream(test.stream) + require_NoError(t, err) + // Add some messages. + for i := 0; i < 10; i++ { + sendStreamMsg(t, nc, test.stream, "HELLO") + } + } + // Mark the time when all messages were published. + start := time.Now() + + // Sleep for half of the MaxAge time. + time.Sleep(ttl / 2) + + // Scale up the Stream to 3 replicas. + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: test.stream, + Replicas: 3, + Subjects: []string{test.stream}, + MaxAge: ttl, + Storage: test.storage, + }) + require_NoError(t, err) + + // All messages should still be there. + info, err := js.StreamInfo(test.stream) + require_NoError(t, err) + require_True(t, info.State.Msgs == 10) + + // Wait until MaxAge is reached. + time.Sleep(ttl - time.Since(start) + (10 * time.Millisecond)) + + // Check if all messages are expired. + info, err = js.StreamInfo(test.stream) + require_NoError(t, err) + require_True(t, info.State.Msgs == 0) + + // Now switch leader to one of replicas + _, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, test.stream), nil, time.Second) + require_NoError(t, err) + c.waitOnStreamLeader("$G", test.stream) + + // and make sure that it also expired all messages + info, err = js.StreamInfo(test.stream) + require_NoError(t, err) + require_True(t, info.State.Msgs == 0) + }) + } +} diff --git a/server/memstore.go b/server/memstore.go index 921c1df1..ebcd8085 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -23,15 +23,16 @@ import ( // TODO(dlc) - This is a fairly simplistic approach but should do for now. type memStore struct { - mu sync.RWMutex - cfg StreamConfig - state StreamState - msgs map[uint64]*StoreMsg - fss map[string]*SimpleState - maxp int64 - scb StorageUpdateHandler - ageChk *time.Timer - consumers int + mu sync.RWMutex + cfg StreamConfig + state StreamState + msgs map[uint64]*StoreMsg + fss map[string]*SimpleState + maxp int64 + scb StorageUpdateHandler + ageChk *time.Timer + consumers int + receivedAny bool } func newMemStore(cfg *StreamConfig) (*memStore, error) { @@ -190,7 +191,6 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int if ms.ageChk == nil && ms.cfg.MaxAge != 0 { ms.startAgeChk() } - return nil } @@ -199,6 +199,13 @@ func (ms *memStore) StoreRawMsg(subj string, hdr, msg []byte, seq uint64, ts int ms.mu.Lock() err := ms.storeRawMsg(subj, hdr, msg, seq, ts) cb := ms.scb + // Check if first message timestamp requires expiry + // sooner than initial replica expiry timer set to MaxAge when initializing. + if !ms.receivedAny && ms.cfg.MaxAge != 0 && ts > 0 { + ms.receivedAny = true + // Calculate duration when the next expireMsgs should be called. + ms.resetAgeChk(int64(time.Millisecond) * 50) + } ms.mu.Unlock() if err == nil && cb != nil { @@ -411,6 +418,23 @@ func (ms *memStore) startAgeChk() { } } +// Lock should be held. +func (ms *memStore) resetAgeChk(delta int64) { + if ms.cfg.MaxAge == 0 { + return + } + + fireIn := ms.cfg.MaxAge + if delta > 0 && time.Duration(delta) < fireIn { + fireIn = time.Duration(delta) + } + if ms.ageChk != nil { + ms.ageChk.Reset(fireIn) + } else { + ms.ageChk = time.AfterFunc(fireIn, ms.expireMsgs) + } +} + // Will expire msgs that are too old. func (ms *memStore) expireMsgs() { ms.mu.Lock() diff --git a/server/norace_test.go b/server/norace_test.go index 29936184..8bbb719b 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -6408,3 +6408,116 @@ func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) { t.Fatalf("Consumer create took longer than expected, %v vs %v", elapsed, threshold) } } + +func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "GHOST", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"events.>"}, + Replicas: 3, + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + for j := 0; j < 10; j++ { + require_NoError(t, nc.Publish(fmt.Sprintf("events.%d.%d", i, j), []byte(`test`))) + } + } + + fetch := func(id int) { + subject := fmt.Sprintf("events.%d.*", id) + subscription, err := js.PullSubscribe(subject, + _EMPTY_, // ephemeral consumer + nats.DeliverAll(), + nats.ReplayInstant(), + nats.BindStream("TEST"), + nats.ConsumerReplicas(1), + nats.ConsumerMemoryStorage(), + ) + if err != nil { + return + } + defer subscription.Unsubscribe() + + info, err := subscription.ConsumerInfo() + if err != nil { + return + } + + subscription.Fetch(int(info.NumPending)) + } + + replay := func(ctx context.Context, id int) { + for { + select { + case <-ctx.Done(): + return + default: + fetch(id) + } + } + } + + ctx, cancel := context.WithCancel(context.Background()) + + go replay(ctx, 0) + go replay(ctx, 1) + go replay(ctx, 2) + go replay(ctx, 3) + go replay(ctx, 4) + go replay(ctx, 5) + go replay(ctx, 6) + go replay(ctx, 7) + go replay(ctx, 8) + go replay(ctx, 9) + + time.Sleep(5 * time.Second) + + for _, server := range c.servers { + server.Shutdown() + restarted := c.restartServer(server) + checkFor(t, time.Second, 200*time.Millisecond, func() error { + hs := restarted.healthz(&HealthzOptions{ + JSEnabled: true, + JSServerOnly: true, + }) + if hs.Error != _EMPTY_ { + return errors.New(hs.Error) + } + return nil + }) + c.waitOnStreamLeader(globalAccountName, "TEST") + time.Sleep(time.Second * 2) + go replay(ctx, 5) + go replay(ctx, 6) + go replay(ctx, 7) + go replay(ctx, 8) + go replay(ctx, 9) + } + + time.Sleep(5 * time.Second) + cancel() + + getMissing := func() []string { + m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second*10) + require_NoError(t, err) + + var resp JSApiConsumerListResponse + err = json.Unmarshal(m.Data, &resp) + require_NoError(t, err) + return resp.Missing + } + + checkFor(t, 10*time.Second, 500*time.Millisecond, func() error { + missing := getMissing() + if len(missing) == 0 { + return nil + } + return fmt.Errorf("Still have missing: %+v", missing) + }) +} diff --git a/server/raft.go b/server/raft.go index 97b2f00e..eda9bd40 100644 --- a/server/raft.go +++ b/server/raft.go @@ -939,9 +939,9 @@ func (n *raft) InstallSnapshot(data []byte) error { var state StreamState n.wal.FastState(&state) - if state.FirstSeq >= n.applied { + if n.applied == 0 || len(data) == 0 { n.Unlock() - return nil + return errNoSnapAvailable } n.debug("Installing snapshot of %d bytes", len(data)) @@ -976,7 +976,7 @@ func (n *raft) InstallSnapshot(data []byte) error { // Remember our latest snapshot file. n.snapfile = sfile - if _, err := n.wal.Compact(snap.lastIndex); err != nil { + if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) n.Unlock() return err @@ -1281,7 +1281,6 @@ func (n *raft) StepDown(preferred ...string) error { n.debug("Being asked to stepdown") // See if we have up to date followers. - nowts := time.Now().UnixNano() maybeLeader := noLeader if len(preferred) > 0 { if preferred[0] != _EMPTY_ { @@ -1291,20 +1290,42 @@ func (n *raft) StepDown(preferred ...string) error { } } - for peer, ps := range n.peers { - // If not us and alive and caughtup. - if peer != n.id && (nowts-ps.ts) < int64(hbInterval*3) { - if maybeLeader != noLeader && maybeLeader != peer { - continue - } - if si, ok := n.s.nodeToInfo.Load(peer); !ok || si.(nodeInfo).offline { - continue - } - n.debug("Looking at %q which is %v behind", peer, time.Duration(nowts-ps.ts)) - maybeLeader = peer - break + // Can't pick ourselves. + if maybeLeader == n.id { + maybeLeader = noLeader + preferred = nil + } + + nowts := time.Now().UnixNano() + + // If we have a preferred check it first. + if maybeLeader != noLeader { + var isHealthy bool + if ps, ok := n.peers[maybeLeader]; ok { + si, ok := n.s.nodeToInfo.Load(maybeLeader) + isHealthy = ok && !si.(nodeInfo).offline && (nowts-ps.ts) < int64(hbInterval*3) + } + if !isHealthy { + maybeLeader = noLeader } } + + // If we do not have a preferred at this point pick the first healthy one. + // Make sure not ourselves. + if maybeLeader == noLeader { + for peer, ps := range n.peers { + if peer == n.id { + continue + } + si, ok := n.s.nodeToInfo.Load(peer) + isHealthy := ok && !si.(nodeInfo).offline && (nowts-ps.ts) < int64(hbInterval*3) + if isHealthy { + maybeLeader = peer + break + } + } + } + stepdown := n.stepdown n.Unlock()