mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Simplified flow control, avoid stalls due to msg loss
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2126,6 +2126,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
|
||||
if !o.active {
|
||||
goto waitForMsgs
|
||||
}
|
||||
// Flowcontrol.
|
||||
if o.maxpb > 0 && o.pbytes > o.maxpb {
|
||||
goto waitForMsgs
|
||||
}
|
||||
@@ -2221,15 +2222,15 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
|
||||
if o.isActive() {
|
||||
const t = "NATS/1.0 100 Idle Heartbeat\r\n%s: %d\r\n%s: %d\r\n\r\n"
|
||||
hdr := []byte(fmt.Sprintf(t, JSLastConsumerSeq, dseq, JSLastStreamSeq, sseq))
|
||||
if fcp := o.fcID(); fcp != _EMPTY_ {
|
||||
// Add in that we are stalled on flow control here.
|
||||
addOn := []byte(fmt.Sprintf("%s: %s\r\n\r\n", JSConsumerStalled, fcp))
|
||||
hdr = append(hdr[:len(hdr)-LEN_CR_LF], []byte(addOn)...)
|
||||
}
|
||||
outq.send(&jsPubMsg{odsubj, _EMPTY_, _EMPTY_, hdr, nil, nil, 0, nil})
|
||||
}
|
||||
// Reset our idle heartbeat timer.
|
||||
hb.Reset(hbd)
|
||||
|
||||
// Now check on flowcontrol if enabled. Make sure if we have any outstanding to resend.
|
||||
if o.fcOut() {
|
||||
o.sendFlowControl()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2336,12 +2337,10 @@ func (o *consumer) processFlowControl(_ *subscription, c *client, _ *Account, su
|
||||
|
||||
// Update accounting.
|
||||
o.pbytes -= o.fcsz
|
||||
o.fcid, o.fcsz = _EMPTY_, 0
|
||||
|
||||
// In case they are sent out of order or we get duplicates etc.
|
||||
if o.pbytes < 0 {
|
||||
o.pbytes = 0
|
||||
}
|
||||
o.fcid, o.fcsz = _EMPTY_, 0
|
||||
|
||||
o.signalNewMessages()
|
||||
}
|
||||
@@ -2364,10 +2363,10 @@ func (o *consumer) fcReply() string {
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func (o *consumer) fcOut() bool {
|
||||
func (o *consumer) fcID() string {
|
||||
o.mu.RLock()
|
||||
defer o.mu.RUnlock()
|
||||
return o.fcid != _EMPTY_
|
||||
return o.fcid
|
||||
}
|
||||
|
||||
// sendFlowControl will send a flow control packet to the consumer.
|
||||
|
||||
@@ -1535,24 +1535,6 @@ func (mset *stream) resetClusteredState() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (mset *stream) checkForFlowControl(seq uint64) {
|
||||
mset.mu.Lock()
|
||||
if mset.fcr != nil {
|
||||
if rply := mset.fcr[seq]; rply != _EMPTY_ {
|
||||
delete(mset.fcr, seq)
|
||||
mset.outq.send(&jsPubMsg{rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
|
||||
} else if len(mset.fcr) > 0 {
|
||||
for fseq, rply := range mset.fcr {
|
||||
if fseq < seq {
|
||||
delete(mset.fcr, fseq)
|
||||
mset.outq.send(&jsPubMsg{rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
}
|
||||
|
||||
func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error {
|
||||
for _, e := range ce.Entries {
|
||||
if e.Type == EntryNormal {
|
||||
@@ -1569,8 +1551,16 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
|
||||
panic(err.Error())
|
||||
}
|
||||
|
||||
// We can skip if we know this is less than what we already have.
|
||||
// Check for flowcontrol here.
|
||||
if len(msg) == 0 && len(hdr) > 0 && reply != _EMPTY_ && bytes.HasPrefix(hdr, []byte("NATS/1.0 100 ")) {
|
||||
mset.sendFlowControlReply(reply)
|
||||
continue
|
||||
}
|
||||
|
||||
// Grab last sequence.
|
||||
last := mset.lastSeq()
|
||||
|
||||
// We can skip if we know this is less than what we already have.
|
||||
if lseq < last {
|
||||
s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
|
||||
continue
|
||||
@@ -1583,11 +1573,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
|
||||
continue
|
||||
}
|
||||
|
||||
// Check for flowcontrol here.
|
||||
mset.checkForFlowControl(lseq + 1)
|
||||
|
||||
// Messages to be skipped have no subject or timestamp.
|
||||
if subject == _EMPTY_ && ts == 0 {
|
||||
// Messages to be skipped have no subject or timestamp or msg or hdr.
|
||||
if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 {
|
||||
// Skip and update our lseq.
|
||||
mset.setLastSeq(mset.store.SkipMsg())
|
||||
continue
|
||||
@@ -4156,7 +4143,9 @@ func decodeStreamMsg(buf []byte) (subject, reply string, hdr, msg []byte, lseq u
|
||||
if len(buf) < hl {
|
||||
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg
|
||||
}
|
||||
hdr = buf[:hl]
|
||||
if hdr = buf[:hl]; len(hdr) == 0 {
|
||||
hdr = nil
|
||||
}
|
||||
buf = buf[hl:]
|
||||
if len(buf) < 4 {
|
||||
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg
|
||||
@@ -4166,7 +4155,9 @@ func decodeStreamMsg(buf []byte) (subject, reply string, hdr, msg []byte, lseq u
|
||||
if len(buf) < ml {
|
||||
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg
|
||||
}
|
||||
msg = buf[:ml]
|
||||
if msg = buf[:ml]; len(msg) == 0 {
|
||||
msg = nil
|
||||
}
|
||||
return subject, reply, hdr, msg, lseq, ts, nil
|
||||
}
|
||||
|
||||
@@ -4232,7 +4223,7 @@ func (mset *stream) stateSnapshot() []byte {
|
||||
}
|
||||
|
||||
// processClusteredInboundMsg will propose the inbound message to the underlying raft group.
|
||||
func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg []byte) (uint64, error) {
|
||||
func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg []byte) error {
|
||||
// For possible error response.
|
||||
var response []byte
|
||||
|
||||
@@ -4256,7 +4247,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
|
||||
if node := mset.raftNode(); node != nil {
|
||||
node.StepDown()
|
||||
}
|
||||
return 0, ApiErrors[JSInsufficientResourcesErr]
|
||||
return ApiErrors[JSInsufficientResourcesErr]
|
||||
}
|
||||
|
||||
// Check here pre-emptively if we have exceeded our account limits.
|
||||
@@ -4285,7 +4276,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
|
||||
response, _ = json.Marshal(resp)
|
||||
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
|
||||
}
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
|
||||
// Check msgSize if we have a limit set there. Again this works if it goes through but better to be pre-emptive.
|
||||
@@ -4298,7 +4289,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
|
||||
response, _ = json.Marshal(resp)
|
||||
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
|
||||
}
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
|
||||
// Proceed with proposing this message.
|
||||
@@ -4307,14 +4298,11 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
|
||||
// Check if we need to set initial value here
|
||||
mset.clMu.Lock()
|
||||
if mset.clseq == 0 || mset.clseq < lseq {
|
||||
mset.mu.RLock()
|
||||
mset.clseq = mset.lseq
|
||||
mset.mu.RUnlock()
|
||||
mset.clseq = mset.lastSeq()
|
||||
}
|
||||
|
||||
esm := encodeStreamMsg(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano())
|
||||
mset.clseq++
|
||||
seq := mset.clseq
|
||||
|
||||
// Do proposal.
|
||||
err := mset.node.Propose(esm)
|
||||
@@ -4324,7 +4312,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
|
||||
mset.clMu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
seq = 0
|
||||
if canRespond {
|
||||
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.cfg.Name}}
|
||||
resp.Error = &ApiError{Code: 503, Description: err.Error()}
|
||||
@@ -4338,7 +4325,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
|
||||
s.handleOutOfSpace(msetName)
|
||||
}
|
||||
|
||||
return seq, err
|
||||
return err
|
||||
}
|
||||
|
||||
// For requesting messages post raft snapshot to catch up streams post server restart.
|
||||
|
||||
@@ -2486,3 +2486,65 @@ func TestNoRaceJetStreamSlowRestartWithManyExpiredMsgs(t *testing.T) {
|
||||
t.Fatalf("Expected no msgs after restart, got %d", si.State.Msgs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNoRaceJetStreamStalledMirrorsAfterExpire(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "JSC", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
cfg := &nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo.*"},
|
||||
Replicas: 1,
|
||||
MaxAge: 250 * time.Microsecond,
|
||||
}
|
||||
|
||||
if _, err := js.AddStream(cfg); err != nil {
|
||||
t.Fatalf("Error creating stream: %v", err)
|
||||
}
|
||||
|
||||
if _, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "M",
|
||||
Replicas: 2,
|
||||
Mirror: &nats.StreamSource{Name: "TEST"},
|
||||
}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
sendBatch := func(batch int) {
|
||||
t.Helper()
|
||||
for i := 0; i < batch; i++ {
|
||||
js.PublishAsync("foo.bar", []byte("Hello"))
|
||||
}
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Did not receive completion signal")
|
||||
}
|
||||
}
|
||||
|
||||
numMsgs := 25_000
|
||||
sendBatch(numMsgs)
|
||||
|
||||
// Turn off expiration so we can test we did not stall.
|
||||
cfg.MaxAge = 0
|
||||
if _, err := js.UpdateStream(cfg); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
sendBatch(numMsgs)
|
||||
|
||||
// Wait for mirror to be caught up.
|
||||
checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("M")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if si.State.LastSeq != uint64(2*numMsgs) {
|
||||
return fmt.Errorf("Expected %d as last sequence, got state: %+v", 2*numMsgs, si.State)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -176,9 +176,6 @@ type stream struct {
|
||||
// Indicates we have direct consumers.
|
||||
directs int
|
||||
|
||||
// For flowcontrol processing for source and mirror internal consumers.
|
||||
fcr map[uint64]string
|
||||
|
||||
// TODO(dlc) - Hide everything below behind two pointers.
|
||||
// Clustered mode.
|
||||
sa *streamAssignment
|
||||
@@ -201,7 +198,6 @@ type sourceInfo struct {
|
||||
msgs *inbound
|
||||
sseq uint64
|
||||
dseq uint64
|
||||
clseq uint64
|
||||
lag uint64
|
||||
err *ApiError
|
||||
last time.Time
|
||||
@@ -220,6 +216,7 @@ const (
|
||||
JSStreamSource = "Nats-Stream-Source"
|
||||
JSLastConsumerSeq = "Nats-Last-Consumer"
|
||||
JSLastStreamSeq = "Nats-Last-Stream"
|
||||
JSConsumerStalled = "Nats-Consumer-Stalled"
|
||||
)
|
||||
|
||||
// Dedupe entry
|
||||
@@ -468,7 +465,7 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) {
|
||||
|
||||
// Lock should be held.
|
||||
func (mset *stream) isLeader() bool {
|
||||
if mset.node != nil {
|
||||
if mset.isClustered() {
|
||||
return mset.node.Leader()
|
||||
}
|
||||
return true
|
||||
@@ -1295,9 +1292,12 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
|
||||
if m.rply != _EMPTY_ {
|
||||
mset.handleFlowControl(mset.mirror, m)
|
||||
} else {
|
||||
// For idle heartbeats make sure we did not miss anything.
|
||||
// For idle heartbeats make sure we did not miss anything and check if we are considered stalled.
|
||||
if ldseq := parseInt64(getHeader(JSLastConsumerSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != mset.mirror.dseq {
|
||||
needsRetry = true
|
||||
} else if fcReply := getHeader(JSConsumerStalled, m.hdr); len(fcReply) > 0 {
|
||||
// Other side thinks we are stalled, so send flow control reply.
|
||||
mset.outq.sendMsg(string(fcReply), nil)
|
||||
}
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
@@ -1315,7 +1315,7 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
|
||||
}
|
||||
|
||||
// Mirror info tracking.
|
||||
olag, osseq, odseq, oclseq := mset.mirror.lag, mset.mirror.sseq, mset.mirror.dseq, mset.mirror.clseq
|
||||
olag, osseq, odseq := mset.mirror.lag, mset.mirror.sseq, mset.mirror.dseq
|
||||
if sseq == mset.mirror.sseq+1 {
|
||||
mset.mirror.dseq = dseq
|
||||
mset.mirror.sseq++
|
||||
@@ -1345,7 +1345,6 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
|
||||
mset.mirror.lag = pending - 1
|
||||
}
|
||||
|
||||
mset.mirror.clseq = sseq - 1
|
||||
js, stype := mset.js, mset.cfg.Storage
|
||||
mset.mu.Unlock()
|
||||
|
||||
@@ -1369,7 +1368,6 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
|
||||
mset.mirror.lag = olag
|
||||
mset.mirror.sseq = osseq
|
||||
mset.mirror.dseq = odseq
|
||||
mset.mirror.clseq = oclseq
|
||||
mset.mu.Unlock()
|
||||
return false
|
||||
} else {
|
||||
@@ -1865,15 +1863,21 @@ func (m *inMsg) isControlMsg() bool {
|
||||
return len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 "))
|
||||
}
|
||||
|
||||
// handleFlowControl will properly handle flow control messages for both R1 and R>1.
|
||||
// Sends a reply to a flow control request.
|
||||
func (mset *stream) sendFlowControlReply(reply string) {
|
||||
mset.mu.Lock()
|
||||
if mset.isLeader() && mset.outq != nil {
|
||||
mset.outq.sendMsg(reply, nil)
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
}
|
||||
|
||||
// handleFlowControl will properly handle flow control messages for both R==1 and R>1.
|
||||
// Lock should be held.
|
||||
func (mset *stream) handleFlowControl(si *sourceInfo, m *inMsg) {
|
||||
// If we are clustered we want to delay signaling back to the upstream consumer.
|
||||
if node := mset.node; node != nil && si.clseq > 0 {
|
||||
if mset.fcr == nil {
|
||||
mset.fcr = make(map[uint64]string)
|
||||
}
|
||||
mset.fcr[si.clseq] = m.rply
|
||||
// If we are clustered we will send the flow control message through the replication stack.
|
||||
if mset.isClustered() {
|
||||
mset.node.Propose(encodeStreamMsg(_EMPTY_, m.rply, m.hdr, nil, 0, 0))
|
||||
} else {
|
||||
mset.outq.sendMsg(m.rply, nil)
|
||||
}
|
||||
@@ -1912,6 +1916,9 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
|
||||
if ldseq := parseInt64(getHeader(JSLastConsumerSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != si.dseq {
|
||||
needsRetry = true
|
||||
mset.retrySourceConsumerAtSeq(si.iname, si.sseq+1)
|
||||
} else if fcReply := getHeader(JSConsumerStalled, m.hdr); len(fcReply) > 0 {
|
||||
// Other side thinks we are stalled, so send flow control reply.
|
||||
mset.outq.sendMsg(string(fcReply), nil)
|
||||
}
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
@@ -1960,15 +1967,9 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
|
||||
hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.rply))
|
||||
|
||||
var err error
|
||||
var clseq uint64
|
||||
// If we are clustered we need to propose this message to the underlying raft group.
|
||||
if node != nil {
|
||||
clseq, err = mset.processClusteredInboundMsg(m.subj, _EMPTY_, hdr, msg)
|
||||
if err == nil {
|
||||
mset.mu.Lock()
|
||||
si.clseq = clseq
|
||||
mset.mu.Unlock()
|
||||
}
|
||||
err = mset.processClusteredInboundMsg(m.subj, _EMPTY_, hdr, msg)
|
||||
} else {
|
||||
err = mset.processJetStreamMsg(m.subj, _EMPTY_, hdr, msg, 0, 0)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user