Merge pull request #2569 from nats-io/catchup

Improve handling when exceeding account resources.
This commit is contained in:
Derek Collison
2021-09-23 14:48:20 -07:00
committed by GitHub
3 changed files with 72 additions and 16 deletions

View File

@@ -1559,11 +1559,11 @@ func (jsa *jsAccount) limitsExceeded(storeType StorageType) bool {
defer jsa.mu.RUnlock()
if storeType == MemoryStorage {
if jsa.limits.MaxMemory > 0 && jsa.memTotal > jsa.limits.MaxMemory {
if jsa.limits.MaxMemory >= 0 && jsa.memTotal > jsa.limits.MaxMemory {
return true
}
} else {
if jsa.limits.MaxStore > 0 && jsa.storeTotal > jsa.limits.MaxStore {
if jsa.limits.MaxStore >= 0 && jsa.storeTotal > jsa.limits.MaxStore {
return true
}
}

View File

@@ -1695,6 +1695,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
if !isRecovering {
if err == errLastSeqMismatch {
if mset.jsa.limitsExceeded(mset.cfg.Storage) {
s.Warnf("stream '%s > %s' errored, account resources exceeded: %v", mset.account(), mset.name(), err)
return nil
}
return err
}
s.Debugf("Got error processing JetStream msg: %v", err)
@@ -4388,9 +4392,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
canRespond := !mset.cfg.NoAck && len(reply) > 0
name, stype := mset.cfg.Name, mset.cfg.Storage
s, js, jsa, st, rf, outq := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq
maxMsgSize := int(mset.cfg.MaxMsgSize)
msetName := mset.cfg.Name
lseq := mset.lseq
maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq
mset.mu.RUnlock()
// Check here pre-emptively if we have exceeded this server limits.
@@ -4493,7 +4495,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}
if err != nil && isOutOfSpaceErr(err) {
s.handleOutOfSpace(msetName)
s.handleOutOfSpace(name)
}
return err
@@ -4614,12 +4616,11 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) {
mset.mu.Lock()
state := mset.store.State()
sreq := mset.calculateSyncRequest(&state, snap)
s, subject, n := mset.srv, mset.sa.Sync, mset.node
msetName := mset.cfg.Name
s, js, subject, n, name := mset.srv, mset.js, mset.sa.Sync, mset.node, mset.cfg.Name
mset.mu.Unlock()
// Just return if up to date..
if sreq == nil {
// Just return if up to date or already exceeded limits.
if sreq == nil || js.limitsExceeded(mset.cfg.Storage) {
return
}
@@ -4631,8 +4632,6 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) {
mset.setCatchingUp()
defer mset.clearCatchingUp()
js := s.getJetStream()
var sub *subscription
var err error
@@ -4681,7 +4680,8 @@ RETRY:
reply string
}
msgsC := make(chan *im, 32768)
sz := int(sreq.LastSeq-sreq.FirstSeq) + 1
msgsC := make(chan *im, sz)
// Send our catchup request here.
reply := syncReplySubject()
@@ -4723,10 +4723,14 @@ RETRY:
return
}
} else if isOutOfSpaceErr(err) {
s.handleOutOfSpace(msetName)
s.handleOutOfSpace(name)
return
} else if err == NewJSInsufficientResourcesError() {
s.resourcesExeededError()
if mset.js.limitsExceeded(mset.cfg.Storage) {
s.resourcesExeededError()
} else {
s.Warnf("Catchup for stream '%s > %s' errored, account resources exceeded: %v", mset.account(), mset.name(), err)
}
return
} else {
s.Warnf("Catchup for stream '%s > %s' errored, will retry: %v", mset.account(), mset.name(), err)
@@ -4745,6 +4749,7 @@ RETRY:
return
case isLeader := <-lch:
js.processStreamLeaderChange(mset, isLeader)
return
}
}
}
@@ -4761,7 +4766,8 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
return 0, errors.New("bad catchup msg")
}
if mset.js.limitsExceeded(mset.cfg.Storage) {
st := mset.cfg.Storage
if mset.js.limitsExceeded(st) || mset.jsa.limitsExceeded(st) {
return 0, NewJSInsufficientResourcesError()
}

View File

@@ -8649,6 +8649,56 @@ func TestJetStreamClusterFlowControlRequiresHeartbeats(t *testing.T) {
}
}
var jsClusterAccountLimitsTempl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
no_auth_user: js
accounts {
$JS { users = [ { user: "js", pass: "p" } ]; jetstream: {max_store: 1MB, max_mem: 0} }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`
func TestJetStreamAccountLimitsAndRestart(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, jsClusterAccountLimitsTempl, "A3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := 0; i < 20_000; i++ {
if _, err := js.Publish("TEST", []byte("A")); err != nil {
break
}
if i == 5_000 {
snl := c.randomNonStreamLeader("$JS", "TEST")
snl.Shutdown()
}
}
c.stopAll()
c.restartAll()
c.waitOnLeader()
c.waitOnStreamLeader("$JS", "TEST")
for _, cs := range c.servers {
c.waitOnStreamCurrent(cs, "$JS", "TEST")
}
}
// Support functions
// Used to setup superclusters for tests.