Stabilize restart/catchup for raft.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-01-16 05:47:48 -08:00
parent 367d000314
commit b606dceb59
6 changed files with 138 additions and 75 deletions

View File

@@ -228,13 +228,13 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftNode) (*Consumer, error) {
mset.mu.RLock()
jsa := mset.jsa
s, jsa := mset.srv, mset.jsa
mset.mu.RUnlock()
// If we do not have the consumer assigned to us in cluster mode we can not proceed.
// Running in single server mode this always returns true.
if oname != _EMPTY_ && !jsa.consumerAssigned(mset.Name(), oname) {
return nil, ErrJetStreamNotAssigned
s.Debugf("Consumer %q > %q does not seem to be assigned to this server", mset.Name(), oname)
}
if config == nil {

View File

@@ -176,11 +176,14 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error {
// If we are in clustered mode go ahead and start the meta controller.
if !s.standAloneMode() {
if err := s.enableJetStreamClustering(); err != nil {
s.Errorf("Could not create JetStream cluster: %v", err)
return err
}
}
return s.enableJetStreamAccounts()
}
func (s *Server) enableJetStreamAccounts() error {
// If we have no configured accounts setup then setup imports on global account.
if s.globalAccountOnly() {
if err := s.GlobalAccount().EnableJetStream(nil); err != nil {
@@ -189,7 +192,6 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error {
} else if err := s.configAllJetStreamAccounts(); err != nil {
return fmt.Errorf("Error enabling jetstream on configured accounts: %v", err)
}
return nil
}

View File

@@ -293,7 +293,7 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
isCurrent := rg.node.Current()
if isCurrent {
// Check if we are processing a snapshot catchup.
// Check if we are processing a snapshot and are catching up.
acc, err := cc.s.LookupAccount(account)
if err != nil {
return false
@@ -505,8 +505,9 @@ func (jsa *jsAccount) streamAssigned(stream string) bool {
return false
}
js.mu.RLock()
defer js.mu.RUnlock()
return js.cluster.isStreamAssigned(acc, stream)
assigned := js.cluster.isStreamAssigned(acc, stream)
js.mu.RUnlock()
return assigned
}
// Read lock should be held.
@@ -639,6 +640,12 @@ func (js *jetStream) monitorCluster() {
case <-qch:
return
case ce := <-ach:
if ce == nil {
// Signals we have replayed all of our metadata.
// No-op for now.
s.Debugf("Recovered JetStream cluster metadata")
continue
}
// FIXME(dlc) - Deal with errors.
if hadSnapshot, err := js.applyMetaEntries(ce.Entries); err == nil {
n.Applied(ce.Index)
@@ -965,6 +972,10 @@ func (js *jetStream) monitorStreamRaftGroup(mset *Stream, sa *streamAssignment)
case <-qch:
return
case ce := <-ach:
// No special processing is needed when we are caught up on restart.
if ce == nil {
continue
}
// FIXME(dlc) - capture errors.
if hadSnapshot, err := js.applyStreamEntries(mset, ce); err == nil {
n.Applied(ce.Index)
@@ -1001,10 +1012,10 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry) (bool,
if err != nil {
panic(err.Error())
}
if lseq == 0 && mset.lastSeq() != 0 { // Very first msg
// Skip by hand here since first msg special case.
if lseq == 0 && mset.lastSeq() != 0 {
continue
}
if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
js.srv.Debugf("Got error processing JetStream msg: %v", err)
}
@@ -1211,7 +1222,6 @@ func (js *jetStream) processClusterCreateStream(sa *streamAssignment) {
// This is an error condition.
if err != nil {
js.srv.Debugf("Stream create failed for %q - %q: %v\n", sa.Client.Account, sa.Config.Name, err)
js.mu.Lock()
sa.err = err
sa.responded = true
@@ -1557,6 +1567,10 @@ func (js *jetStream) monitorConsumerRaftGroup(o *Consumer, ca *consumerAssignmen
case <-qch:
return
case ce := <-ach:
// No special processing is needed when we are caught up on restart.
if ce == nil {
continue
}
if _, err := js.applyConsumerEntries(o, ce); err == nil {
n.Applied(ce.Index)
last = ce.Index
@@ -1706,7 +1720,7 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client
// Check if this failed.
// TODO(dlc) - Could have mixed results, should track per peer.
if result.Response.Error != nil {
// So while we are delting we will not respond to list/names requests.
// Set sa.err while we are deleting so we will not respond to list/names requests.
sa.err = ErrJetStreamNotAssigned
cc.meta.Propose(encodeDeleteStreamAssignment(sa))
}

View File

@@ -22,6 +22,7 @@ import (
"os"
"path"
"sync"
"sync/atomic"
"time"
)
@@ -118,6 +119,7 @@ type raft struct {
hash string
s *Server
c *client
dflag bool
// Subjects for votes, updates, replays.
vsubj string
@@ -126,9 +128,7 @@ type raft struct {
areply string
// For when we need to catch up as a follower.
catchup *subscription
cterm uint64
cindex uint64
catchup *catchupState
// For leader or server catching up a follower.
progress map[string]chan uint64
@@ -150,6 +150,17 @@ type raft struct {
stepdown chan string
}
// catchupState holds the subscription used while catching up from the
// leader, the target term/index we are catching up to, plus the starting
// term/index snapshot and how many heartbeats we have seen without progress.
type catchupState struct {
	// sub is the inbox subscription the leader replays entries to.
	sub *subscription
	// cterm and cindex are the term/index we must reach to be caught up.
	cterm  uint64
	cindex uint64
	// pterm and pindex snapshot our position when catchup started; pindex
	// is also updated by catchupStalled to detect lack of progress.
	pterm  uint64
	pindex uint64
	// hbs counts consecutive checks with no index progress (stall detection).
	hbs int
}
// lps holds peer state of last time and last index replicated.
type lps struct {
ts int64
@@ -217,6 +228,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
return nil, ErrNoSysAccount
}
sendq := s.sys.sendq
sacc := s.sys.account
hash := s.sys.shash
s.mu.Unlock()
@@ -250,6 +262,11 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
leadc: make(chan bool, 4),
stepdown: make(chan string),
}
n.c.registerWithAccount(sacc)
if atomic.LoadInt32(&s.logging.debug) > 0 {
n.dflag = true
}
if term, vote, err := n.readTermVote(); err != nil && term > 0 {
n.term = term
@@ -266,18 +283,24 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
}
// Replay the log.
// Since doing this in place we need to make sure we have enough room on the applyc.
if uint64(cap(n.applyc)) < state.Msgs {
n.applyc = make(chan *CommittedEntry, state.Msgs)
needed := state.Msgs + 1 // 1 is for nil to mark end of replay.
if uint64(cap(n.applyc)) < needed {
n.applyc = make(chan *CommittedEntry, needed)
}
for index := state.FirstSeq; index <= state.LastSeq; index++ {
ae, err := n.loadEntry(index)
if err != nil {
panic("err loading index")
panic("err loading entry from WAL")
}
n.processAppendEntry(ae, nil)
}
}
// Send nil entry to signal the upper layers we are done doing replay/restore.
n.applyc <- nil
// Setup our internal subscriptions.
if err := n.createInternalSubs(); err != nil {
n.shutdown(true)
return nil, err
@@ -535,6 +558,12 @@ func (n *raft) isCurrent() bool {
if n.state == Leader {
return true
}
// Check here on catchup status.
if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex {
n.cancelCatchup()
}
// Check to see that we have heard from the current leader lately.
if n.leader != noLeader && n.leader != n.id && n.catchup == nil {
const okInterval = int64(hbInterval) * 2
@@ -721,13 +750,13 @@ func (n *raft) newInbox(cn string) string {
b[i] = digits[l%base]
l /= base
}
return fmt.Sprintf(raftReplySubj, n.group, n.hash, b[:])
return fmt.Sprintf(raftReplySubj, b[:])
}
const (
raftVoteSubj = "$SYS.NRG.%s.%s.V"
raftAppendSubj = "$SYS.NRG.%s.%s.A"
raftReplySubj = "$SYS.NRG.%s.%s.%s"
raftVoteSubj = "$NRG.V.%s.%s"
raftAppendSubj = "$NRG.E.%s.%s"
raftReplySubj = "$NRG.R.%s"
)
func (n *raft) createInternalSubs() error {
@@ -795,8 +824,10 @@ func (n *raft) run() {
}
func (n *raft) debug(format string, args ...interface{}) {
nf := fmt.Sprintf("RAFT [%s - %s] %s", n.id, n.group, format)
n.s.Debugf(nf, args...)
if n.dflag {
nf := fmt.Sprintf("RAFT [%s - %s] %s", n.id, n.group, format)
n.s.Debugf(nf, args...)
}
}
func (n *raft) error(format string, args ...interface{}) {
@@ -1211,8 +1242,6 @@ func (n *raft) applyCommit(index uint64) {
}
// Pass to the upper layers if we have normal entries.
if len(committed) > 0 {
// We will block here placing the commit entry on purpose.
// FIXME(dlc) - We should not block here.
select {
case n.applyc <- &CommittedEntry{index, committed}:
default:
@@ -1359,6 +1388,49 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, subject, reply st
n.processAppendEntry(ae, sub)
}
// cancelCatchup tears down any in-flight catchup: it unsubscribes the
// catchup inbox subscription and clears the catchup state. It is a no-op
// when no catchup is in progress, and tolerates a nil subscription (which
// can happen since createCatchup ignores the sysSubscribe error).
// Lock should be held.
func (n *raft) cancelCatchup() {
	if n.catchup == nil {
		return
	}
	n.debug("Canceling catchup subscription since we are now up to date")
	if n.catchup.sub != nil {
		n.s.sysUnsubscribe(n.catchup.sub)
	}
	n.catchup = nil
}
// catchupStalled reports whether our catchup appears to have stopped making
// progress. It is invoked on each new entry from our leader: if our applied
// index has not moved since the last check we bump a stall counter, otherwise
// we record the new index and reset the counter. Three consecutive checks
// with no movement mark us as stalled.
// Lock should be held.
func (n *raft) catchupStalled() bool {
	cs := n.catchup
	if cs == nil {
		// Not catching up, so we can not be stalled.
		return false
	}
	const maxHBs = 3
	if cs.pindex != n.pindex {
		// We made progress; remember where we are and clear the counter.
		cs.pindex = n.pindex
		cs.hbs = 0
		return false
	}
	// No progress since last check.
	cs.hbs++
	return cs.hbs >= maxHBs
}
// createCatchup sets up fresh catchup state targeting the leader's previous
// term/index from ae, subscribes an inbox for the leader to replay entries
// to, and returns that inbox subject for the caller to send in its response.
// Lock should be held.
func (n *raft) createCatchup(ae *appendEntry) string {
	// Cleanup any old ones.
	if n.catchup != nil {
		n.s.sysUnsubscribe(n.catchup.sub)
	}
	// Snapshot term and index: cterm/cindex are the catchup target taken
	// from the leader's entry, pterm/pindex are our current position.
	n.catchup = &catchupState{
		cterm:  ae.pterm,
		cindex: ae.pindex,
		pterm:  n.pterm,
		pindex: n.pindex,
	}
	inbox := n.newInbox(n.s.ClusterName())
	// NOTE(review): the sysSubscribe error is discarded, so sub may be nil
	// on failure and catchup would silently never receive entries — the
	// pre-refactor code returned early here; confirm this is intended.
	sub, _ := n.s.sysSubscribe(inbox, n.handleAppendEntry)
	n.catchup.sub = sub
	return inbox
}
// processAppendEntry will process an appendEntry.
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.Lock()
@@ -1368,10 +1440,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
return
}
// Is this a new entry or a replay on startup?
isNew := sub != nil && sub != n.catchup
// Catching up state.
catchingUp := n.catchup != nil
// Is this a new entry or a replay on startup?
isNew := sub != nil && (!catchingUp || sub != n.catchup.sub)
if isNew {
n.resetElectionTimeout()
@@ -1394,18 +1466,24 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Check state if we are catching up.
if catchingUp && isNew {
if n.pterm >= n.cterm && n.pindex >= n.cindex {
// If we are here we are good, so if we have a catchup we can shut that down.
n.debug("Canceling catchup subscription since we are now up to date")
n.s.sysUnsubscribe(n.catchup)
n.catchup = nil
if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex {
// If we are here we are good, so if we have a catchup pending we can cancel.
n.cancelCatchup()
catchingUp = false
n.cterm, n.cindex = 0, 0
} else {
var ar *appendEntryResponse
var inbox string
// Check to see if we are stalled. If so recreate our catchup state and resend response.
if n.catchupStalled() {
n.debug("Catchup may be stalled, will request again")
inbox = n.createCatchup(ae)
ar = &appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_}
}
// Ignore new while catching up or replaying.
// This is ok since builtin catchup should be small.
// For larger items will do this outside.
n.Unlock()
if ar != nil {
n.sendRPC(ae.reply, inbox, ar.encode())
}
return
}
}
@@ -1443,23 +1521,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex)
// Reset our term.
n.term = n.pterm
// Snapshot term and index
n.cterm, n.cindex = ae.pterm, ae.pindex
// Setup our subscription for catching up.
// Cleanup any old ones.
if n.catchup != nil {
n.s.sysUnsubscribe(n.catchup)
}
inbox := n.newInbox(n.s.ClusterName())
var err error
if n.catchup, err = n.s.sysSubscribe(inbox, n.handleAppendEntry); err != nil {
n.Unlock()
n.debug("Error subscribing to our inbox for catchup: %v", err)
return
}
// Setup our state for catching up.
inbox := n.createCatchup(ae)
ar := appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_}
n.Unlock()
n.sendRPC(ae.reply, inbox, ar.encode())
return
}

View File

@@ -153,7 +153,7 @@ func (a *Account) addStream(config *StreamConfig, fsConfig *FileStoreConfig, sa
// If we do not have the stream assigned to us in cluster mode we can not proceed.
// Running in single server mode this always returns true.
if !jsa.streamAssigned(config.Name) {
return nil, ErrJetStreamNotAssigned
s.Debugf("Stream %q does not seem to be assigned to this server", config.Name)
}
// Sensible defaults.

View File

@@ -24,7 +24,6 @@ import (
"testing"
"time"
"github.com/nats-io/nats-server/v2/logger"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)
@@ -116,17 +115,8 @@ func TestJetStreamClusterSingleReplicaStreams(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R1S", 3)
defer c.shutdown()
sc := &server.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
}
// Make sure non-leaders error if directly called.
s := c.randomNonLeader()
if _, err := s.GlobalAccount().AddStream(sc); err == nil {
t.Fatalf("Expected an error from a non-leader")
}
// Client based API
s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()
@@ -298,7 +288,10 @@ func TestJetStreamClusterDelete(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, _ := nc.Request(fmt.Sprintf(server.JSApiStreamCreateT, cfg.Name), req, time.Second)
resp, err := nc.Request(fmt.Sprintf(server.JSApiStreamCreateT, cfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var scResp server.JSApiStreamCreateResponse
if err := json.Unmarshal(resp.Data, &scResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -619,6 +612,7 @@ func TestJetStreamClusterMetaSnapshotsMultiChange(t *testing.T) {
// Shut it down.
rs.Shutdown()
time.Sleep(250 * time.Millisecond)
// We want to make changes here that test each delta scenario for the meta snapshots.
// Add new stream and consumer.
@@ -1065,10 +1059,6 @@ func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers
sn := fmt.Sprintf("S-%d", cp-startClusterPort+1)
conf := fmt.Sprintf(jsClusterTempl, sn, storeDir, clusterName, cp, routeConfig)
s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
if doLog {
pre := fmt.Sprintf("[S-%d] - ", cp-startClusterPort+1)
s.SetLogger(logger.NewTestLogger(pre, true), true, true)
}
c.servers = append(c.servers, s)
c.opts = append(c.opts, o)
}
@@ -1088,10 +1078,6 @@ func (c *cluster) addInNewServer() *server.Server {
seedRoute := fmt.Sprintf("nats-route://127.0.0.1:%d", c.opts[0].Cluster.Port)
conf := fmt.Sprintf(jsClusterTempl, sn, storeDir, c.name, -1, seedRoute)
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
if doLog {
pre := fmt.Sprintf("[%s] - ", sn)
s.SetLogger(logger.NewTestLogger(pre, true), true, true)
}
c.servers = append(c.servers, s)
c.opts = append(c.opts, o)
c.checkClusterFormed()
@@ -1141,10 +1127,6 @@ func (c *cluster) restartServer(rs *server.Server) *server.Server {
}
opts = c.opts[index]
s, o := RunServerWithConfig(opts.ConfigFile)
if doLog {
pre := fmt.Sprintf("[%s] - ", s.Name())
s.SetLogger(logger.NewTestLogger(pre, true), true, true)
}
c.servers[index] = s
c.opts[index] = o
return s