Merge pull request #1933 from nats-io/stable

General stability improvements.
Derek Collison authored on 2021-02-24 10:26:32 -07:00; committed by GitHub
9 changed files with 80 additions and 50 deletions

server/const.go

@@ -40,7 +40,7 @@ var (
const (
	// VERSION is the current version for the server.
-	VERSION = "2.2.0-beta.90"
+	VERSION = "2.2.0-beta.92"
	// PROTO is the currently supported protocol.
	// 0 was the original

server/consumer.go

@@ -755,7 +755,7 @@ func (o *consumer) unsubscribe(sub *subscription) {
	if sub == nil || o.client == nil {
		return
	}
-	o.client.unsubscribe(o.client.acc, sub, true, true)
+	o.client.processUnsub(sub.sid)
}
// We need to make sure we protect access to the sendq.
@@ -2582,6 +2582,7 @@ func (o *consumer) stopWithFlags(dflag, doSignal, advisory bool) error {
			err = store.Stop()
		}
	}
	return err
}

server/events.go

@@ -1551,16 +1551,17 @@ func (s *Server) sysUnsubscribe(sub *subscription) {
	if sub == nil {
		return
	}
	s.mu.Lock()
	if !s.eventsEnabled() {
		s.mu.Unlock()
		return
	}
-	c := s.sys.client
+	c := sub.client
	s.mu.Unlock()
-	c.processUnsub(sub.sid)
+	if c != nil {
+		c.processUnsub(sub.sid)
+	}
}
// This will generate the tracking subject for remote latency from the response subject.
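The events.go fix stops reaching through the shared system client and instead uses the subscription's own client, guarded against nil. A minimal sketch of that capture-under-lock, act-outside-lock pattern; Conn, Sub, and Registry are hypothetical stand-ins, not the real server internals:

package main

import "sync"

// Conn is a hypothetical connection type standing in for the server's client.
type Conn struct{ id string }

func (c *Conn) processUnsub(sid string) {
	// Tear down the subscription identified by sid on this connection.
	_ = sid
}

// Sub models a subscription that remembers its owning connection.
type Sub struct {
	sid    string
	client *Conn // owning connection; may already be gone
}

type Registry struct {
	mu sync.Mutex
}

// unsubscribe captures the subscription's own client under the lock and
// acts on it outside the lock, skipping the call entirely if the owner
// is gone, mirroring the nil guard added in the diff above.
func (r *Registry) unsubscribe(sub *Sub) {
	if sub == nil {
		return
	}
	r.mu.Lock()
	c := sub.client // the fix: use the sub's client, not a shared one
	r.mu.Unlock()
	if c != nil {
		c.processUnsub(sub.sid)
	}
}

func main() {
	r := &Registry{}
	r.unsubscribe(&Sub{sid: "1", client: &Conn{id: "a"}})
	r.unsubscribe(&Sub{sid: "2"}) // nil owner: safely skipped
}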

server/filestore.go

@@ -2903,7 +2903,7 @@ func (fs *fileStore) Stop() error {
	fs.lmb = nil
	fs.checkAndFlushAllBlocks()
-	fs.closeAllMsgBlocks(true)
+	fs.closeAllMsgBlocks(false)
	if fs.syncTmr != nil {
		fs.syncTmr.Stop()
@@ -3781,7 +3781,6 @@ func (o *consumerFileStore) Stop() error {
	o.mu.Unlock()
	if ifd != nil {
-		ifd.Sync()
		ifd.Close()
	}

server/jetstream.go

@@ -428,6 +428,9 @@ func (s *Server) shutdownJetStream() {
		return
	}
+	s.Noticef("Initiating JetStream Shutdown...")
+	defer s.Noticef("JetStream shutdown")
	var _a [512]*Account
	accounts := _a[:0]
@@ -450,7 +453,6 @@ func (s *Server) shutdownJetStream() {
	js.mu.Lock()
	js.accounts = nil
-	var n RaftNode
	if cc := js.cluster; cc != nil {
		js.stopUpdatesSub()
@@ -458,18 +460,9 @@ func (s *Server) shutdownJetStream() {
			cc.c.closeConnection(ClientClosed)
			cc.c = nil
		}
-		n = cc.meta
		cc.meta = nil
	}
	js.mu.Unlock()
-	// If we still have our raft group, do a snapshot on exit.
-	if n != nil {
-		if snap := js.metaSnapshot(); len(snap) > 0 {
-			n.InstallSnapshot(snap)
-		}
-		n.Stop()
-	}
}
// JetStreamConfig will return the current config. Useful if the system

server/jetstream_cluster.go

@@ -1526,7 +1526,7 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
	} else {
		// We are stepping down.
		// Make sure if we are doing so because we have lost quorum that we send the appropriate advisories.
-		if node := mset.raftNode(); node != nil && !node.Quorum() && time.Since(node.Created()) > time.Second {
+		if node := mset.raftNode(); node != nil && !node.Quorum() && time.Since(node.Created()) > 5*time.Second {
			s.sendStreamLostQuorumAdvisory(mset)
		}
	}
@@ -2616,7 +2616,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) {
	} else {
		// We are stepping down.
		// Make sure if we are doing so because we have lost quorum that we send the appropriate advisories.
-		if node := o.raftNode(); node != nil && !node.Quorum() && time.Since(node.Created()) > time.Second {
+		if node := o.raftNode(); node != nil && !node.Quorum() && time.Since(node.Created()) > 5*time.Second {
			s.sendConsumerLostQuorumAdvisory(o)
		}
	}
@@ -2664,7 +2664,7 @@ func (s *Server) sendConsumerLostQuorumAdvisory(o *consumer) {
		return
	}
-	s.Warnf("JetStream cluster consumer '%s > %s >%s' has NO quorum, stalled.", acc.GetName(), stream, consumer)
+	s.Warnf("JetStream cluster consumer '%s > %s > %s' has NO quorum, stalled.", acc.GetName(), stream, consumer)
	subj := JSAdvisoryConsumerQuorumLostPre + "." + stream + "." + consumer
	adv := &JSConsumerQuorumLostAdvisory{
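Both leader-change paths above now wait five seconds after a raft group's creation before emitting a lost-quorum advisory, so a group that is still electing its first leader does not raise false alarms. A small self-contained sketch of that warmup guard; quorumAlerter and its fields are illustrative names, not server APIs:

package main

import (
	"fmt"
	"time"
)

// quorumAlerter suppresses lost-quorum alerts while a group is still
// warming up, since a freshly created group has not elected anyone yet.
type quorumAlerter struct {
	created time.Time
	warmup  time.Duration // e.g. 5 * time.Second, as in the diff above
}

func (a *quorumAlerter) maybeAlert(hasQuorum bool) {
	if hasQuorum || time.Since(a.created) <= a.warmup {
		return // either healthy or too young to judge
	}
	fmt.Println("lost quorum, sending advisory")
}

func main() {
	a := &quorumAlerter{created: time.Now(), warmup: 5 * time.Second}
	a.maybeAlert(false) // suppressed: still inside the warmup window
	a.created = time.Now().Add(-10 * time.Second)
	a.maybeAlert(false) // fires: group is old enough and has no quorum
}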

server/jetstream_cluster_test.go

@@ -2717,7 +2717,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
	}
	// Make sure we received our lost quorum advisories.
-	adv, _ := ssub.NextMsg(5 * time.Second)
+	adv, _ := ssub.NextMsg(10 * time.Second)
	if adv == nil {
		t.Fatalf("Expected to receive a stream quorum lost advisory")
	}
@@ -2728,19 +2728,6 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
	if len(lqa.Replicas) != 2 {
		t.Fatalf("Expected reports for both replicas, only got %d", len(lqa.Replicas))
	}
-	// Consumer too. Since we do not know if the consumer leader was not the one shutdown
-	// we should wait for a bit for the system to detect.
-	adv, _ = csub.NextMsg(5 * time.Second)
-	if adv == nil {
-		t.Fatalf("Expected to receive a consumer quorum lost advisory")
-	}
-	var clqa JSConsumerQuorumLostAdvisory
-	if err := json.Unmarshal(adv.Data, &clqa); err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	if len(clqa.Replicas) != 2 {
-		t.Fatalf("Expected reports for both replicas, only got %d", len(clqa.Replicas))
-	}
	// Check to make sure we do not rapid fire these.
	time.Sleep(500 * time.Millisecond)

server/raft.go

@@ -130,6 +130,7 @@ type raft struct {
	acks   map[uint64]map[string]struct{}
	elect  *time.Timer
	active time.Time
+	llqrt  time.Time
	term   uint64
	pterm  uint64
	pindex uint64
@@ -371,14 +372,14 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
		for index := state.FirstSeq; index <= state.LastSeq; index++ {
			ae, err := n.loadEntry(index)
-			if ae.pindex != index-1 {
-				n.warn("Corrupt WAL, truncating")
-				n.wal.Truncate(index - 1)
-				break
-			}
			if err != nil {
				panic("err loading entry from WAL")
			}
+			if ae.pindex != index-1 {
+				n.warn("Corrupt WAL, truncating and fixing")
+				n.truncateWal(ae)
+				break
+			}
			n.processAppendEntry(ae, nil)
		}
	}
@@ -473,6 +474,28 @@ func (s *Server) reloadDebugRaftNodes() {
	s.rnMu.RUnlock()
}
+func (s *Server) shutdownRaftNodes() {
+	if s == nil {
+		return
+	}
+	var nodes []RaftNode
+	s.rnMu.RLock()
+	if len(s.raftNodes) > 0 {
+		s.Debugf("Shutting down all raft nodes")
+	}
+	for _, n := range s.raftNodes {
+		nodes = append(nodes, n)
+	}
+	s.rnMu.RUnlock()
+	for _, node := range nodes {
+		if node.Leader() {
+			node.StepDown()
+		}
+		node.Stop()
+	}
+}
func (s *Server) transferRaftLeaders() bool {
	if s == nil {
		return false
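The new shutdownRaftNodes snapshots the node list under the read lock, then steps leaders down before stopping each node so the rest of the cluster can elect replacements. A condensed, self-contained sketch of that two-step teardown; the Node interface and node type are hypothetical stand-ins for RaftNode:

package main

import "fmt"

// Node is a hypothetical stand-in for the RaftNode interface used above.
type Node interface {
	Leader() bool
	StepDown()
	Stop()
}

type node struct {
	name   string
	leader bool
}

func (n *node) Leader() bool { return n.leader }
func (n *node) StepDown()    { n.leader = false; fmt.Println(n.name, "stepped down") }
func (n *node) Stop()        { fmt.Println(n.name, "stopped") }

// shutdownNodes steps leaders down first, then stops every node, so
// followers elsewhere get a chance to take over before this server exits.
func shutdownNodes(nodes []Node) {
	for _, n := range nodes {
		if n.Leader() {
			n.StepDown()
		}
		n.Stop()
	}
}

func main() {
	shutdownNodes([]Node{&node{"meta", true}, &node{"stream-1", false}})
}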
@@ -1619,7 +1642,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesC <-chan uint64)
		ae, err := n.loadEntry(next)
		if err != nil {
			if err != ErrStoreEOF {
-				n.warn("Got an error loading %d index: %v", next, err)
+				n.debug("Got an error loading %d index: %v", next, err)
			}
			return true
		}
@@ -1765,6 +1788,10 @@ func (n *raft) applyCommit(index uint64) error {
	// FIXME(dlc) - Can keep this in memory if this too slow.
	ae, err := n.loadEntry(index)
	if err != nil {
+		state := n.wal.State()
+		if index < state.FirstSeq {
+			return nil
+		}
		if err != ErrStoreClosed {
			n.warn("Got an error loading %d index: %v", index, err)
		}
@@ -2017,6 +2044,18 @@ func (n *raft) attemptStepDown(newLeader string) {
	}
}
+func (n *raft) truncateWal(ae *appendEntry) {
+	n.debug("Truncating and repairing WAL")
+	tindex := ae.pindex - 1
+	n.wal.Truncate(tindex)
+	n.pindex = tindex
+	if nae, _ := n.loadEntry(tindex); nae != nil {
+		n.pterm = nae.term
+	} else {
+		n.pterm = ae.term
+	}
+}
// processAppendEntry will process an appendEntry.
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
	n.Lock()
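truncateWal rewinds to the entry just before the mismatch: it truncates the log at tindex = ae.pindex - 1, resets pindex, and restores pterm from the last surviving entry, falling back to the incoming entry's term when nothing survives. A toy model of that bookkeeping, with hypothetical entry/log types:

package main

import "fmt"

type entry struct {
	term, pindex uint64 // pindex is the index of the previous entry
}

type log struct {
	entries       []entry // entries[i] holds the entry at index i+1
	pterm, pindex uint64  // latest term/index tracked by the node
}

// truncateTo drops everything after index tindex and repairs the node's
// pterm/pindex, mirroring truncateWal above.
func (l *log) truncateTo(tindex, incomingTerm uint64) {
	if tindex < uint64(len(l.entries)) {
		l.entries = l.entries[:tindex]
	}
	l.pindex = tindex
	if tindex >= 1 && tindex <= uint64(len(l.entries)) {
		l.pterm = l.entries[tindex-1].term // last surviving entry
	} else {
		l.pterm = incomingTerm // nothing survived: trust the leader's term
	}
}

func main() {
	l := &log{entries: []entry{{1, 0}, {1, 1}, {2, 2}, {3, 3}}, pterm: 3, pindex: 4}
	// A mismatched entry claims pindex 2, so rewind to index 2.
	l.truncateTo(2, 2)
	fmt.Printf("pterm=%d pindex=%d len=%d\n", l.pterm, l.pindex, len(l.entries))
}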
@@ -2140,9 +2179,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
	if eae, err := n.loadEntry(ae.pindex); err == nil && eae != nil {
		// If terms mismatched, delete that entry and all others past it.
		if ae.pterm > eae.pterm {
-			n.wal.Truncate(ae.pindex)
-			n.pindex = ae.pindex
-			n.pterm = ae.pterm
+			n.truncateWal(ae)
			ar = &appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_}
		} else {
			ar = &appendEntryResponse{ae.pterm, ae.pindex, n.id, true, _EMPTY_}
@@ -2159,6 +2196,13 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
	// so make sure this is a snapshot entry. If it is not start the catchup process again since it
	// means we may have missed additional messages.
	if catchingUp {
+		// Check if only our terms do not match here.
+		if ae.pindex == n.pindex {
+			n.truncateWal(ae)
+			n.cancelCatchup()
+			n.Unlock()
+			return
+		}
		// Snapshots and peerstate will always be together when a leader is catching us up.
		if len(ae.entries) != 2 || ae.entries[0].Type != EntrySnapshot || ae.entries[1].Type != EntryPeerState {
			n.warn("Expected first catchup entry to be a snapshot and peerstate, will retry")
@@ -2743,8 +2787,11 @@ func (n *raft) switchToCandidate() {
	if n.state != Candidate {
		n.debug("Switching to candidate")
	} else if n.lostQuorumLocked() {
-		// We signal to the upper layers such that can alert on quorum lost.
-		n.updateLeadChange(false)
+		if time.Since(n.llqrt) > 20*time.Second {
+			// We signal to the upper layers such that can alert on quorum lost.
+			n.updateLeadChange(false)
+			n.llqrt = time.Now()
+		}
	}
	// Increment the term.
	n.term++
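The new llqrt timestamp (apparently the last lost-quorum report time) keeps a node stuck cycling as candidate from re-signaling quorum loss more than once every 20 seconds. A minimal self-contained sketch of the throttle:

package main

import (
	"fmt"
	"time"
)

// throttle fires at most once per interval, mirroring the llqrt check
// that now guards updateLeadChange(false) above.
type throttle struct {
	last     time.Time
	interval time.Duration
}

func (t *throttle) allow() bool {
	if time.Since(t.last) <= t.interval {
		return false
	}
	t.last = time.Now()
	return true
}

func main() {
	t := &throttle{interval: 20 * time.Second}
	for i := 0; i < 3; i++ {
		if t.allow() {
			fmt.Println("signaling lost quorum") // fires only on the first pass
		}
	}
}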

server/server.go

@@ -1647,8 +1647,8 @@ func (s *Server) Start() {
// Shutdown will shutdown the server instance by kicking out the AcceptLoop
// and closing all associated clients.
func (s *Server) Shutdown() {
-	// Transfer off any raft nodes that we are a leader.
-	s.transferRaftLeaders()
+	// Transfer off any raft nodes that we are a leader by shutting them all down.
+	s.shutdownRaftNodes()
	// Shutdown the eventing system as needed.
	// This is done first to send out any messages for
@@ -1656,9 +1656,6 @@ func (s *Server) Shutdown() {
	// eventing items associated with accounts.
	s.shutdownEventing()
-	// Now check jetstream.
-	s.shutdownJetStream()
	s.mu.Lock()
	// Prevent issues with multiple calls.
	if s.shutdown {
@@ -1678,7 +1675,12 @@ func (s *Server) Shutdown() {
	s.grMu.Lock()
	s.grRunning = false
	s.grMu.Unlock()
+	s.mu.Unlock()
+	// Now check jetstream.
+	s.shutdownJetStream()
+	s.mu.Lock()
	conns := make(map[uint64]*client)
	// Copy off the clients
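Taken together, Shutdown now runs in a fixed order: raft nodes stop first, eventing flushes its final messages, the shutdown and go-routine flags flip under the lock, and JetStream tears down outside the server lock so its own locking cannot deadlock against s.mu. A condensed sketch of that ordering, with hypothetical subsystem hooks:

package main

import (
	"fmt"
	"sync"
)

type server struct {
	mu       sync.Mutex
	shutdown bool
}

func (s *server) stopRaftNodes() { fmt.Println("1: raft nodes stopped") }
func (s *server) stopEventing()  { fmt.Println("2: eventing flushed") }
func (s *server) stopJetStream() { fmt.Println("3: jetstream stopped") }

func (s *server) Shutdown() {
	s.stopRaftNodes() // step leaders down and stop raft first
	s.stopEventing()  // flush shutdown events while clients still exist

	s.mu.Lock()
	if s.shutdown { // guard against repeated calls
		s.mu.Unlock()
		return
	}
	s.shutdown = true
	s.mu.Unlock()

	// JetStream teardown runs outside the server lock so that its own
	// locking cannot deadlock against s.mu.
	s.stopJetStream()

	s.mu.Lock()
	// ... close client connections here ...
	s.mu.Unlock()
}

func main() { (&server{}).Shutdown() }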