diff --git a/server/client.go b/server/client.go index 6a910aa9..bb563c46 100644 --- a/server/client.go +++ b/server/client.go @@ -270,6 +270,7 @@ type client struct { trace bool echo bool + noIcb bool tags jwt.TagList nameTag string @@ -3026,7 +3027,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g client.outBytes += msgSize // Check for internal subscriptions. - if sub.icb != nil { + if sub.icb != nil && !c.noIcb { if gwrply { // Note that we keep track of the GW routed reply in the destination // connection (`client`). The routed reply subject is in `c.pa.reply`, diff --git a/server/const.go b/server/const.go index a1a663ef..348ea0b7 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.0-RC.4" + VERSION = "2.2.0-RC.5" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/consumer.go b/server/consumer.go index bf12981f..1cd044a8 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -187,7 +187,7 @@ type consumer struct { ackSubj string nextMsgSubj string maxp int - sendq chan *jsPubMsg + outq *jsOutQ pending map[uint64]*Pending ptmr *time.Timer rdq []uint64 @@ -428,6 +428,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri sysc: s.createInternalJetStreamClient(), cfg: *config, dsubj: config.DeliverSubject, + outq: mset.outq, active: true, qch: make(chan struct{}), mch: make(chan struct{}, 1), @@ -670,8 +671,6 @@ func (o *consumer) setLeader(isLeader bool) { o.replay = true } - // Recreate internal sendq - o.sendq = make(chan *jsPubMsg, msetSendQSize) // Recreate quit channel. o.qch = make(chan struct{}) qch := o.qch @@ -679,8 +678,6 @@ func (o *consumer) setLeader(isLeader bool) { // Now start up Go routine to deliver msgs. go o.loopAndGatherMsgs(qch) - // Startup our deliver loop. 
- go o.loopAndDeliverMsgs(qch) } else { // Shutdown the go routines and the subscriptions. @@ -693,7 +690,6 @@ func (o *consumer) setLeader(isLeader bool) { o.srv.sysUnsubscribe(o.infoSub) o.infoSub = nil } - o.sendq = nil if o.qch != nil { close(o.qch) o.qch = nil @@ -737,16 +733,10 @@ func (o *consumer) unsubscribe(sub *subscription) { o.client.processUnsub(sub.sid) } -// We need to make sure we protect access to the sendq. +// We need to make sure we protect access to the outq. // Do all advisory sends here. -// Lock should be held on entry but will be released. func (o *consumer) sendAdvisory(subj string, msg []byte) { - sendq := o.sendq - o.mu.Unlock() - if sendq != nil { - sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0} - } - o.mu.Lock() + o.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0, nil}) } func (o *consumer) sendDeleteAdvisoryLocked() { @@ -1230,73 +1220,6 @@ func (o *consumer) writeStoreState() error { return o.store.Update(&state) } -// loopAndDeliverMsgs() will loop and deliver messages and watch for interest changes. -func (o *consumer) loopAndDeliverMsgs(qch chan struct{}) { - o.mu.Lock() - inch, sendq := o.inch, o.sendq - s, acc := o.acc.srv, o.acc - o.mu.Unlock() - - // Create our client used to send messages. - c := s.createInternalJetStreamClient() - // Bind to the account. - c.registerWithAccount(acc) - // Clean up on exit. - defer c.closeConnection(ClientClosed) - - // Warn when internal send queue is backed up past 75% - warnThresh := cap(sendq) * 3 / 4 - warnFreq := time.Second - last := time.Now().Add(-warnFreq) - - for { - if len(sendq) > warnThresh && time.Since(last) >= warnFreq { - s.Warnf("Jetstream internal consumer send queue > 75%% for account: %q consumer: %q", acc.Name, o) - last = time.Now() - } - - select { - case <-qch: - return - case interest := <-inch: - // inch can be nil on pull-based, but then this will - // just block and not fire. 
- o.updateDeliveryInterest(interest) - case pm := <-sendq: - if pm == nil { - return - } - c.pa.subject = []byte(pm.subj) - c.pa.deliver = []byte(pm.dsubj) - c.pa.size = len(pm.msg) + len(pm.hdr) - c.pa.szb = []byte(strconv.Itoa(c.pa.size)) - c.pa.reply = []byte(pm.reply) - - var msg []byte - if len(pm.hdr) > 0 { - c.pa.hdr = len(pm.hdr) - c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) - msg = append(pm.hdr, pm.msg...) - msg = append(msg, _CRLF_...) - } else { - c.pa.hdr = -1 - c.pa.hdb = nil - msg = append(pm.msg, _CRLF_...) - } - - didDeliver, _ := c.processInboundClientMsg(msg) - c.pa.szb = nil - c.flushClients(0) - - // Check to see if this is a delivery for an observable and - // we failed to deliver the message. If so alert the observable. - if !didDeliver && pm.o != nil && pm.seq > 0 { - pm.o.didNotDeliver(pm.seq) - } - } - } -} - // Info returns our current consumer state. func (o *consumer) info() *ConsumerInfo { o.mu.RLock() @@ -1645,18 +1568,13 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _, reply string defer o.mu.Unlock() s, mset, js := o.srv, o.mset, o.js - if mset == nil || o.sendq == nil { + if mset == nil { return } - sendq := o.sendq sendErr := func(status int, description string) { - // Needs to be unlocked to send err. - o.mu.Unlock() - defer o.mu.Lock() hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description)) - pmsg := &jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0} - sendq <- pmsg // Send error message. + o.outq.send(&jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0, nil}) } if o.isPushMode() { @@ -1846,11 +1764,10 @@ func (o *consumer) forceExpireFirstWaiting() *waitingRequest { return wr } // If we are expiring this and we think there is still interest, alert. 
- if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.sendq != nil { + if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil { // We still appear to have interest, so send alert as courtesy. hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n") - pmsg := &jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0} - o.sendq <- pmsg // Send message. + o.outq.send(&jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0, nil}) } return wr } @@ -1903,6 +1820,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { } lseq = o.mset.state().LastSeq } + inch := o.inch o.mu.Unlock() // Deliver all the msgs we have now, once done or on a condition, we wait for new ones. @@ -2000,9 +1918,14 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { o.mu.Unlock() select { + case interest := <-inch: + // inch can be nil on pull-based, but then this will + // just block and not fire. + o.updateDeliveryInterest(interest) case <-qch: return case <-mch: + // Messages are waiting. } } } @@ -2014,7 +1937,7 @@ func (o *consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) str // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) { - if o.mset == nil || o.sendq == nil { + if o.mset == nil { return } // Update pending on first attempt @@ -2032,21 +1955,17 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 dseq := o.dseq o.dseq++ - pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq} + pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil} mset := o.mset ap := o.cfg.AckPolicy - sendq := o.sendq - // This needs to be unlocked since the other side may need this lock on a failed delivery. - o.mu.Unlock() // Send message. 
- sendq <- pmsg + o.outq.send(pmsg) + // If we are ack none and mset is interest only we should make sure stream removes interest. if ap == AckNone && mset.cfg.Retention == InterestPolicy && !mset.checkInterest(seq, o) { - mset.store.RemoveMsg(seq) + mset.rmch <- seq } - // Re-acquire lock. - o.mu.Lock() if ap == AckExplicit || ap == AckAll { o.trackPending(seq, dseq) @@ -2541,6 +2460,7 @@ func (o *consumer) stopWithFlags(dflag, doSignal, advisory bool) error { seqs = append(seqs, seq) } o.mu.Unlock() + // Sort just to keep pending sparse array state small. sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) for _, seq := range seqs { diff --git a/server/events.go b/server/events.go index 5a69bb3b..7a51add5 100644 --- a/server/events.go +++ b/server/events.go @@ -91,6 +91,7 @@ type internal struct { sendq chan *pubMsg resetCh chan struct{} wg sync.WaitGroup + sq *sendq orphMax time.Duration chkOrph time.Duration statsz time.Duration diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 2ef4f455..c66357e0 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1651,7 +1651,11 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, sub } // Call actual stepdown. - mset.raftNode().StepDown() + if mset != nil { + if node := mset.raftNode(); node != nil { + node.StepDown() + } + } resp.Success = true s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) @@ -2785,7 +2789,7 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr * chunk = chunk[:n] if err != nil { if n > 0 { - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0, nil}) } break } @@ -2800,15 +2804,14 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr * case <-time.After(10 * time.Millisecond): } } - // TODO(dlc) - Might want these moved off sendq if we have contention. 
ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index) - mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0, nil}) atomic.AddInt32(&out, int32(len(chunk))) } done: // Send last EOF // TODO(dlc) - place hash in header - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) } // Request to create a durable consumer. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 16e7b7ea..73652b81 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -756,7 +756,7 @@ func (js *jetStream) monitorCluster() { // FIXME(dlc) - Deal with errors. if _, didRemoval, err := js.applyMetaEntries(ce.Entries, isRecovering); err == nil { n.Applied(ce.Index) - if didRemoval && time.Since(lastSnapTime) > 2*time.Second { + if js.hasPeerEntries(ce.Entries) || (didRemoval && time.Since(lastSnapTime) > 2*time.Second) { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. doSnapshot() @@ -1029,6 +1029,16 @@ func (js *jetStream) removePeerFromStream(sa *streamAssignment, peer string) { } } +// Check if we have peer related entries. 
+func (js *jetStream) hasPeerEntries(entries []*Entry) bool { + for _, e := range entries { + if e.Type == EntryRemovePeer || e.Type == EntryAddPeer { + return true + } + } + return false +} + func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool, bool, error) { var didSnap, didRemove bool for _, e := range entries { @@ -1038,7 +1048,6 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool } else if e.Type == EntryRemovePeer { if !isRecovering { js.processRemovePeer(string(e.Data)) - didRemove = true } } else { buf := e.Data @@ -3891,7 +3900,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ mset.mu.RLock() canRespond := !mset.cfg.NoAck && len(reply) > 0 - s, jsa, st, rf, sendq := mset.srv, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.sendq + s, jsa, st, rf, outq := mset.srv, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq maxMsgSize := int(mset.cfg.MaxMsgSize) msetName := mset.cfg.Name mset.mu.RUnlock() @@ -3920,7 +3929,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}} resp.Error = &ApiError{Code: 400, Description: "resource limits exceeded for account"} response, _ = json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } return err } @@ -3933,7 +3942,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}} resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"} response, _ = json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } return err } @@ -3962,7 +3971,7 @@ func (mset *stream) 
processClusteredInboundMsg(subject, reply string, hdr, msg [ // If we errored out respond here. if err != nil && canRespond { - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } if err != nil && isOutOfSpaceErr(err) { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 11229ae5..0940c57e 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3118,7 +3118,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { c.randomNonStreamLeader("$G", "NO-Q").Shutdown() // This should eventually have us stepdown as leader since we would have lost quorum with R=2. - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 500*time.Millisecond, func() error { if sl := c.streamLeader("$G", "NO-Q"); sl == nil { return nil } diff --git a/server/raft.go b/server/raft.go index 2a80bd79..c0183854 100644 --- a/server/raft.go +++ b/server/raft.go @@ -153,6 +153,7 @@ type raft struct { asubj string areply string + sq *sendq aesub *subscription // For holding term and vote and peerstate to be written. 
@@ -176,7 +177,6 @@ type raft struct { entryc chan *appendEntry respc chan *appendEntryResponse applyc chan *CommittedEntry - sendq chan *pubMsg quit chan struct{} reqs chan *voteRequest votes chan *voteResponse @@ -206,7 +206,7 @@ const ( maxElectionTimeout = 5 * minElectionTimeout minCampaignTimeout = 100 * time.Millisecond maxCampaignTimeout = 4 * minCampaignTimeout - hbInterval = 250 * time.Millisecond + hbInterval = 500 * time.Millisecond lostQuorumInterval = hbInterval * 5 ) @@ -303,11 +303,11 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { return nil, errNilCfg } s.mu.Lock() - if s.sys == nil || s.sys.sendq == nil { + if s.sys == nil { s.mu.Unlock() return nil, ErrNoSysAccount } - sendq := s.sys.sendq + sq := s.sys.sq sacc := s.sys.account hash := s.sys.shash s.mu.Unlock() @@ -334,7 +334,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { acks: make(map[uint64]map[string]struct{}), s: s, c: s.createInternalSystemClient(), - sendq: sendq, + sq: sq, quit: make(chan struct{}), wtvch: make(chan struct{}, 1), wpsch: make(chan struct{}, 1), @@ -2884,11 +2884,11 @@ func (n *raft) requestVote() { } func (n *raft) sendRPC(subject, reply string, msg []byte) { - n.sendq <- &pubMsg{n.c, subject, reply, nil, msg, false} + n.sq.send(subject, reply, nil, msg) } func (n *raft) sendReply(subject string, msg []byte) { - n.sendq <- &pubMsg{n.c, subject, _EMPTY_, nil, msg, false} + n.sq.send(subject, _EMPTY_, nil, msg) } func (n *raft) wonElection(votes int) bool { diff --git a/server/sendq.go b/server/sendq.go new file mode 100644 index 00000000..66109062 --- /dev/null +++ b/server/sendq.go @@ -0,0 +1,129 @@ +// Copyright 2020-2021 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "strconv" + "sync" + "time" +) + +type outMsg struct { + subj string + rply string + hdr []byte + msg []byte + next *outMsg +} + +type sendq struct { + mu sync.Mutex + mch chan struct{} + head *outMsg + tail *outMsg + s *Server +} + +func (s *Server) newSendQ() *sendq { + sq := &sendq{s: s, mch: make(chan struct{}, 1)} + s.startGoRoutine(sq.internalLoop) + return sq +} + +func (sq *sendq) internalLoop() { + sq.mu.Lock() + s, mch := sq.s, sq.mch + sq.mu.Unlock() + + defer s.grWG.Done() + + c := s.createInternalSystemClient() + c.registerWithAccount(s.SystemAccount()) + c.noIcb = true + + defer c.closeConnection(ClientClosed) + + for s.isRunning() { + select { + case <-s.quitCh: + return + case <-mch: + for pm := sq.pending(); pm != nil; { + c.pa.subject = []byte(pm.subj) + c.pa.size = len(pm.msg) + len(pm.hdr) + c.pa.szb = []byte(strconv.Itoa(c.pa.size)) + c.pa.reply = []byte(pm.rply) + var msg []byte + if len(pm.hdr) > 0 { + c.pa.hdr = len(pm.hdr) + c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) + msg = append(pm.hdr, pm.msg...) + msg = append(msg, _CRLF_...) + } else { + c.pa.hdr = -1 + c.pa.hdb = nil + msg = append(pm.msg, _CRLF_...) + } + c.processInboundClientMsg(msg) + c.pa.szb = nil + // Do this here to nil out below vs up in for loop. 
+ next := pm.next + pm.next, pm.hdr, pm.msg = nil, nil, nil + if pm = next; pm == nil { + pm = sq.pending() + } + } + c.flushClients(10 * time.Millisecond) + } + } +} + +func (sq *sendq) pending() *outMsg { + sq.mu.Lock() + head := sq.head + sq.head, sq.tail = nil, nil + sq.mu.Unlock() + return head +} + +func (sq *sendq) send(subj, rply string, hdr, msg []byte) { + out := &outMsg{subj, rply, nil, nil, nil} + // We will copy these for now. + if len(hdr) > 0 { + hdr = append(hdr[:0:0], hdr...) + out.hdr = hdr + } + if len(msg) > 0 { + msg = append(msg[:0:0], msg...) + out.msg = msg + } + + sq.mu.Lock() + var notify bool + if sq.head == nil { + sq.head = out + notify = true + } else { + sq.tail.next = out + } + sq.tail = out + sq.mu.Unlock() + + if notify { + select { + case sq.mch <- struct{}{}: + default: + } + } +} diff --git a/server/server.go b/server/server.go index 687ffabb..88c470fd 100644 --- a/server/server.go +++ b/server/server.go @@ -1120,6 +1120,7 @@ func (s *Server) setSystemAccount(acc *Account) error { replies: make(map[string]msgHandler), sendq: make(chan *pubMsg, internalSendQLen), resetCh: make(chan struct{}), + sq: s.newSendQ(), statsz: eventsHBInterval, orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval, diff --git a/server/stream.go b/server/stream.go index ec94dc58..60f63f01 100644 --- a/server/stream.go +++ b/server/stream.go @@ -135,8 +135,11 @@ type stream struct { sysc *client sid int pubAck []byte - sendq chan *jsPubMsg + outq *jsOutQ + mch chan struct{} + msgs *inbound store StreamStore + rmch chan uint64 lseq uint64 lmsgId string consumers map[string]*consumer @@ -301,6 +304,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt sysc: ic, stype: cfg.Storage, consumers: make(map[string]*consumer), + mch: make(chan struct{}, 1), + msgs: &inbound{}, + rmch: make(chan uint64, 8192), qch: make(chan struct{}), } @@ -572,10 +578,10 @@ func (mset *stream) sendCreateAdvisory() { mset.mu.Lock() name := 
mset.cfg.Name template := mset.cfg.Template - sendq := mset.sendq + outq := mset.outq mset.mu.Unlock() - if sendq == nil { + if outq == nil { return } @@ -597,11 +603,11 @@ func (mset *stream) sendCreateAdvisory() { } subj := JSAdvisoryStreamCreatedPre + "." + name - sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} + outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil}) } func (mset *stream) sendDeleteAdvisoryLocked() { - if mset.sendq == nil { + if mset.outq == nil { return } @@ -619,12 +625,12 @@ func (mset *stream) sendDeleteAdvisoryLocked() { j, err := json.Marshal(m) if err == nil { subj := JSAdvisoryStreamDeletedPre + "." + mset.cfg.Name - mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} + mset.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil}) } } func (mset *stream) sendUpdateAdvisoryLocked() { - if mset.sendq == nil { + if mset.outq == nil { return } @@ -641,7 +647,7 @@ func (mset *stream) sendUpdateAdvisoryLocked() { j, err := json.Marshal(m) if err == nil { subj := JSAdvisoryStreamUpdatedPre + "." + mset.cfg.Name - mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} + mset.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil}) } } @@ -1113,8 +1119,8 @@ func (mset *stream) mirrorDurable() string { // Setup our mirror consumer. // Lock should be held. 
func (mset *stream) setupMirrorConsumer() error { - if mset.sendq == nil { - return errors.New("sendq required") + if mset.outq == nil { + return errors.New("outq required") } // Reset @@ -1172,7 +1178,7 @@ func (mset *stream) setupMirrorConsumer() error { subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) req := &CreateConsumerRequest{ Stream: mset.cfg.Mirror.Name, @@ -1223,7 +1229,7 @@ func (mset *stream) setupMirrorConsumer() error { subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil}) go func() { var shouldRetry bool @@ -1314,7 +1320,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) if ext != nil { deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".") @@ -1392,7 +1398,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil}) go func() { var shouldRetry bool @@ -1615,8 +1621,8 @@ func (mset *stream) startingSequenceForSources() { // Setup our source consumers. // Lock should be held. func (mset *stream) setupSourceConsumers() error { - if mset.sendq == nil { - return errors.New("sendq required") + if mset.outq == nil { + return errors.New("outq required") } // Reset if needed. 
for _, si := range mset.sources { @@ -1672,7 +1678,7 @@ func (mset *stream) stopSourceConsumers() { } // Need to delete the old one. subject := fmt.Sprintf(JSApiConsumerDeleteT, si.name, mset.sourceDurable(si.name)) - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) } } @@ -1688,7 +1694,7 @@ func (mset *stream) unsubscribeToStream() error { } durable := mset.mirrorDurable() subject := fmt.Sprintf(JSApiConsumerDeleteT, mset.cfg.Mirror.Name, durable) - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) mset.mirror = nil } @@ -1909,10 +1915,62 @@ func (mset *stream) isClustered() bool { return mset.node != nil } -// processInboundJetStreamMsg handles processing messages bound for a stream. -func (mset *stream) processInboundJetStreamMsg(_ *subscription, pc *client, subject, reply string, rmsg []byte) { - hdr, msg := pc.msgParts(rmsg) +// Used if we have to queue things internally to avoid the route/gw path. +type inMsg struct { + subj string + rply string + hdr []byte + msg []byte + next *inMsg +} +// Linked list for inbound messages. +type inbound struct { + head *inMsg + tail *inMsg +} + +func (mset *stream) pending() *inMsg { + mset.mu.Lock() + head := mset.msgs.head + mset.msgs.head, mset.msgs.tail = nil, nil + mset.mu.Unlock() + return head +} + +func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) { + m := &inMsg{subj, rply, nil, nil, nil} + // Copy these. + if len(hdr) > 0 { + hdr = append(hdr[:0:0], hdr...) + m.hdr = hdr + } + if len(msg) > 0 { + msg = append(msg[:0:0], msg...) 
+ m.msg = msg + } + + mset.mu.Lock() + var notify bool + if mset.msgs.head == nil { + mset.msgs.head = m + notify = true + } else { + mset.msgs.tail.next = m + } + mset.msgs.tail = m + mset.mu.Unlock() + + if notify { + select { + case mset.mch <- struct{}{}: + default: + } + } +} + +// processInboundJetStreamMsg handles processing messages bound for a stream. +func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, subject, reply string, rmsg []byte) { mset.mu.RLock() isLeader, isClustered := mset.isLeader(), mset.node != nil mset.mu.RUnlock() @@ -1922,6 +1980,14 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj return } + hdr, msg := c.msgParts(rmsg) + + // If we are not receiving directly from a client we should move this to another Go routine. + if c.kind != CLIENT { + mset.queueInboundMsg(subject, reply, hdr, msg) + return + } + // If we are clustered we need to propose this message to the underlying raft group. if isClustered { mset.processClusteredInboundMsg(subject, reply, hdr, msg) @@ -1973,13 +2039,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } } if isMisMatch { - sendq := mset.sendq + outq := mset.outq mset.mu.Unlock() - if canRespond && sendq != nil { + if canRespond && outq != nil { resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 503, Description: "expected stream sequence does not match"} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return errLastSeqMismatch } @@ -1995,14 +2061,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, var msgId string if len(hdr) > 0 { msgId = getMsgId(hdr) - sendq := mset.sendq + outq := mset.outq if dde := mset.checkMsgId(msgId); dde != nil { mset.clfs++ mset.mu.Unlock() if canRespond { response := append(pubAck, strconv.FormatUint(dde.seq, 10)...) 
response = append(response, ",\"duplicate\": true}"...) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } return errors.New("msgid is duplicate") } @@ -2015,7 +2081,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: "expected stream does not match"} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return errors.New("expected stream does not match") } @@ -2028,7 +2094,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last sequence: %d", mlseq)} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq) } @@ -2041,7 +2107,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last msg ID: %s", last)} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return fmt.Errorf("last msgid mismatch: %q vs %q", lmsgId, last) } @@ -2062,7 +2128,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"} b, _ := json.Marshal(resp) - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return 
ErrMaxPayload } @@ -2099,7 +2165,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if canRespond { response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...) response = append(response, '}') - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } // If we have a msgId make sure to save. if msgId != _EMPTY_ { @@ -2169,7 +2235,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Send response here. if canRespond { - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } if err == nil && seq > 0 && numConsumers > 0 { @@ -2199,6 +2265,43 @@ type jsPubMsg struct { msg []byte o *consumer seq uint64 + next *jsPubMsg +} + +// Forms a linked list for sending internal system messages. +type jsOutQ struct { + mu sync.Mutex + mch chan struct{} + head *jsPubMsg + tail *jsPubMsg +} + +func (q *jsOutQ) pending() *jsPubMsg { + q.mu.Lock() + head := q.head + q.head, q.tail = nil, nil + q.mu.Unlock() + return head +} + +func (q *jsOutQ) send(msg *jsPubMsg) { + q.mu.Lock() + var notify bool + if q.head == nil { + q.head = msg + notify = true + } else { + q.tail.next = msg + } + q.tail = msg + q.mu.Unlock() + + if notify { + select { + case q.mch <- struct{}{}: + default: + } + } } // StoredMsg is for raw access to messages in a stream. @@ -2210,19 +2313,16 @@ type StoredMsg struct { Time time.Time `json:"time"` } -// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering -const msetSendQSize = 65536 - // This is similar to system semantics but did not want to overload the single system sendq, // or require system account when doing simple setup with jetstream. 
func (mset *stream) setupSendCapabilities() { mset.mu.Lock() defer mset.mu.Unlock() - if mset.sendq != nil { + if mset.outq != nil { return } - mset.sendq = make(chan *jsPubMsg, msetSendQSize) - go mset.internalSendLoop() + mset.outq = &jsOutQ{mch: make(chan struct{}, 1)} + go mset.internalLoop() } // Name returns the stream name. @@ -2235,58 +2335,70 @@ func (mset *stream) name() string { return mset.cfg.Name } -func (mset *stream) internalSendLoop() { +func (mset *stream) internalLoop() { mset.mu.RLock() s := mset.srv c := s.createInternalJetStreamClient() c.registerWithAccount(mset.acc) defer c.closeConnection(ClientClosed) - sendq := mset.sendq - name := mset.cfg.Name + outq, qch, mch, rmch := mset.outq, mset.qch, mset.mch, mset.rmch + isClustered := mset.node != nil mset.mu.RUnlock() - // Warn when internal send queue is backed up past 75% - warnThresh := 3 * msetSendQSize / 4 - warnFreq := time.Second - last := time.Now().Add(-warnFreq) - for { - if len(sendq) > warnThresh && time.Since(last) >= warnFreq { - s.Warnf("Jetstream internal send queue > 75%% for account: %q stream: %q", c.acc.Name, name) - last = time.Now() - } select { - case pm := <-sendq: - if pm == nil { - return - } - c.pa.subject = []byte(pm.subj) - c.pa.deliver = []byte(pm.dsubj) - c.pa.size = len(pm.msg) + len(pm.hdr) - c.pa.szb = []byte(strconv.Itoa(c.pa.size)) - c.pa.reply = []byte(pm.reply) + case <-outq.mch: + for pm := outq.pending(); pm != nil; { + c.pa.subject = []byte(pm.subj) + c.pa.deliver = []byte(pm.dsubj) + c.pa.size = len(pm.msg) + len(pm.hdr) + c.pa.szb = []byte(strconv.Itoa(c.pa.size)) + c.pa.reply = []byte(pm.reply) - var msg []byte - if len(pm.hdr) > 0 { - c.pa.hdr = len(pm.hdr) - c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) - msg = append(pm.hdr, pm.msg...) - msg = append(msg, _CRLF_...) - } else { - c.pa.hdr = -1 - c.pa.hdb = nil - msg = append(pm.msg, _CRLF_...) 
- } + var msg []byte + if len(pm.hdr) > 0 { + c.pa.hdr = len(pm.hdr) + c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) + msg = append(pm.hdr, pm.msg...) + msg = append(msg, _CRLF_...) + } else { + c.pa.hdr = -1 + c.pa.hdb = nil + msg = append(pm.msg, _CRLF_...) + } - didDeliver, _ := c.processInboundClientMsg(msg) - c.pa.szb = nil - c.flushClients(0) + didDeliver, _ := c.processInboundClientMsg(msg) + c.pa.szb = nil - // Check to see if this is a delivery for an observable and - // we failed to deliver the message. If so alert the observable. - if pm.o != nil && pm.seq > 0 && !didDeliver { - pm.o.didNotDeliver(pm.seq) + // Check to see if this is a delivery for an observable and + // we failed to deliver the message. If so alert the observable. + if pm.o != nil && pm.seq > 0 && !didDeliver { + pm.o.didNotDeliver(pm.seq) + } + + // Do this here to nil out below vs up in for loop. + next := pm.next + pm.next, pm.hdr, pm.msg = nil, nil, nil + pm = next } + c.flushClients(10 * time.Millisecond) + case <-mch: + for im := mset.pending(); im != nil; { + // If we are clustered we need to propose this message to the underlying raft group. + if isClustered { + mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg) + } else { + mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0) + } + // Do this here to nil out below vs up in for loop. 
+ next := im.next + im.next, im.hdr, im.msg = nil, nil, nil + im = next + } + case seq := <-rmch: + mset.store.RemoveMsg(seq) + case <-qch: + return case <-s.quitCh: return } @@ -2354,7 +2466,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { if deleteFlag { for _, ssi := range mset.cfg.Sources { subject := fmt.Sprintf(JSApiConsumerDeleteT, ssi.Name, mset.sourceDurable(ssi.Name)) - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) } } @@ -2363,10 +2475,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { mset.sendDeleteAdvisoryLocked() } - if mset.sendq != nil { - mset.sendq <- nil - } - c := mset.client mset.client = nil if c == nil {