Merge pull request #3288 from nats-io/debug_test_failure

[FIXED] JetStream: Some scaling up issues
This commit is contained in:
Ivan Kozlovic
2022-07-26 08:57:17 -06:00
committed by GitHub
4 changed files with 106 additions and 37 deletions

View File

@@ -1775,12 +1775,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
defer stopDirectMonitoring()
// This is triggered during a scale up from 1 to clustered mode. We need the new followers to catchup,
// similar to how we trigger the catchup mechanism post a backup/restore. It's ok to do here and preferred
// over waiting to be elected, this just queues it up for the new members to see first and trigger the above
// RAFT layer catchup mechanism.
if sendSnapshot && mset != nil && n != nil {
// This is triggered during a scale up from R1 to clustered mode. We need the new followers to catchup,
// similar to how we trigger the catchup mechanism post a backup/restore.
// We can arrive here NOT being the leader, so we send the snapshot only if we are, and in this case
// reset the notion that we need to send the snapshot. If we are not, then the first time the server
// will switch to leader (in the loop below), we will send the snapshot.
if sendSnapshot && isLeader && mset != nil && n != nil {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
}
for {
select {
@@ -1829,6 +1831,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
aq.recycle(&ces)
case isLeader = <-lch:
if isLeader {
if sendSnapshot && mset != nil && n != nil {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
}
if isRestore {
acc, _ := s.LookupAccount(sa.Client.serviceAccount())
restoreDoneCh = s.processStreamRestore(sa.Client, acc, sa.Config, _EMPTY_, sa.Reply, _EMPTY_)
@@ -1884,8 +1890,17 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
mset.mu.Lock()
ad, md, current := mset.cfg.AllowDirect, mset.cfg.MirrorDirect, mset.isCurrent()
if !current {
mset.mu.Unlock()
continue
const syncThreshold = 90.0
// We are not current, but current means exactly caught up. Under heavy publish
// loads we may never reach this, so check if we are within 90% caught up.
_, c, a := mset.node.Progress()
if p := float64(a) / float64(c) * 100.0; p < syncThreshold {
mset.mu.Unlock()
continue
} else {
s.Debugf("Stream '%s > %s' enabling direct gets at %.0f%% synchronized",
sa.Client.serviceAccount(), sa.Config.Name, p)
}
}
// We are current, cancel monitoring and create the direct subs as needed.
if ad {
@@ -2753,8 +2768,10 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
var needsSetLeader bool
if !alreadyRunning && numReplicas > 1 {
if needsNode {
mset.setLeader(false)
js.createRaftGroup(acc.GetName(), rg, storage)
}
// Start monitoring..
s.startGoRoutine(func() { js.monitorStream(mset, sa, needsNode) })
} else if numReplicas == 1 && alreadyRunning {
// We downgraded to R1. Make sure we cleanup the raft node and the stream monitor.
@@ -2767,13 +2784,12 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
rg.node = nil
js.mu.Unlock()
}
// Call update.
if err = mset.update(cfg); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err)
}
// Set the new stream assignment.
mset.setStreamAssignment(sa)
// Make sure we are the leader now that we are R1.
if needsSetLeader {
mset.setLeader(true)
@@ -5994,6 +6010,7 @@ func (mset *stream) stateSnapshot() []byte {
Deleted: state.Deleted,
}
b, _ := json.Marshal(snap)
return b
}
@@ -6007,6 +6024,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
name, stype := mset.cfg.Name, mset.cfg.Storage
s, js, jsa, st, rf, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq
isLeader := mset.isLeader()
mset.mu.RUnlock()
// This should not happen but possible now that we allow scale up, and scale down where this could trigger.
@@ -6014,6 +6032,11 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
return mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0)
}
// Check that we are the leader. This can be false if we have scaled up from an R1 that had inbound queued messages.
if !isLeader {
return NewJSClusterNotLeaderError()
}
// Check here pre-emptively if we have exceeded this server limits.
if js.limitsExceeded(stype) {
s.resourcesExeededError()
@@ -6161,7 +6184,7 @@ func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) {
state := mset.state()
// Adjust if FirstSeq has moved.
if snap.FirstSeq > state.FirstSeq {
if snap.FirstSeq > state.FirstSeq && state.FirstSeq != 0 {
mset.store.Compact(snap.FirstSeq)
state = mset.store.State()
mset.setLastSeq(snap.LastSeq)
@@ -6311,10 +6334,24 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
var sub *subscription
var err error
const activityInterval = 10 * time.Second
const maxActivityInterval = 10 * time.Second
const minActivityInterval = time.Second
activityInterval := minActivityInterval
notActive := time.NewTimer(activityInterval)
defer notActive.Stop()
var gotMsgs bool
getActivityInterval := func() time.Duration {
if gotMsgs || activityInterval == maxActivityInterval {
return maxActivityInterval
}
activityInterval *= 2
if activityInterval > maxActivityInterval {
activityInterval = maxActivityInterval
}
return activityInterval
}
defer func() {
if sub != nil {
s.sysUnsubscribe(sub)
@@ -6377,7 +6414,7 @@ RETRY:
default:
}
}
notActive.Reset(activityInterval)
notActive.Reset(getActivityInterval())
// Grab sync request again on failures.
if sreq == nil {
@@ -6423,7 +6460,8 @@ RETRY:
for qch, lch := n.QuitC(), n.LeadChangeC(); ; {
select {
case <-msgsQ.ch:
notActive.Reset(activityInterval)
gotMsgs = true
notActive.Reset(getActivityInterval())
mrecs := msgsQ.pop()
for _, mreci := range mrecs {
@@ -6508,13 +6546,13 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
// Messages to be skipped have no subject or timestamp.
// TODO(dlc) - formalize with skipMsgOp
if subj == _EMPTY_ && ts == 0 {
lseq := mset.store.SkipMsg()
if lseq != seq {
if lseq := mset.store.SkipMsg(); lseq != seq {
return 0, errors.New("wrong sequence for skipped msg")
}
} else if err := mset.store.StoreRawMsg(subj, hdr, msg, seq, ts); err != nil {
return 0, err
}
// Update our lseq.
mset.setLastSeq(seq)

View File

@@ -30,6 +30,7 @@ import (
"net/http"
"net/url"
"path/filepath"
"reflect"
"runtime"
"runtime/debug"
"strconv"
@@ -5265,6 +5266,7 @@ func TestNoRaceJetStreamClusterDirectAccessAllPeersSubs(t *testing.T) {
getSubj := fmt.Sprintf(JSDirectMsgGetT, "TEST")
getMsg := func(key string) *nats.Msg {
t.Helper()
req := []byte(fmt.Sprintf(`{"last_by_subj":%q}`, key))
m, err := nc.Request(getSubj, req, time.Second)
require_NoError(t, err)
@@ -5287,13 +5289,12 @@ func TestNoRaceJetStreamClusterDirectAccessAllPeersSubs(t *testing.T) {
js, _ := nc.JetStream(nats.MaxWait(500 * time.Millisecond))
defer nc.Close()
for {
pt := time.NewTimer(time.Duration(50 * time.Millisecond))
select {
case <-pt.C:
js.Publish(fmt.Sprintf("kv.%d", rand.Intn(1000)), msg)
case <-qch:
pt.Stop()
return
default:
// Send as fast as we can.
js.PublishAsync(fmt.Sprintf("kv.%d", rand.Intn(1000)), msg)
}
}
}()
@@ -5320,12 +5321,18 @@ func TestNoRaceJetStreamClusterDirectAccessAllPeersSubs(t *testing.T) {
return nil
})
close(qch)
wg.Wait()
// Just make sure we can succeed here.
getMsg("kv.22")
// For each non-leader check that the direct sub fires up.
// We just test all, the leader will already have a directSub.
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
checkFor(t, 20*time.Second, 500*time.Millisecond, func() error {
mset.mu.RLock()
ok := mset.directSub != nil
mset.mu.RUnlock()
@@ -5336,18 +5343,30 @@ func TestNoRaceJetStreamClusterDirectAccessAllPeersSubs(t *testing.T) {
})
}
close(qch)
wg.Wait()
// Just make sure we can succeed here.
getMsg("kv.22")
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
if si.State.Msgs == uint64(num) {
t.Fatalf("Expected to see messages increase, got %d", si.State.Msgs)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
// Make sure they are all the same from a state perspective.
// Leader will have the expected state.
lmset, err := c.streamLeader("$G", "TEST").GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
expected := lmset.state()
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
if state := mset.state(); !reflect.DeepEqual(expected, state) {
return fmt.Errorf("Expected %+v, got %+v", expected, state)
}
}
return nil
})
}
func TestNoRaceJetStreamClusterStreamNamesAndInfosMoreThanAPILimit(t *testing.T) {

View File

@@ -959,8 +959,6 @@ func (n *raft) InstallSnapshot(data []byte) error {
snapDir := filepath.Join(n.sd, snapshotsDir)
sn := fmt.Sprintf(snapFileT, snap.lastTerm, snap.lastIndex)
sfile := filepath.Join(snapDir, sn)
// Remember our latest snapshot file.
n.snapfile = sfile
if err := ioutil.WriteFile(sfile, n.encodeSnapshot(snap), 0640); err != nil {
n.Unlock()
@@ -969,6 +967,9 @@ func (n *raft) InstallSnapshot(data []byte) error {
return err
}
// Remember our latest snapshot file.
n.snapfile = sfile
if _, err := n.wal.Compact(snap.lastIndex); err != nil {
n.Unlock()
n.setWriteErr(err)
@@ -1137,8 +1138,8 @@ func (n *raft) isCatchingUp() bool {
// Lock should be held.
func (n *raft) isCurrent() bool {
// First check if we match commit and applied.
if n.commit != n.applied {
// First check if have had activity and we match commit and applied.
if n.commit == 0 || n.commit != n.applied {
n.debug("Not current, commit %d != applied %d", n.commit, n.applied)
return false
}

View File

@@ -598,8 +598,8 @@ func (mset *stream) updateC() <-chan struct{} {
// IsLeader will return if we are the current leader.
func (mset *stream) IsLeader() bool {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.isLeader()
}
@@ -3246,6 +3246,13 @@ func getExpectedLastSeqPerSubject(hdr []byte) (uint64, bool) {
return uint64(parseInt64(bseq)), true
}
// IsClustered reports whether this stream is clustered, i.e. backed by a
// RAFT node (see isClustered). It acquires the stream's read lock for the
// duration of the check, so callers must NOT already hold mset.mu.
func (mset *stream) IsClustered() bool {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.isClustered()
}
// isClustered reports whether the stream has an associated RAFT node,
// which is what distinguishes a clustered (R>1) stream from an R1 stream.
// Lock should be held.
func (mset *stream) isClustered() bool {
return mset.node != nil
}
@@ -3369,7 +3376,7 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
// processInboundJetStreamMsg handles processing messages bound for a stream.
func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
mset.mu.RLock()
isLeader, isClustered, isSealed := mset.isLeader(), mset.node != nil, mset.cfg.Sealed
isLeader, isClustered, isSealed := mset.isLeader(), mset.isClustered(), mset.cfg.Sealed
mset.mu.RUnlock()
// If we are not the leader just ignore.
@@ -3437,6 +3444,12 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
var buf [256]byte
pubAck := append(buf[:0], mset.pubAck...)
// If this is a non-clustered msg and we are not considered active, meaning no active subscription, do not process.
if lseq == 0 && ts == 0 && !mset.active {
mset.mu.Unlock()
return nil
}
// For clustering the lower layers will pass our expected lseq. If it is present check for that here.
if lseq > 0 && lseq != (mset.lseq+mset.clfs) {
isMisMatch := true
@@ -4027,13 +4040,11 @@ func (mset *stream) internalLoop() {
outq.recycle(&pms)
case <-msgs.ch:
// This can possibly change now so needs to be checked here.
mset.mu.RLock()
isClustered := mset.node != nil
mset.mu.RUnlock()
isClustered := mset.IsClustered()
ims := msgs.pop()
for _, imi := range ims {
im := imi.(*inMsg)
// If we are clustered we need to propose this message to the underlying raft group.
if isClustered {
mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg)