Improvements to meta raft layer around snapshots and recovery.

Signed-off-by: Derek Collison <derek@nats.io>
Author: Derek Collison
Date:   2021-10-11 18:54:30 -07:00
parent 5e1276c27e
commit bbffd71c4a

5 changed files with 169 additions and 112 deletions

server/jetstream.go

@@ -463,9 +463,16 @@ func (s *Server) setJetStreamDisabled() {
 	}
 }
 
-func (s *Server) handleOutOfSpace(stream string) {
+func (s *Server) handleOutOfSpace(mset *stream) {
 	if s.JetStreamEnabled() && !s.jetStreamOOSPending() {
-		s.Errorf("JetStream out of resources, will be DISABLED")
+		var stream string
+		if mset != nil {
+			stream = mset.name()
+			s.Errorf("JetStream out of %s resources, will be DISABLED", mset.Store().Type())
+		} else {
+			s.Errorf("JetStream out of resources, will be DISABLED")
+		}
 		go s.DisableJetStream()
 
 		adv := &JSServerOutOfSpaceAdvisory{

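Note on the signature change: handleOutOfSpace now takes the stream itself rather than its name, and tolerates nil so the raft layer (which has no stream handle; see raft.go below) can trigger the same shutdown path. A minimal standalone sketch of the pattern, with placeholder types standing in for the server's internal stream and store:

package main

import "fmt"

// Placeholder store and stream types; the real server types are internal.
type memStore struct{}

func (memStore) Type() string { return "Memory" }

type stream struct{ name string }

func (s *stream) Name() string    { return s.name }
func (s *stream) Store() memStore { return memStore{} }

// handleOutOfSpace mirrors the new nil-tolerant signature.
func handleOutOfSpace(mset *stream) {
	if mset != nil {
		fmt.Printf("JetStream out of %s resources for %q, will be DISABLED\n",
			mset.Store().Type(), mset.Name())
	} else {
		fmt.Println("JetStream out of resources, will be DISABLED")
	}
}

func main() {
	handleOutOfSpace(&stream{name: "ORDERS"}) // failure seen at the stream layer
	handleOutOfSpace(nil)                     // failure seen at the raft layer
}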
server/jetstream_cluster.go

@@ -753,9 +753,19 @@ func (js *jetStream) monitorCluster() {
 		isLeader     bool
 		lastSnap     []byte
 		lastSnapTime time.Time
+		isRecovering bool
+		beenLeader   bool
 	)
 
+	// Set to true to start.
+	isRecovering = true
+
 	// Snapshotting function.
 	doSnapshot := func() {
+		// Suppress during recovery.
+		if isRecovering {
+			return
+		}
 		if snap := js.metaSnapshot(); !bytes.Equal(lastSnap, snap) {
 			if err := n.InstallSnapshot(snap); err == nil {
 				lastSnap = snap
@@ -764,9 +774,6 @@ func (js *jetStream) monitorCluster() {
 		}
 	}
 
-	isRecovering := true
-	beenLeader := false
-
 	for {
 		select {
 		case <-s.quitCh:
@@ -787,7 +794,7 @@ func (js *jetStream) monitorCluster() {
 				// Since we received one make sure we have our own since we do not store
 				// our meta state outside of raft.
 				doSnapshot()
-			} else if nb > uint64(len(lastSnap)*4) {
+			} else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 {
 				doSnapshot()
 			}
 		}
@@ -1743,7 +1750,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 					s.Debugf("Got error processing JetStream msg: %v", err)
 				}
 				if isOutOfSpaceErr(err) {
-					s.handleOutOfSpace(mset.name())
+					s.handleOutOfSpace(mset)
 					return err
 				}
 			}
@@ -4539,7 +4546,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 	}
 
 	if err != nil && isOutOfSpaceErr(err) {
-		s.handleOutOfSpace(name)
+		s.handleOutOfSpace(mset)
 	}
 
 	return err
@@ -4660,7 +4667,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) {
 	mset.mu.Lock()
 	state := mset.store.State()
 	sreq := mset.calculateSyncRequest(&state, snap)
-	s, js, subject, n, name := mset.srv, mset.js, mset.sa.Sync, mset.node, mset.cfg.Name
+	s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
 	mset.mu.Unlock()
 
 	// Just return if up to date or already exceeded limits.
@@ -4767,7 +4774,7 @@ RETRY:
 			return
 		}
 	} else if isOutOfSpaceErr(err) {
-		s.handleOutOfSpace(name)
+		s.handleOutOfSpace(mset)
 		return
 	} else if err == NewJSInsufficientResourcesError() {
 		if mset.js.limitsExceeded(mset.cfg.Storage) {

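Two guards work together in the monitorCluster changes above: snapshots are suppressed until the initial log replay finishes (isRecovering), and the size-based trigger now requires an existing prior snapshot (lls > 0) while raising the threshold from 4x to 8x the last snapshot's size. A self-contained sketch of that gating, with stubs standing in for js.metaSnapshot and n.InstallSnapshot:

package main

import (
	"bytes"
	"fmt"
)

func main() {
	var (
		lastSnap     []byte
		isRecovering = true // set to true to start; cleared once replay completes
	)

	// Stand-ins for js.metaSnapshot() and n.InstallSnapshot().
	metaSnapshot := func() []byte { return []byte("meta-state") }
	installSnapshot := func(snap []byte) error { return nil }

	doSnapshot := func() {
		// Suppress during recovery so a restarting node does not snapshot
		// half-replayed state.
		if isRecovering {
			return
		}
		if snap := metaSnapshot(); !bytes.Equal(lastSnap, snap) {
			if err := installSnapshot(snap); err == nil {
				lastSnap = snap
			}
		}
	}

	// Size-based trigger: nb is bytes applied since the last snapshot.
	// Requiring lls > 0 avoids snapshotting on every commit before the
	// first snapshot exists (len(lastSnap) would be 0).
	maybeSnapshot := func(nb uint64) {
		if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 {
			doSnapshot()
		}
	}

	maybeSnapshot(1 << 20) // no-op: still recovering and lastSnap empty
	isRecovering = false
	doSnapshot() // first snapshot once recovery completes
	maybeSnapshot(1 << 20)
	fmt.Printf("last snapshot is %d bytes\n", len(lastSnap))
}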
server/jetstream_cluster_test.go

@@ -25,7 +25,6 @@ import (
 	"os"
 	"path"
 	"reflect"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -5845,18 +5844,8 @@ func TestJetStreamClusterMultiRestartBug(t *testing.T) {
 	c.waitOnStreamLeader("$G", "TEST")
 
 	s = c.serverByName(s.Name())
 	opts = s.getOpts()
 	c.waitOnStreamCurrent(s, "$G", "TEST")
 
-	snaps, err := ioutil.ReadDir(path.Join(opts.StoreDir, JetStreamStoreDir, "$SYS", "_js_", "_meta_", "snapshots"))
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	if len(snaps) == 0 {
-		t.Fatalf("Expected a meta snapshot for the restarted server")
-	}
-
 	// Now restart them all..
 	c.stopAll()
 	c.restartAll()
@@ -5868,8 +5857,12 @@ func TestJetStreamClusterMultiRestartBug(t *testing.T) {
 	defer nc.Close()
 
 	// Make sure the replicas are current.
-	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
-		si, _ := js.StreamInfo("TEST")
+	js2, err := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
+		si, _ := js2.StreamInfo("TEST")
 		if si == nil || si.Cluster == nil {
 			t.Fatalf("Did not get stream info")
 		}
@@ -8497,76 +8490,6 @@ func TestJetStreamDeadlockOnVarz(t *testing.T) {
 	wg.Wait()
 }
 
-// Make sure when we try to hard reset a stream state in a cluster that we also re-create the consumers.
-func TestJetStreamClusterStreamReset(t *testing.T) {
-	c := createJetStreamClusterExplicit(t, "R3S", 3)
-	defer c.shutdown()
-
-	// Client based API
-	s := c.randomServer()
-	nc, js := jsClientConnect(t, s)
-	defer nc.Close()
-
-	_, err := js.AddStream(&nats.StreamConfig{
-		Name:      "TEST",
-		Subjects:  []string{"foo.*"},
-		Replicas:  2,
-		Retention: nats.WorkQueuePolicy,
-	})
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-
-	numRequests := 20
-	for i := 0; i < numRequests; i++ {
-		js.Publish("foo.created", []byte("REQ"))
-	}
-
-	// Durable.
-	sub, err := js.SubscribeSync("foo.created", nats.Durable("d1"))
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	defer sub.Unsubscribe()
-
-	si, err := js.StreamInfo("TEST")
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	if si.State.Msgs != uint64(numRequests) {
-		t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
-	}
-
-	// Let settle a bit.
-	time.Sleep(250 * time.Millisecond)
-
-	// Grab number go routines.
-	base := runtime.NumGoroutine()
-
-	// Grab a server that is the consumer leader for the durable.
-	cl := c.consumerLeader("$G", "TEST", "d1")
-	mset, err := cl.GlobalAccount().lookupStream("TEST")
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	// Do a hard reset here by hand.
-	mset.resetClusteredState()
-	// Wait til we have the leader elected.
-	c.waitOnConsumerLeader("$G", "TEST", "d1")
-
-	// So do not wait 10s in call in checkFor.
-	js2, _ := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
-
-	// Make sure we can get the consumer info eventually.
-	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
-		_, err := js2.ConsumerInfo("TEST", "d1")
-		return err
-	})
-
-	// Grab number go routines.
-	if after := runtime.NumGoroutine(); base > after {
-		t.Fatalf("Expected %d go routines, got %d", base, after)
-	}
-}
-
 // Issue #2397
 func TestJetStreamClusterStreamCatchupNoState(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R2S", 2)
@@ -9245,7 +9168,6 @@ func TestJetStreamAppendOnly(t *testing.T) {
if resp.Error == nil {
t.Fatalf("Expected an error")
}
}
// Support functions

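Two notes on the test updates above: the assertion that a freshly restarted server already has a meta snapshot on disk is gone because, with the isRecovering guard, a node no longer snapshots while replaying its log; and polling now goes through a JetStream context built with nats.MaxWait, so one slow request cannot eat most of checkFor's total budget. A hedged sketch of the polling pattern against the public nats.go API, assuming it lives alongside the repo's test helpers (checkFor is the retry helper used throughout these files; the stream and durable names are placeholders):

// pollConsumerInfo is a sketch helper: a short per-request MaxWait keeps each
// failed attempt cheap, so the retry loop, not the client timeout, bounds the
// total wait.
func pollConsumerInfo(t *testing.T, nc *nats.Conn, stream, durable string) {
	t.Helper()
	js2, err := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
		_, err := js2.ConsumerInfo(stream, durable)
		return err
	})
}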
server/norace_test.go

@@ -2042,8 +2042,6 @@ func TestNoRaceJetStreamClusterSuperClusterRIPStress(t *testing.T) {
 	nc, js := jsClientConnect(t, s)
 	defer nc.Close()
 
-	fmt.Printf("CONNECT is %v\n", s.ClientURL())
-
 	scm := make(map[string][]string)
 
 	// Create 50 streams per cluster.
@@ -3561,3 +3559,109 @@ func TestNoRaceJetStreamClusterMaxConsumersAndDirect(t *testing.T) {
 		return nil
 	})
 }
+
+// Make sure when we try to hard reset a stream state in a cluster that we also re-create the consumers.
+func TestNoRaceJetStreamClusterStreamReset(t *testing.T) {
+	// Speed up raft
+	minElectionTimeout = 250 * time.Millisecond
+	maxElectionTimeout = time.Second
+	hbInterval = 50 * time.Millisecond
+
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	// Client based API
+	s := c.randomServer()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo.*"},
+		Replicas:  2,
+		Retention: nats.WorkQueuePolicy,
+	})
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	numRequests := 20
+	for i := 0; i < numRequests; i++ {
+		js.Publish("foo.created", []byte("REQ"))
+	}
+
+	// Durable.
+	sub, err := js.SubscribeSync("foo.created", nats.Durable("d1"))
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	defer sub.Unsubscribe()
+
+	si, err := js.StreamInfo("TEST")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if si.State.Msgs != uint64(numRequests) {
+		t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
+	}
+
+	// Let settle a bit for Go routine checks.
+	time.Sleep(250 * time.Millisecond)
+
+	// Grab number go routines.
+	base := runtime.NumGoroutine()
+
+	// Make the consumer busy here by async sending a bunch of messages.
+	for i := 0; i < numRequests*10; i++ {
+		js.PublishAsync("foo.created", []byte("REQ"))
+	}
+
+	// Grab a server that is the consumer leader for the durable.
+	cl := c.consumerLeader("$G", "TEST", "d1")
+	mset, err := cl.GlobalAccount().lookupStream("TEST")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	// Do a hard reset here by hand.
+	mset.resetClusteredState()
+	// Wait til we have the consumer leader re-elected.
+	c.waitOnConsumerLeader("$G", "TEST", "d1")
+
+	// So we do not wait all 10s in each call to ConsumerInfo.
+	js2, _ := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
+
+	// Make sure we can get the consumer info eventually.
+	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
+		_, err := js2.ConsumerInfo("TEST", "d1")
+		return err
+	})
+
+	// Grab number go routines.
+	if after := runtime.NumGoroutine(); base > after {
+		t.Fatalf("Expected %d go routines, got %d", base, after)
+	}
+
+	// Simulate a low level write error on our consumer and make sure we can recover etc.
+	cl = c.consumerLeader("$G", "TEST", "d1")
+	mset, err = cl.GlobalAccount().lookupStream("TEST")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	o := mset.lookupConsumer("d1")
+	if o == nil {
+		t.Fatalf("Did not retrieve consumer")
+	}
+	node := o.raftNode().(*raft)
+	if node == nil {
+		t.Fatalf("could not retrieve the raft node for consumer")
+	}
+
+	nc.Close()
+	node.setWriteErr(io.ErrShortWrite)
+
+	c.stopAll()
+	c.restartAll()
+
+	c.waitOnStreamLeader("$G", "TEST")
+	c.waitOnConsumerLeader("$G", "TEST", "d1")
+}

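The no-race version of the reset test can only shorten elections because the raft timing knobs become package-level variables in the final file below; the compile-time values survive as *Default constants. A sketch of that defaults-plus-overrides pattern (names shortened to two of the six knobs):

package main

import (
	"fmt"
	"time"
)

// Defaults stay constants so the intended values remain compile-time facts.
const (
	minElectionTimeoutDefault = 2 * time.Second
	hbIntervalDefault         = 500 * time.Millisecond
)

// The working values are vars, so a test can dial them down before starting
// a cluster (note the test above does not restore them afterwards).
var (
	minElectionTimeout = minElectionTimeoutDefault
	hbInterval         = hbIntervalDefault
)

func main() {
	// What a test does before building servers.
	minElectionTimeout = 250 * time.Millisecond
	hbInterval = 50 * time.Millisecond
	fmt.Println(minElectionTimeout, hbInterval)
}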
server/raft.go

@@ -221,12 +221,21 @@ type lps struct {
 }
 
 const (
-	minElectionTimeout = 2 * time.Second
-	maxElectionTimeout = 5 * time.Second
-	minCampaignTimeout = 100 * time.Millisecond
-	maxCampaignTimeout = 4 * minCampaignTimeout
-	hbInterval         = 500 * time.Millisecond
-	lostQuorumInterval = hbInterval * 5
+	minElectionTimeoutDefault = 2 * time.Second
+	maxElectionTimeoutDefault = 5 * time.Second
+	minCampaignTimeoutDefault = 100 * time.Millisecond
+	maxCampaignTimeoutDefault = 4 * minCampaignTimeoutDefault
+	hbIntervalDefault         = 500 * time.Millisecond
+	lostQuorumIntervalDefault = hbIntervalDefault * 5
 )
 
+var (
+	minElectionTimeout = minElectionTimeoutDefault
+	maxElectionTimeout = maxElectionTimeoutDefault
+	minCampaignTimeout = minCampaignTimeoutDefault
+	maxCampaignTimeout = maxCampaignTimeoutDefault
+	hbInterval         = hbIntervalDefault
+	lostQuorumInterval = lostQuorumIntervalDefault
+)
+
 type RaftConfig struct {
@@ -388,19 +397,18 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
 	key := sha256.Sum256([]byte(n.group))
 	n.hh, _ = highwayhash.New64(key[:])
 
-	if term, vote, err := n.readTermVote(); err != nil && term > 0 {
+	if term, vote, err := n.readTermVote(); err == nil && term > 0 {
 		n.term = term
 		n.vote = vote
 	}
 
-	if err := os.MkdirAll(path.Join(cfg.Store, snapshotsDir), 0750); err != nil {
+	if err := os.MkdirAll(path.Join(n.sd, snapshotsDir), 0750); err != nil {
 		return nil, fmt.Errorf("could not create snapshots directory - %v", err)
 	}
 
 	// Can't recover snapshots if memory based.
 	if _, ok := n.wal.(*memStore); ok {
-		snapDir := path.Join(n.sd, snapshotsDir, "*")
-		os.RemoveAll(snapDir)
+		os.Remove(path.Join(n.sd, snapshotsDir, "*"))
 	} else {
 		// See if we have any snapshots and if so load and process on startup.
 		n.setupLastSnapshot()
@@ -937,6 +945,7 @@ func (n *raft) InstallSnapshot(data []byte) error {
 		n.setWriteErr(err)
 		return err
 	}
+	n.Unlock()
 
 	psnaps, _ := ioutil.ReadDir(snapDir)
@@ -1110,7 +1119,7 @@ func (n *raft) isCurrent() bool {
 	// Check to see that we have heard from the current leader lately.
 	if n.leader != noLeader && n.leader != n.id && n.catchup == nil {
-		const okInterval = int64(hbInterval) * 2
+		okInterval := int64(hbInterval) * 2
 		ts := time.Now().UnixNano()
 		if ps := n.peers[n.leader]; ps != nil && ps.ts > 0 && (ts-ps.ts) <= okInterval {
 			return true
@@ -3060,18 +3069,26 @@ func (n *raft) readTermVote() (term uint64, voted string, err error) {
 // Lock should be held.
 func (n *raft) setWriteErrLocked(err error) {
 	// Ignore if already set.
 	if n.werr == err {
 		return
 	}
 	// Ignore non-write errors.
+	if err != nil {
 		if err == ErrStoreClosed || err == ErrStoreEOF || err == ErrInvalidSequence || err == ErrStoreMsgNotFound || err == errNoPending {
 			return
 		}
+		// If this is a not found report but do not disable.
+		if os.IsNotExist(err) {
+			n.error("Resource not found: %v", err)
+			return
+		}
+		n.error("Critical write error: %v", err)
+	}
 	n.werr = err
 
 	// For now since this can be happening all under the covers, we will call up and disable JetStream.
-	n.Unlock()
-	n.s.handleOutOfSpace(_EMPTY_)
-	n.Lock()
+	go n.s.handleOutOfSpace(nil)
 }
 
 // Capture our write error if any and hold.
@@ -3101,7 +3118,7 @@ func (n *raft) fileWriter() {
 			n.RUnlock()
 			if err := ioutil.WriteFile(tvf, buf[:], 0640); err != nil {
 				n.setWriteErr(err)
-				n.error("Error writing term and vote file for %q: %v", n.group, err)
+				n.warn("Error writing term and vote file for %q: %v", n.group, err)
 			}
 		case <-n.wpsch:
 			n.RLock()
@@ -3109,7 +3126,7 @@ func (n *raft) fileWriter() {
 			n.RUnlock()
 			if err := ioutil.WriteFile(psf, buf, 0640); err != nil {
 				n.setWriteErr(err)
-				n.error("Error writing peer state file for %q: %v", n.group, err)
+				n.warn("Error writing peer state file for %q: %v", n.group, err)
 			}
 		}
 	}
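Finally, the raft error path: setWriteErrLocked now classifies errors instead of treating every failure as fatal (benign store errors are ignored, not-found is logged but does not disable), and the JetStream shutdown callback runs on its own goroutine rather than the old Unlock/call/Lock dance, which is fragile if the callback ever re-enters the node. Relatedly, okInterval in isCurrent had to become a plain variable once hbInterval stopped being a constant. A standalone sketch of the classification and goroutine hand-off, with stand-in error values and a trivial disable hook (both assumptions, not the server's actual types):

package main

import (
	"errors"
	"fmt"
	"os"
	"sync"
	"time"
)

// Stand-ins for the store errors the raft layer treats as benign.
var (
	errStoreClosed = errors.New("store closed")
	errStoreEOF    = errors.New("store EOF")
)

type node struct {
	sync.Mutex
	werr error
}

// Mirrors the shape of setWriteErrLocked: ignore repeats and benign errors,
// report not-found without disabling, and keep only genuine write errors.
// The disable hook runs on its own goroutine so it may take whatever locks
// it needs while the caller still holds n's lock.
func (n *node) setWriteErrLocked(err error, disable func()) {
	if n.werr == err {
		return
	}
	if err != nil {
		if err == errStoreClosed || err == errStoreEOF {
			return
		}
		if os.IsNotExist(err) {
			fmt.Println("resource not found:", err)
			return
		}
		fmt.Println("critical write error:", err)
	}
	n.werr = err
	go disable() // previously: n.Unlock(); disable(); n.Lock()
}

func main() {
	n := &node{}
	n.Lock()
	n.setWriteErrLocked(errors.New("no space left on device"), func() {
		fmt.Println("disabling JetStream")
	})
	n.Unlock()
	time.Sleep(50 * time.Millisecond) // let the goroutine run in this demo
}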