Merge pull request #1964 from nats-io/rc5

Raft changes and inbound stream msgs out of fastpath for non-clients.
This commit is contained in:
Derek Collison
2021-03-04 20:47:44 -07:00
committed by GitHub
11 changed files with 375 additions and 203 deletions

View File

@@ -270,6 +270,7 @@ type client struct {
trace bool
echo bool
noIcb bool
tags jwt.TagList
nameTag string
@@ -3026,7 +3027,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g
client.outBytes += msgSize
// Check for internal subscriptions.
if sub.icb != nil {
if sub.icb != nil && !c.noIcb {
if gwrply {
// Note that we keep track of the GW routed reply in the destination
// connection (`client`). The routed reply subject is in `c.pa.reply`,

View File

@@ -40,7 +40,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.0-RC.4"
VERSION = "2.2.0-RC.5"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -187,7 +187,7 @@ type consumer struct {
ackSubj string
nextMsgSubj string
maxp int
sendq chan *jsPubMsg
outq *jsOutQ
pending map[uint64]*Pending
ptmr *time.Timer
rdq []uint64
@@ -428,6 +428,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
sysc: s.createInternalJetStreamClient(),
cfg: *config,
dsubj: config.DeliverSubject,
outq: mset.outq,
active: true,
qch: make(chan struct{}),
mch: make(chan struct{}, 1),
@@ -670,8 +671,6 @@ func (o *consumer) setLeader(isLeader bool) {
o.replay = true
}
// Recreate internal sendq
o.sendq = make(chan *jsPubMsg, msetSendQSize)
// Recreate quit channel.
o.qch = make(chan struct{})
qch := o.qch
@@ -679,8 +678,6 @@ func (o *consumer) setLeader(isLeader bool) {
// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs(qch)
// Startup our deliver loop.
go o.loopAndDeliverMsgs(qch)
} else {
// Shutdown the go routines and the subscriptions.
@@ -693,7 +690,6 @@ func (o *consumer) setLeader(isLeader bool) {
o.srv.sysUnsubscribe(o.infoSub)
o.infoSub = nil
}
o.sendq = nil
if o.qch != nil {
close(o.qch)
o.qch = nil
@@ -737,16 +733,10 @@ func (o *consumer) unsubscribe(sub *subscription) {
o.client.processUnsub(sub.sid)
}
// We need to make sure we protect access to the sendq.
// We need to make sure we protect access to the outq.
// Do all advisory sends here.
// Lock should be held on entry but will be released.
func (o *consumer) sendAdvisory(subj string, msg []byte) {
sendq := o.sendq
o.mu.Unlock()
if sendq != nil {
sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0}
}
o.mu.Lock()
o.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0, nil})
}
func (o *consumer) sendDeleteAdvisoryLocked() {
@@ -1230,73 +1220,6 @@ func (o *consumer) writeStoreState() error {
return o.store.Update(&state)
}
// loopAndDeliverMsgs() will loop and deliver messages and watch for interest changes.
func (o *consumer) loopAndDeliverMsgs(qch chan struct{}) {
o.mu.Lock()
inch, sendq := o.inch, o.sendq
s, acc := o.acc.srv, o.acc
o.mu.Unlock()
// Create our client used to send messages.
c := s.createInternalJetStreamClient()
// Bind to the account.
c.registerWithAccount(acc)
// Clean up on exit.
defer c.closeConnection(ClientClosed)
// Warn when internal send queue is backed up past 75%
warnThresh := cap(sendq) * 3 / 4
warnFreq := time.Second
last := time.Now().Add(-warnFreq)
for {
if len(sendq) > warnThresh && time.Since(last) >= warnFreq {
s.Warnf("Jetstream internal consumer send queue > 75%% for account: %q consumer: %q", acc.Name, o)
last = time.Now()
}
select {
case <-qch:
return
case interest := <-inch:
// inch can be nil on pull-based, but then this will
// just block and not fire.
o.updateDeliveryInterest(interest)
case pm := <-sendq:
if pm == nil {
return
}
c.pa.subject = []byte(pm.subj)
c.pa.deliver = []byte(pm.dsubj)
c.pa.size = len(pm.msg) + len(pm.hdr)
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
c.pa.reply = []byte(pm.reply)
var msg []byte
if len(pm.hdr) > 0 {
c.pa.hdr = len(pm.hdr)
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
msg = append(pm.hdr, pm.msg...)
msg = append(msg, _CRLF_...)
} else {
c.pa.hdr = -1
c.pa.hdb = nil
msg = append(pm.msg, _CRLF_...)
}
didDeliver, _ := c.processInboundClientMsg(msg)
c.pa.szb = nil
c.flushClients(0)
// Check to see if this is a delivery for an observable and
// we failed to deliver the message. If so alert the observable.
if !didDeliver && pm.o != nil && pm.seq > 0 {
pm.o.didNotDeliver(pm.seq)
}
}
}
}
// Info returns our current consumer state.
func (o *consumer) info() *ConsumerInfo {
o.mu.RLock()
@@ -1645,18 +1568,13 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _, reply string
defer o.mu.Unlock()
s, mset, js := o.srv, o.mset, o.js
if mset == nil || o.sendq == nil {
if mset == nil {
return
}
sendq := o.sendq
sendErr := func(status int, description string) {
// Needs to be unlocked to send err.
o.mu.Unlock()
defer o.mu.Lock()
hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description))
pmsg := &jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0}
sendq <- pmsg // Send error message.
o.outq.send(&jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0, nil})
}
if o.isPushMode() {
@@ -1846,11 +1764,10 @@ func (o *consumer) forceExpireFirstWaiting() *waitingRequest {
return wr
}
// If we are expiring this and we think there is still interest, alert.
if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.sendq != nil {
if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil {
// We still appear to have interest, so send alert as courtesy.
hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n")
pmsg := &jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0}
o.sendq <- pmsg // Send message.
o.outq.send(&jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0, nil})
}
return wr
}
@@ -1903,6 +1820,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}
lseq = o.mset.state().LastSeq
}
inch := o.inch
o.mu.Unlock()
// Deliver all the msgs we have now, once done or on a condition, we wait for new ones.
@@ -2000,9 +1918,14 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
o.mu.Unlock()
select {
case interest := <-inch:
// inch can be nil on pull-based, but then this will
// just block and not fire.
o.updateDeliveryInterest(interest)
case <-qch:
return
case <-mch:
// Messages are waiting.
}
}
}
@@ -2014,7 +1937,7 @@ func (o *consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) str
// Deliver a msg to the consumer.
// Lock should be held and o.mset validated to be non-nil.
func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) {
if o.mset == nil || o.sendq == nil {
if o.mset == nil {
return
}
// Update pending on first attempt
@@ -2032,21 +1955,17 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
dseq := o.dseq
o.dseq++
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq}
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil}
mset := o.mset
ap := o.cfg.AckPolicy
sendq := o.sendq
// This needs to be unlocked since the other side may need this lock on a failed delivery.
o.mu.Unlock()
// Send message.
sendq <- pmsg
o.outq.send(pmsg)
// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && mset.cfg.Retention == InterestPolicy && !mset.checkInterest(seq, o) {
mset.store.RemoveMsg(seq)
mset.rmch <- seq
}
// Re-acquire lock.
o.mu.Lock()
if ap == AckExplicit || ap == AckAll {
o.trackPending(seq, dseq)
@@ -2541,6 +2460,7 @@ func (o *consumer) stopWithFlags(dflag, doSignal, advisory bool) error {
seqs = append(seqs, seq)
}
o.mu.Unlock()
// Sort just to keep pending sparse array state small.
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
for _, seq := range seqs {

View File

@@ -91,6 +91,7 @@ type internal struct {
sendq chan *pubMsg
resetCh chan struct{}
wg sync.WaitGroup
sq *sendq
orphMax time.Duration
chkOrph time.Duration
statsz time.Duration

View File

@@ -1651,7 +1651,11 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, sub
}
// Call actual stepdown.
mset.raftNode().StepDown()
if mset != nil {
if node := mset.raftNode(); node != nil {
node.StepDown()
}
}
resp.Success = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
@@ -2785,7 +2789,7 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *
chunk = chunk[:n]
if err != nil {
if n > 0 {
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0, nil})
}
break
}
@@ -2800,15 +2804,14 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *
case <-time.After(10 * time.Millisecond):
}
}
// TODO(dlc) - Might want these moved off sendq if we have contention.
ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0, nil})
atomic.AddInt32(&out, int32(len(chunk)))
}
done:
// Send last EOF
// TODO(dlc) - place hash in header
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
}
// Request to create a durable consumer.

View File

@@ -756,7 +756,7 @@ func (js *jetStream) monitorCluster() {
// FIXME(dlc) - Deal with errors.
if _, didRemoval, err := js.applyMetaEntries(ce.Entries, isRecovering); err == nil {
n.Applied(ce.Index)
if didRemoval && time.Since(lastSnapTime) > 2*time.Second {
if js.hasPeerEntries(ce.Entries) || (didRemoval && time.Since(lastSnapTime) > 2*time.Second) {
// Since we received one make sure we have our own since we do not store
// our meta state outside of raft.
doSnapshot()
@@ -1029,6 +1029,16 @@ func (js *jetStream) removePeerFromStream(sa *streamAssignment, peer string) {
}
}
// Check if we have peer related entries.
// hasPeerEntries reports whether any of the given raft entries are peer
// membership changes (a peer being added or removed).
func (js *jetStream) hasPeerEntries(entries []*Entry) bool {
	for _, e := range entries {
		switch e.Type {
		case EntryRemovePeer, EntryAddPeer:
			return true
		}
	}
	return false
}
func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool, bool, error) {
var didSnap, didRemove bool
for _, e := range entries {
@@ -1038,7 +1048,6 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
} else if e.Type == EntryRemovePeer {
if !isRecovering {
js.processRemovePeer(string(e.Data))
didRemove = true
}
} else {
buf := e.Data
@@ -3891,7 +3900,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.mu.RLock()
canRespond := !mset.cfg.NoAck && len(reply) > 0
s, jsa, st, rf, sendq := mset.srv, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.sendq
s, jsa, st, rf, outq := mset.srv, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq
maxMsgSize := int(mset.cfg.MaxMsgSize)
msetName := mset.cfg.Name
mset.mu.RUnlock()
@@ -3920,7 +3929,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}}
resp.Error = &ApiError{Code: 400, Description: "resource limits exceeded for account"}
response, _ = json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
return err
}
@@ -3933,7 +3942,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}}
resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"}
response, _ = json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
return err
}
@@ -3962,7 +3971,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// If we errored out respond here.
if err != nil && canRespond {
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
if err != nil && isOutOfSpaceErr(err) {

View File

@@ -3118,7 +3118,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
c.randomNonStreamLeader("$G", "NO-Q").Shutdown()
// This should eventually have us stepdown as leader since we would have lost quorum with R=2.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
if sl := c.streamLeader("$G", "NO-Q"); sl == nil {
return nil
}

View File

@@ -153,6 +153,7 @@ type raft struct {
asubj string
areply string
sq *sendq
aesub *subscription
// For holding term and vote and peerstate to be written.
@@ -176,7 +177,6 @@ type raft struct {
entryc chan *appendEntry
respc chan *appendEntryResponse
applyc chan *CommittedEntry
sendq chan *pubMsg
quit chan struct{}
reqs chan *voteRequest
votes chan *voteResponse
@@ -206,7 +206,7 @@ const (
maxElectionTimeout = 5 * minElectionTimeout
minCampaignTimeout = 100 * time.Millisecond
maxCampaignTimeout = 4 * minCampaignTimeout
hbInterval = 250 * time.Millisecond
hbInterval = 500 * time.Millisecond
lostQuorumInterval = hbInterval * 5
)
@@ -303,11 +303,11 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
return nil, errNilCfg
}
s.mu.Lock()
if s.sys == nil || s.sys.sendq == nil {
if s.sys == nil {
s.mu.Unlock()
return nil, ErrNoSysAccount
}
sendq := s.sys.sendq
sq := s.sys.sq
sacc := s.sys.account
hash := s.sys.shash
s.mu.Unlock()
@@ -334,7 +334,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
acks: make(map[uint64]map[string]struct{}),
s: s,
c: s.createInternalSystemClient(),
sendq: sendq,
sq: sq,
quit: make(chan struct{}),
wtvch: make(chan struct{}, 1),
wpsch: make(chan struct{}, 1),
@@ -2884,11 +2884,11 @@ func (n *raft) requestVote() {
}
func (n *raft) sendRPC(subject, reply string, msg []byte) {
n.sendq <- &pubMsg{n.c, subject, reply, nil, msg, false}
n.sq.send(subject, reply, nil, msg)
}
func (n *raft) sendReply(subject string, msg []byte) {
n.sendq <- &pubMsg{n.c, subject, _EMPTY_, nil, msg, false}
n.sq.send(subject, _EMPTY_, nil, msg)
}
func (n *raft) wonElection(votes int) bool {

129
server/sendq.go Normal file
View File

@@ -0,0 +1,129 @@
// Copyright 2020-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"strconv"
"sync"
"time"
)
// outMsg is a single outbound system message queued on a sendq.
// Messages form a singly linked list via next so the whole queue can be
// detached in one operation.
type outMsg struct {
	subj string  // publish subject
	rply string  // optional reply subject
	hdr  []byte  // optional headers (private copy made by send)
	msg  []byte  // payload (private copy made by send)
	next *outMsg // next message in the queue, nil at the tail
}
// sendq is a mutex-protected linked list of outbound system messages,
// drained by a dedicated goroutine (internalLoop) using an internal
// system client. It replaces a bounded channel-based send queue.
type sendq struct {
	mu   sync.Mutex
	mch  chan struct{} // capacity-1 wakeup channel, signaled on empty->non-empty transition
	head *outMsg       // first queued message, nil when empty
	tail *outMsg       // last queued message, nil when empty
	s    *Server
}
// newSendQ creates a sendq bound to this server and starts its
// internal delivery loop in a server-tracked goroutine.
func (s *Server) newSendQ() *sendq {
	sq := &sendq{s: s, mch: make(chan struct{}, 1)}
	s.startGoRoutine(sq.internalLoop)
	return sq
}
// internalLoop drains the sendq and delivers each queued message by
// faking inbound parse state on an internal system client, then calling
// processInboundClientMsg. It runs until the server quits.
func (sq *sendq) internalLoop() {
	sq.mu.Lock()
	s, mch := sq.s, sq.mch
	sq.mu.Unlock()
	defer s.grWG.Done()
	// Dedicated internal client bound to the system account; noIcb set
	// so internal subscription callbacks are skipped for our sends.
	c := s.createInternalSystemClient()
	c.registerWithAccount(s.SystemAccount())
	c.noIcb = true
	defer c.closeConnection(ClientClosed)
	for s.isRunning() {
		select {
		case <-s.quitCh:
			return
		case <-mch:
			// Woken on an empty->non-empty transition; drain everything
			// that has been queued since.
			for pm := sq.pending(); pm != nil; {
				// Populate the parse state as if this arrived from the wire.
				c.pa.subject = []byte(pm.subj)
				c.pa.size = len(pm.msg) + len(pm.hdr)
				c.pa.szb = []byte(strconv.Itoa(c.pa.size))
				c.pa.reply = []byte(pm.rply)
				var msg []byte
				if len(pm.hdr) > 0 {
					c.pa.hdr = len(pm.hdr)
					c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
					msg = append(pm.hdr, pm.msg...)
					msg = append(msg, _CRLF_...)
				} else {
					c.pa.hdr = -1
					c.pa.hdb = nil
					msg = append(pm.msg, _CRLF_...)
				}
				c.processInboundClientMsg(msg)
				c.pa.szb = nil
				// Grab next, then nil out links/buffers on the processed
				// message so the GC can reclaim them promptly.
				next := pm.next
				pm.next, pm.hdr, pm.msg = nil, nil, nil
				if pm = next; pm == nil {
					// List exhausted; pick up anything queued meanwhile
					// before going back to sleep on mch.
					pm = sq.pending()
				}
			}
			c.flushClients(10 * time.Millisecond)
		}
	}
}
// pending atomically detaches the entire queued list and returns its
// head (nil if the queue was empty), leaving the sendq cleared.
func (sq *sendq) pending() *outMsg {
	sq.mu.Lock()
	defer sq.mu.Unlock()
	first := sq.head
	sq.head, sq.tail = nil, nil
	return first
}
// send enqueues a message for delivery on subj (with optional reply
// rply) and wakes the internal loop if the queue was previously empty.
// hdr and msg are copied, so callers may reuse their buffers.
func (sq *sendq) send(subj, rply string, hdr, msg []byte) {
	out := &outMsg{subj: subj, rply: rply}
	// Take private copies of caller-owned buffers.
	if len(hdr) > 0 {
		out.hdr = append([]byte(nil), hdr...)
	}
	if len(msg) > 0 {
		out.msg = append([]byte(nil), msg...)
	}
	sq.mu.Lock()
	wasEmpty := sq.head == nil
	if wasEmpty {
		sq.head = out
	} else {
		sq.tail.next = out
	}
	sq.tail = out
	sq.mu.Unlock()
	// Only the empty->non-empty transition needs a signal; once woken,
	// the loop drains everything queued. Non-blocking send since mch
	// has capacity 1 and may already hold a pending wakeup.
	if wasEmpty {
		select {
		case sq.mch <- struct{}{}:
		default:
		}
	}
}

View File

@@ -1120,6 +1120,7 @@ func (s *Server) setSystemAccount(acc *Account) error {
replies: make(map[string]msgHandler),
sendq: make(chan *pubMsg, internalSendQLen),
resetCh: make(chan struct{}),
sq: s.newSendQ(),
statsz: eventsHBInterval,
orphMax: 5 * eventsHBInterval,
chkOrph: 3 * eventsHBInterval,

View File

@@ -135,8 +135,11 @@ type stream struct {
sysc *client
sid int
pubAck []byte
sendq chan *jsPubMsg
outq *jsOutQ
mch chan struct{}
msgs *inbound
store StreamStore
rmch chan uint64
lseq uint64
lmsgId string
consumers map[string]*consumer
@@ -301,6 +304,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
sysc: ic,
stype: cfg.Storage,
consumers: make(map[string]*consumer),
mch: make(chan struct{}, 1),
msgs: &inbound{},
rmch: make(chan uint64, 8192),
qch: make(chan struct{}),
}
@@ -572,10 +578,10 @@ func (mset *stream) sendCreateAdvisory() {
mset.mu.Lock()
name := mset.cfg.Name
template := mset.cfg.Template
sendq := mset.sendq
outq := mset.outq
mset.mu.Unlock()
if sendq == nil {
if outq == nil {
return
}
@@ -597,11 +603,11 @@ func (mset *stream) sendCreateAdvisory() {
}
subj := JSAdvisoryStreamCreatedPre + "." + name
sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil})
}
func (mset *stream) sendDeleteAdvisoryLocked() {
if mset.sendq == nil {
if mset.outq == nil {
return
}
@@ -619,12 +625,12 @@ func (mset *stream) sendDeleteAdvisoryLocked() {
j, err := json.Marshal(m)
if err == nil {
subj := JSAdvisoryStreamDeletedPre + "." + mset.cfg.Name
mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
mset.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil})
}
}
func (mset *stream) sendUpdateAdvisoryLocked() {
if mset.sendq == nil {
if mset.outq == nil {
return
}
@@ -641,7 +647,7 @@ func (mset *stream) sendUpdateAdvisoryLocked() {
j, err := json.Marshal(m)
if err == nil {
subj := JSAdvisoryStreamUpdatedPre + "." + mset.cfg.Name
mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
mset.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil})
}
}
@@ -1113,8 +1119,8 @@ func (mset *stream) mirrorDurable() string {
// Setup our mirror consumer.
// Lock should be held.
func (mset *stream) setupMirrorConsumer() error {
if mset.sendq == nil {
return errors.New("sendq required")
if mset.outq == nil {
return errors.New("outq required")
}
// Reset
@@ -1172,7 +1178,7 @@ func (mset *stream) setupMirrorConsumer() error {
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
subject = strings.ReplaceAll(subject, "..", ".")
}
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
req := &CreateConsumerRequest{
Stream: mset.cfg.Mirror.Name,
@@ -1223,7 +1229,7 @@ func (mset *stream) setupMirrorConsumer() error {
subject = strings.ReplaceAll(subject, "..", ".")
}
mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil})
go func() {
var shouldRetry bool
@@ -1314,7 +1320,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
subject = strings.ReplaceAll(subject, "..", ".")
}
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
if ext != nil {
deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".")
@@ -1392,7 +1398,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
subject = strings.ReplaceAll(subject, "..", ".")
}
mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil})
go func() {
var shouldRetry bool
@@ -1615,8 +1621,8 @@ func (mset *stream) startingSequenceForSources() {
// Setup our source consumers.
// Lock should be held.
func (mset *stream) setupSourceConsumers() error {
if mset.sendq == nil {
return errors.New("sendq required")
if mset.outq == nil {
return errors.New("outq required")
}
// Reset if needed.
for _, si := range mset.sources {
@@ -1672,7 +1678,7 @@ func (mset *stream) stopSourceConsumers() {
}
// Need to delete the old one.
subject := fmt.Sprintf(JSApiConsumerDeleteT, si.name, mset.sourceDurable(si.name))
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
}
}
@@ -1688,7 +1694,7 @@ func (mset *stream) unsubscribeToStream() error {
}
durable := mset.mirrorDurable()
subject := fmt.Sprintf(JSApiConsumerDeleteT, mset.cfg.Mirror.Name, durable)
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
mset.mirror = nil
}
@@ -1909,10 +1915,62 @@ func (mset *stream) isClustered() bool {
return mset.node != nil
}
// processInboundJetStreamMsg handles processing messages bound for a stream.
func (mset *stream) processInboundJetStreamMsg(_ *subscription, pc *client, subject, reply string, rmsg []byte) {
hdr, msg := pc.msgParts(rmsg)
// Used if we have to queue things internally to avoid the route/gw path.
// inMsg is an inbound stream message queued internally so that traffic
// arriving from non-client connections is processed off the route/gw
// fast path. Messages form a singly linked list via next.
type inMsg struct {
	subj string // publish subject
	rply string // optional reply subject
	hdr  []byte // optional headers (private copy)
	msg  []byte // payload (private copy)
	next *inMsg // next message, nil at the tail
}

// inbound is the linked list of queued inbound stream messages,
// protected by the owning stream's lock.
type inbound struct {
	head *inMsg
	tail *inMsg
}
// pending atomically detaches and returns the stream's queued inbound
// message list (nil if empty), leaving the queue cleared.
func (mset *stream) pending() *inMsg {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	first := mset.msgs.head
	mset.msgs.head, mset.msgs.tail = nil, nil
	return first
}
// queueInboundMsg copies the message and appends it to the stream's
// inbound list, signaling the internal loop when the list transitions
// from empty to non-empty.
func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) {
	im := &inMsg{subj: subj, rply: rply}
	// Detach from caller-owned buffers which may be reused.
	if len(hdr) > 0 {
		im.hdr = append([]byte(nil), hdr...)
	}
	if len(msg) > 0 {
		im.msg = append([]byte(nil), msg...)
	}
	mset.mu.Lock()
	wasEmpty := mset.msgs.head == nil
	if wasEmpty {
		mset.msgs.head = im
	} else {
		mset.msgs.tail.next = im
	}
	mset.msgs.tail = im
	mset.mu.Unlock()
	// Non-blocking wakeup; mch has capacity 1 and may already hold a
	// pending signal.
	if wasEmpty {
		select {
		case mset.mch <- struct{}{}:
		default:
		}
	}
}
// processInboundJetStreamMsg handles processing messages bound for a stream.
func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, subject, reply string, rmsg []byte) {
mset.mu.RLock()
isLeader, isClustered := mset.isLeader(), mset.node != nil
mset.mu.RUnlock()
@@ -1922,6 +1980,14 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
return
}
hdr, msg := c.msgParts(rmsg)
// If we are not receiving directly from a client we should move this to the internal Go routine.
if c.kind != CLIENT {
mset.queueInboundMsg(subject, reply, hdr, msg)
return
}
// If we are clustered we need to propose this message to the underlying raft group.
if isClustered {
mset.processClusteredInboundMsg(subject, reply, hdr, msg)
@@ -1973,13 +2039,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
}
if isMisMatch {
sendq := mset.sendq
outq := mset.outq
mset.mu.Unlock()
if canRespond && sendq != nil {
if canRespond && outq != nil {
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 503, Description: "expected stream sequence does not match"}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return errLastSeqMismatch
}
@@ -1995,14 +2061,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
var msgId string
if len(hdr) > 0 {
msgId = getMsgId(hdr)
sendq := mset.sendq
outq := mset.outq
if dde := mset.checkMsgId(msgId); dde != nil {
mset.clfs++
mset.mu.Unlock()
if canRespond {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ",\"duplicate\": true}"...)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
return errors.New("msgid is duplicate")
}
@@ -2015,7 +2081,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 400, Description: "expected stream does not match"}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return errors.New("expected stream does not match")
}
@@ -2028,7 +2094,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last sequence: %d", mlseq)}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq)
}
@@ -2041,7 +2107,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last msg ID: %s", last)}
b, _ := json.Marshal(resp)
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return fmt.Errorf("last msgid mismatch: %q vs %q", lmsgId, last)
}
@@ -2062,7 +2128,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"}
b, _ := json.Marshal(resp)
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return ErrMaxPayload
}
@@ -2099,7 +2165,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if canRespond {
response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...)
response = append(response, '}')
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
@@ -2169,7 +2235,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Send response here.
if canRespond {
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
if err == nil && seq > 0 && numConsumers > 0 {
@@ -2199,6 +2265,43 @@ type jsPubMsg struct {
msg []byte
o *consumer
seq uint64
next *jsPubMsg
}
// Forms a linked list for sending internal system messages.
// jsOutQ is a mutex-protected linked list of internal JetStream
// messages awaiting delivery, drained by the stream's internal loop.
type jsOutQ struct {
	mu   sync.Mutex
	mch  chan struct{} // capacity-1 wakeup channel, signaled on empty->non-empty transition
	head *jsPubMsg     // first queued message, nil when empty
	tail *jsPubMsg     // last queued message, nil when empty
}
// pending atomically detaches the queued list and returns its head
// (nil if empty), leaving the queue cleared.
func (q *jsOutQ) pending() *jsPubMsg {
	q.mu.Lock()
	defer q.mu.Unlock()
	first := q.head
	q.head, q.tail = nil, nil
	return first
}
func (q *jsOutQ) send(msg *jsPubMsg) {
q.mu.Lock()
var notify bool
if q.head == nil {
q.head = msg
notify = true
} else {
q.tail.next = msg
}
q.tail = msg
q.mu.Unlock()
if notify {
select {
case q.mch <- struct{}{}:
default:
}
}
}
// StoredMsg is for raw access to messages in a stream.
@@ -2210,19 +2313,16 @@ type StoredMsg struct {
Time time.Time `json:"time"`
}
// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering
const msetSendQSize = 65536
// This is similar to system semantics but did not want to overload the single system sendq,
// or require system account when doing simple setup with jetstream.
func (mset *stream) setupSendCapabilities() {
mset.mu.Lock()
defer mset.mu.Unlock()
if mset.sendq != nil {
if mset.outq != nil {
return
}
mset.sendq = make(chan *jsPubMsg, msetSendQSize)
go mset.internalSendLoop()
mset.outq = &jsOutQ{mch: make(chan struct{}, 1)}
go mset.internalLoop()
}
// Name returns the stream name.
@@ -2235,58 +2335,70 @@ func (mset *stream) name() string {
return mset.cfg.Name
}
func (mset *stream) internalSendLoop() {
func (mset *stream) internalLoop() {
mset.mu.RLock()
s := mset.srv
c := s.createInternalJetStreamClient()
c.registerWithAccount(mset.acc)
defer c.closeConnection(ClientClosed)
sendq := mset.sendq
name := mset.cfg.Name
outq, qch, mch, rmch := mset.outq, mset.qch, mset.mch, mset.rmch
isClustered := mset.node != nil
mset.mu.RUnlock()
// Warn when internal send queue is backed up past 75%
warnThresh := 3 * msetSendQSize / 4
warnFreq := time.Second
last := time.Now().Add(-warnFreq)
for {
if len(sendq) > warnThresh && time.Since(last) >= warnFreq {
s.Warnf("Jetstream internal send queue > 75%% for account: %q stream: %q", c.acc.Name, name)
last = time.Now()
}
select {
case pm := <-sendq:
if pm == nil {
return
}
c.pa.subject = []byte(pm.subj)
c.pa.deliver = []byte(pm.dsubj)
c.pa.size = len(pm.msg) + len(pm.hdr)
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
c.pa.reply = []byte(pm.reply)
case <-outq.mch:
for pm := outq.pending(); pm != nil; {
c.pa.subject = []byte(pm.subj)
c.pa.deliver = []byte(pm.dsubj)
c.pa.size = len(pm.msg) + len(pm.hdr)
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
c.pa.reply = []byte(pm.reply)
var msg []byte
if len(pm.hdr) > 0 {
c.pa.hdr = len(pm.hdr)
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
msg = append(pm.hdr, pm.msg...)
msg = append(msg, _CRLF_...)
} else {
c.pa.hdr = -1
c.pa.hdb = nil
msg = append(pm.msg, _CRLF_...)
}
var msg []byte
if len(pm.hdr) > 0 {
c.pa.hdr = len(pm.hdr)
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
msg = append(pm.hdr, pm.msg...)
msg = append(msg, _CRLF_...)
} else {
c.pa.hdr = -1
c.pa.hdb = nil
msg = append(pm.msg, _CRLF_...)
}
didDeliver, _ := c.processInboundClientMsg(msg)
c.pa.szb = nil
c.flushClients(0)
didDeliver, _ := c.processInboundClientMsg(msg)
c.pa.szb = nil
// Check to see if this is a delivery for an observable and
// we failed to deliver the message. If so alert the observable.
if pm.o != nil && pm.seq > 0 && !didDeliver {
pm.o.didNotDeliver(pm.seq)
// Check to see if this is a delivery for an observable and
// we failed to deliver the message. If so alert the observable.
if pm.o != nil && pm.seq > 0 && !didDeliver {
pm.o.didNotDeliver(pm.seq)
}
// Do this here to nil out below vs up in for loop.
next := pm.next
pm.next, pm.hdr, pm.msg = nil, nil, nil
pm = next
}
c.flushClients(10 * time.Millisecond)
case <-mch:
for im := mset.pending(); im != nil; {
// If we are clustered we need to propose this message to the underlying raft group.
if isClustered {
mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg)
} else {
mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0)
}
// Do this here to nil out below vs up in for loop.
next := im.next
im.next, im.hdr, im.msg = nil, nil, nil
im = next
}
case seq := <-rmch:
mset.store.RemoveMsg(seq)
case <-qch:
return
case <-s.quitCh:
return
}
@@ -2354,7 +2466,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
if deleteFlag {
for _, ssi := range mset.cfg.Sources {
subject := fmt.Sprintf(JSApiConsumerDeleteT, ssi.Name, mset.sourceDurable(ssi.Name))
mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
}
}
@@ -2363,10 +2475,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.sendDeleteAdvisoryLocked()
}
if mset.sendq != nil {
mset.sendq <- nil
}
c := mset.client
mset.client = nil
if c == nil {