From bfb8e3432e0b9007885a211d103e6f51a1d0beac Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 4 Mar 2021 14:45:34 -0800 Subject: [PATCH 1/8] Move RAFT comms off internal sendq. Move route and gateway msgs our of fast path for inbound stream msgs. Signed-off-by: Derek Collison --- server/client.go | 5 +- server/events.go | 1 + server/jetstream_api.go | 6 +- server/raft.go | 14 ++--- server/sendq.go | 134 ++++++++++++++++++++++++++++++++++++++++ server/server.go | 1 + server/stream.go | 93 ++++++++++++++++++++++++++-- 7 files changed, 239 insertions(+), 15 deletions(-) create mode 100644 server/sendq.go diff --git a/server/client.go b/server/client.go index 6a910aa9..11c7d2a7 100644 --- a/server/client.go +++ b/server/client.go @@ -271,6 +271,8 @@ type client struct { trace bool echo bool + internal bool + tags jwt.TagList nameTag string } @@ -562,6 +564,7 @@ func (c *client) initClient() { c.subs = make(map[string]*subscription) c.echo = true + c.internal = true c.setTraceLevel() @@ -3026,7 +3029,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g client.outBytes += msgSize // Check for internal subscriptions. - if sub.icb != nil { + if sub.icb != nil && c.internal { if gwrply { // Note that we keep track of the GW routed reply in the destination // connection (`client`). The routed reply subject is in `c.pa.reply`, diff --git a/server/events.go b/server/events.go index 5a69bb3b..7a51add5 100644 --- a/server/events.go +++ b/server/events.go @@ -91,6 +91,7 @@ type internal struct { sendq chan *pubMsg resetCh chan struct{} wg sync.WaitGroup + sq *sendq orphMax time.Duration chkOrph time.Duration statsz time.Duration diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 2ef4f455..f8c11bcb 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1651,7 +1651,11 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, sub } // Call actual stepdown. - mset.raftNode().StepDown() + if mset != nil { + if node := mset.raftNode(); node != nil { + node.StepDown() + } + } resp.Success = true s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) diff --git a/server/raft.go b/server/raft.go index 2a80bd79..c0183854 100644 --- a/server/raft.go +++ b/server/raft.go @@ -153,6 +153,7 @@ type raft struct { asubj string areply string + sq *sendq aesub *subscription // For holding term and vote and peerstate to be written. @@ -176,7 +177,6 @@ type raft struct { entryc chan *appendEntry respc chan *appendEntryResponse applyc chan *CommittedEntry - sendq chan *pubMsg quit chan struct{} reqs chan *voteRequest votes chan *voteResponse @@ -206,7 +206,7 @@ const ( maxElectionTimeout = 5 * minElectionTimeout minCampaignTimeout = 100 * time.Millisecond maxCampaignTimeout = 4 * minCampaignTimeout - hbInterval = 250 * time.Millisecond + hbInterval = 500 * time.Millisecond lostQuorumInterval = hbInterval * 5 ) @@ -303,11 +303,11 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { return nil, errNilCfg } s.mu.Lock() - if s.sys == nil || s.sys.sendq == nil { + if s.sys == nil { s.mu.Unlock() return nil, ErrNoSysAccount } - sendq := s.sys.sendq + sq := s.sys.sq sacc := s.sys.account hash := s.sys.shash s.mu.Unlock() @@ -334,7 +334,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { acks: make(map[uint64]map[string]struct{}), s: s, c: s.createInternalSystemClient(), - sendq: sendq, + sq: sq, quit: make(chan struct{}), wtvch: make(chan struct{}, 1), wpsch: make(chan struct{}, 1), @@ -2884,11 +2884,11 @@ func (n *raft) requestVote() { } func (n *raft) sendRPC(subject, reply string, msg []byte) { - n.sendq <- &pubMsg{n.c, subject, reply, nil, msg, false} + n.sq.send(subject, reply, nil, msg) } func (n *raft) sendReply(subject string, msg []byte) { - n.sendq <- &pubMsg{n.c, subject, _EMPTY_, nil, msg, false} + n.sq.send(subject, _EMPTY_, nil, msg) } func (n *raft) wonElection(votes int) bool { diff --git a/server/sendq.go b/server/sendq.go new file mode 100644 index 00000000..d9a1541d --- /dev/null +++ b/server/sendq.go @@ -0,0 +1,134 @@ +// Copyright 2020-2021 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "strconv" + "sync" + "time" +) + +type outMsg struct { + subj string + rply string + hdr []byte + msg []byte + next *outMsg +} + +type sendq struct { + mu sync.Mutex + mch chan struct{} + head *outMsg + tail *outMsg + s *Server + msgs int + bytes int +} + +func (s *Server) newSendQ() *sendq { + sq := &sendq{s: s, mch: make(chan struct{}, 1)} + s.startGoRoutine(sq.internalLoop) + return sq +} + +func (sq *sendq) internalLoop() { + sq.mu.Lock() + s, mch := sq.s, sq.mch + sq.mu.Unlock() + + defer s.grWG.Done() + + c := s.createInternalSystemClient() + c.registerWithAccount(s.SystemAccount()) + c.internal = false + + defer c.closeConnection(ClientClosed) + + for s.isRunning() { + select { + case <-s.quitCh: + return + case <-mch: + for pm := sq.pending(); pm != nil; { + c.pa.subject = []byte(pm.subj) + c.pa.size = len(pm.msg) + len(pm.hdr) + c.pa.szb = []byte(strconv.Itoa(c.pa.size)) + c.pa.reply = []byte(pm.rply) + var msg []byte + if len(pm.hdr) > 0 { + c.pa.hdr = len(pm.hdr) + c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) + msg = append(pm.hdr, pm.msg...) + msg = append(msg, _CRLF_...) + } else { + c.pa.hdr = -1 + c.pa.hdb = nil + msg = append(pm.msg, _CRLF_...) + } + c.processInboundClientMsg(msg) + c.pa.szb = nil + // Do this here to nil out below vs up in for loop. + next := pm.next + pm.next, pm.hdr, pm.msg = nil, nil, nil + if pm = next; pm == nil { + pm = sq.pending() + } + } + c.flushClients(10 * time.Millisecond) + } + } +} + +func (sq *sendq) pending() *outMsg { + sq.mu.Lock() + head := sq.head + sq.head, sq.tail = nil, nil + sq.msgs, sq.bytes = 0, 0 + sq.mu.Unlock() + return head +} + +func (sq *sendq) send(subj, rply string, hdr, msg []byte) (int, int) { + out := &outMsg{subj, rply, nil, nil, nil} + // We will copy these for now. + if len(hdr) > 0 { + hdr = append(hdr[:0:0], hdr...) + out.hdr = hdr + } + if len(msg) > 0 { + msg = append(msg[:0:0], msg...) + out.msg = msg + } + sq.mu.Lock() + + sq.msgs++ + sq.bytes += len(subj) + len(rply) + len(hdr) + len(msg) + + if sq.head == nil { + sq.head = out + } else { + sq.tail.next = out + } + sq.tail = out + msgs, bytes := sq.msgs, sq.bytes + sq.mu.Unlock() + + select { + case sq.mch <- struct{}{}: + default: + } + + return msgs, bytes +} diff --git a/server/server.go b/server/server.go index 687ffabb..88c470fd 100644 --- a/server/server.go +++ b/server/server.go @@ -1120,6 +1120,7 @@ func (s *Server) setSystemAccount(acc *Account) error { replies: make(map[string]msgHandler), sendq: make(chan *pubMsg, internalSendQLen), resetCh: make(chan struct{}), + sq: s.newSendQ(), statsz: eventsHBInterval, orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval, diff --git a/server/stream.go b/server/stream.go index ec94dc58..2c6f105e 100644 --- a/server/stream.go +++ b/server/stream.go @@ -136,6 +136,8 @@ type stream struct { sid int pubAck []byte sendq chan *jsPubMsg + mch chan struct{} + msgs *inbound store StreamStore lseq uint64 lmsgId string @@ -301,6 +303,8 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt sysc: ic, stype: cfg.Storage, consumers: make(map[string]*consumer), + mch: make(chan struct{}, 1), + msgs: &inbound{}, qch: make(chan struct{}), } @@ -1909,10 +1913,63 @@ func (mset *stream) isClustered() bool { return mset.node != nil } -// processInboundJetStreamMsg handles processing messages bound for a stream. -func (mset *stream) processInboundJetStreamMsg(_ *subscription, pc *client, subject, reply string, rmsg []byte) { - hdr, msg := pc.msgParts(rmsg) +// Used if we have to queue things internally to avoid the route/gw path. +type inMsg struct { + subj string + rply string + hdr []byte + msg []byte + next *inMsg +} +// Linked list for inbound messages. +type inbound struct { + head *inMsg + tail *inMsg + msgs int + bytes int +} + +func (mset *stream) pending() *inMsg { + mset.mu.Lock() + head := mset.msgs.head + mset.msgs.head, mset.msgs.tail = nil, nil + mset.msgs.msgs, mset.msgs.bytes = 0, 0 + mset.mu.Unlock() + return head +} + +func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) { + m := &inMsg{subj, rply, nil, nil, nil} + // Copy these. + if len(hdr) > 0 { + hdr = append(hdr[:0:0], hdr...) + m.hdr = hdr + } + if len(msg) > 0 { + msg = append(msg[:0:0], msg...) + m.msg = msg + } + + mset.mu.Lock() + if mset.msgs.head == nil { + mset.msgs.head = m + } else { + mset.msgs.tail.next = m + } + mset.msgs.tail = m + mset.msgs.msgs++ + mset.msgs.bytes += len(subj) + len(rply) + len(hdr) + len(msg) + mset.mu.Unlock() + + select { + case mset.mch <- struct{}{}: + default: + } +} + +// processInboundJetStreamMsg handles processing messages bound for a stream. +func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, subject, reply string, rmsg []byte) { mset.mu.RLock() isLeader, isClustered := mset.isLeader(), mset.node != nil mset.mu.RUnlock() @@ -1922,6 +1979,14 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj return } + hdr, msg := c.msgParts(rmsg) + + // If we are not receiving directly from a client we should move this this Go routine. + if c.kind != CLIENT { + mset.queueInboundMsg(subject, reply, hdr, msg) + return + } + // If we are clustered we need to propose this message to the underlying raft group. if isClustered { mset.processClusteredInboundMsg(subject, reply, hdr, msg) @@ -2222,7 +2287,7 @@ func (mset *stream) setupSendCapabilities() { return } mset.sendq = make(chan *jsPubMsg, msetSendQSize) - go mset.internalSendLoop() + go mset.internalLoop() } // Name returns the stream name. @@ -2235,14 +2300,15 @@ func (mset *stream) name() string { return mset.cfg.Name } -func (mset *stream) internalSendLoop() { +func (mset *stream) internalLoop() { mset.mu.RLock() s := mset.srv c := s.createInternalJetStreamClient() c.registerWithAccount(mset.acc) defer c.closeConnection(ClientClosed) - sendq := mset.sendq + sendq, mch := mset.sendq, mset.mch name := mset.cfg.Name + isClustered := mset.node != nil mset.mu.RUnlock() // Warn when internal send queue is backed up past 75% @@ -2287,6 +2353,21 @@ func (mset *stream) internalSendLoop() { if pm.o != nil && pm.seq > 0 && !didDeliver { pm.o.didNotDeliver(pm.seq) } + case <-mch: + for im := mset.pending(); im != nil; { + // If we are clustered we need to propose this message to the underlying raft group. + if isClustered { + mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg) + } else { + mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0) + } + // Do this here to nil out below vs up in for loop. + next := im.next + im.next, im.hdr, im.msg = nil, nil, nil + if im = next; im == nil { + im = mset.pending() + } + } case <-s.quitCh: return } From 051d0159efe31734d5983893748560dcf024066c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 4 Mar 2021 14:46:59 -0800 Subject: [PATCH 2/8] rc5 Signed-off-by: Derek Collison --- server/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/const.go b/server/const.go index a1a663ef..348ea0b7 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.0-RC.4" + VERSION = "2.2.0-RC.5" // PROTO is the currently supported protocol. // 0 was the original From e70e46ea4ab904d3a56e50560ea421c6314d7d78 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 4 Mar 2021 16:16:28 -0800 Subject: [PATCH 3/8] Updates based on PR feedback Signed-off-by: Derek Collison --- server/client.go | 6 ++---- server/sendq.go | 33 ++++++++++++++------------------- server/stream.go | 23 ++++++++++------------- 3 files changed, 26 insertions(+), 36 deletions(-) diff --git a/server/client.go b/server/client.go index 11c7d2a7..bb563c46 100644 --- a/server/client.go +++ b/server/client.go @@ -270,8 +270,7 @@ type client struct { trace bool echo bool - - internal bool + noIcb bool tags jwt.TagList nameTag string @@ -564,7 +563,6 @@ func (c *client) initClient() { c.subs = make(map[string]*subscription) c.echo = true - c.internal = true c.setTraceLevel() @@ -3029,7 +3027,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g client.outBytes += msgSize // Check for internal subscriptions. - if sub.icb != nil && c.internal { + if sub.icb != nil && !c.noIcb { if gwrply { // Note that we keep track of the GW routed reply in the destination // connection (`client`). The routed reply subject is in `c.pa.reply`, diff --git a/server/sendq.go b/server/sendq.go index d9a1541d..1e16066b 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -28,13 +28,11 @@ type outMsg struct { } type sendq struct { - mu sync.Mutex - mch chan struct{} - head *outMsg - tail *outMsg - s *Server - msgs int - bytes int + mu sync.Mutex + mch chan struct{} + head *outMsg + tail *outMsg + s *Server } func (s *Server) newSendQ() *sendq { @@ -52,7 +50,7 @@ func (sq *sendq) internalLoop() { c := s.createInternalSystemClient() c.registerWithAccount(s.SystemAccount()) - c.internal = false + c.noIcb = true defer c.closeConnection(ClientClosed) @@ -95,12 +93,11 @@ func (sq *sendq) pending() *outMsg { sq.mu.Lock() head := sq.head sq.head, sq.tail = nil, nil - sq.msgs, sq.bytes = 0, 0 sq.mu.Unlock() return head } -func (sq *sendq) send(subj, rply string, hdr, msg []byte) (int, int) { +func (sq *sendq) send(subj, rply string, hdr, msg []byte) { out := &outMsg{subj, rply, nil, nil, nil} // We will copy these for now. if len(hdr) > 0 { @@ -113,22 +110,20 @@ func (sq *sendq) send(subj, rply string, hdr, msg []byte) (int, int) { } sq.mu.Lock() - sq.msgs++ - sq.bytes += len(subj) + len(rply) + len(hdr) + len(msg) - + var doKick bool if sq.head == nil { sq.head = out + doKick = true } else { sq.tail.next = out } sq.tail = out - msgs, bytes := sq.msgs, sq.bytes sq.mu.Unlock() - select { - case sq.mch <- struct{}{}: - default: + if doKick { + select { + case sq.mch <- struct{}{}: + default: + } } - - return msgs, bytes } diff --git a/server/stream.go b/server/stream.go index 2c6f105e..838fa593 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1924,17 +1924,14 @@ type inMsg struct { // Linked list for inbound messages. type inbound struct { - head *inMsg - tail *inMsg - msgs int - bytes int + head *inMsg + tail *inMsg } func (mset *stream) pending() *inMsg { mset.mu.Lock() head := mset.msgs.head mset.msgs.head, mset.msgs.tail = nil, nil - mset.msgs.msgs, mset.msgs.bytes = 0, 0 mset.mu.Unlock() return head } @@ -1952,19 +1949,21 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) { } mset.mu.Lock() + var doKick bool if mset.msgs.head == nil { mset.msgs.head = m + doKick = true } else { mset.msgs.tail.next = m } mset.msgs.tail = m - mset.msgs.msgs++ - mset.msgs.bytes += len(subj) + len(rply) + len(hdr) + len(msg) mset.mu.Unlock() - select { - case mset.mch <- struct{}{}: - default: + if doKick { + select { + case mset.mch <- struct{}{}: + default: + } } } @@ -2364,9 +2363,7 @@ func (mset *stream) internalLoop() { // Do this here to nil out below vs up in for loop. next := im.next im.next, im.hdr, im.msg = nil, nil, nil - if im = next; im == nil { - im = mset.pending() - } + im = next } case <-s.quitCh: return From 207ebd3b3dbee17e762b55601dba14262cd3e2c2 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 4 Mar 2021 17:19:50 -0800 Subject: [PATCH 4/8] Changed stream sendq to linked list outq. Made consumer share streams outq. Signed-off-by: Derek Collison --- server/consumer.go | 115 +++++----------------- server/jetstream_api.go | 7 +- server/jetstream_cluster.go | 8 +- server/sendq.go | 8 +- server/stream.go | 187 +++++++++++++++++++++--------------- 5 files changed, 144 insertions(+), 181 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index bf12981f..1baca1ac 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -187,7 +187,7 @@ type consumer struct { ackSubj string nextMsgSubj string maxp int - sendq chan *jsPubMsg + outq *jsOutQ pending map[uint64]*Pending ptmr *time.Timer rdq []uint64 @@ -623,7 +623,7 @@ func (o *consumer) setLeader(isLeader bool) { } mset.mu.RLock() - s, jsa, stream := mset.srv, mset.jsa, mset.cfg.Name + s, jsa, stream, outq := mset.srv, mset.jsa, mset.cfg.Name, mset.outq mset.mu.RUnlock() o.mu.Lock() @@ -670,8 +670,9 @@ func (o *consumer) setLeader(isLeader bool) { o.replay = true } - // Recreate internal sendq - o.sendq = make(chan *jsPubMsg, msetSendQSize) + // We borrow this from the mset. + o.outq = outq + // Recreate quit channel. o.qch = make(chan struct{}) qch := o.qch @@ -679,8 +680,6 @@ func (o *consumer) setLeader(isLeader bool) { // Now start up Go routine to deliver msgs. go o.loopAndGatherMsgs(qch) - // Startup our deliver loop. - go o.loopAndDeliverMsgs(qch) } else { // Shutdown the go routines and the subscriptions. @@ -693,7 +692,7 @@ func (o *consumer) setLeader(isLeader bool) { o.srv.sysUnsubscribe(o.infoSub) o.infoSub = nil } - o.sendq = nil + o.outq = nil if o.qch != nil { close(o.qch) o.qch = nil @@ -737,14 +736,14 @@ func (o *consumer) unsubscribe(sub *subscription) { o.client.processUnsub(sub.sid) } -// We need to make sure we protect access to the sendq. +// We need to make sure we protect access to the outq. // Do all advisory sends here. // Lock should be held on entry but will be released. func (o *consumer) sendAdvisory(subj string, msg []byte) { - sendq := o.sendq + outq := o.outq o.mu.Unlock() - if sendq != nil { - sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0} + if outq != nil { + outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0, nil}) } o.mu.Lock() } @@ -1230,73 +1229,6 @@ func (o *consumer) writeStoreState() error { return o.store.Update(&state) } -// loopAndDeliverMsgs() will loop and deliver messages and watch for interest changes. -func (o *consumer) loopAndDeliverMsgs(qch chan struct{}) { - o.mu.Lock() - inch, sendq := o.inch, o.sendq - s, acc := o.acc.srv, o.acc - o.mu.Unlock() - - // Create our client used to send messages. - c := s.createInternalJetStreamClient() - // Bind to the account. - c.registerWithAccount(acc) - // Clean up on exit. - defer c.closeConnection(ClientClosed) - - // Warn when internal send queue is backed up past 75% - warnThresh := cap(sendq) * 3 / 4 - warnFreq := time.Second - last := time.Now().Add(-warnFreq) - - for { - if len(sendq) > warnThresh && time.Since(last) >= warnFreq { - s.Warnf("Jetstream internal consumer send queue > 75%% for account: %q consumer: %q", acc.Name, o) - last = time.Now() - } - - select { - case <-qch: - return - case interest := <-inch: - // inch can be nil on pull-based, but then this will - // just block and not fire. - o.updateDeliveryInterest(interest) - case pm := <-sendq: - if pm == nil { - return - } - c.pa.subject = []byte(pm.subj) - c.pa.deliver = []byte(pm.dsubj) - c.pa.size = len(pm.msg) + len(pm.hdr) - c.pa.szb = []byte(strconv.Itoa(c.pa.size)) - c.pa.reply = []byte(pm.reply) - - var msg []byte - if len(pm.hdr) > 0 { - c.pa.hdr = len(pm.hdr) - c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) - msg = append(pm.hdr, pm.msg...) - msg = append(msg, _CRLF_...) - } else { - c.pa.hdr = -1 - c.pa.hdb = nil - msg = append(pm.msg, _CRLF_...) - } - - didDeliver, _ := c.processInboundClientMsg(msg) - c.pa.szb = nil - c.flushClients(0) - - // Check to see if this is a delivery for an observable and - // we failed to deliver the message. If so alert the observable. - if !didDeliver && pm.o != nil && pm.seq > 0 { - pm.o.didNotDeliver(pm.seq) - } - } - } -} - // Info returns our current consumer state. func (o *consumer) info() *ConsumerInfo { o.mu.RLock() @@ -1645,18 +1577,17 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _, reply string defer o.mu.Unlock() s, mset, js := o.srv, o.mset, o.js - if mset == nil || o.sendq == nil { + if mset == nil { return } - sendq := o.sendq + outq := o.outq sendErr := func(status int, description string) { // Needs to be unlocked to send err. o.mu.Unlock() defer o.mu.Lock() hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description)) - pmsg := &jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0} - sendq <- pmsg // Send error message. + outq.send(&jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0, nil}) } if o.isPushMode() { @@ -1846,11 +1777,10 @@ func (o *consumer) forceExpireFirstWaiting() *waitingRequest { return wr } // If we are expiring this and we think there is still interest, alert. - if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.sendq != nil { + if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.outq != nil { // We still appear to have interest, so send alert as courtesy. hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n") - pmsg := &jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0} - o.sendq <- pmsg // Send message. + o.outq.send(&jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0, nil}) } return wr } @@ -1903,6 +1833,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { } lseq = o.mset.state().LastSeq } + inch := o.inch o.mu.Unlock() // Deliver all the msgs we have now, once done or on a condition, we wait for new ones. @@ -2000,9 +1931,14 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { o.mu.Unlock() select { + case interest := <-inch: + // inch can be nil on pull-based, but then this will + // just block and not fire. + o.updateDeliveryInterest(interest) case <-qch: return case <-mch: + // Messages are waiting. } } } @@ -2014,7 +1950,7 @@ func (o *consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) str // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) { - if o.mset == nil || o.sendq == nil { + if o.mset == nil || o.outq == nil { return } // Update pending on first attempt @@ -2032,15 +1968,16 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 dseq := o.dseq o.dseq++ - pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq} + pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil} mset := o.mset ap := o.cfg.AckPolicy - sendq := o.sendq + outq := o.outq // This needs to be unlocked since the other side may need this lock on a failed delivery. o.mu.Unlock() // Send message. - sendq <- pmsg + outq.send(pmsg) + // If we are ack none and mset is interest only we should make sure stream removes interest. if ap == AckNone && mset.cfg.Retention == InterestPolicy && !mset.checkInterest(seq, o) { mset.store.RemoveMsg(seq) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index f8c11bcb..c66357e0 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2789,7 +2789,7 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr * chunk = chunk[:n] if err != nil { if n > 0 { - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0, nil}) } break } @@ -2804,15 +2804,14 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr * case <-time.After(10 * time.Millisecond): } } - // TODO(dlc) - Might want these moved off sendq if we have contention. ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index) - mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0, nil}) atomic.AddInt32(&out, int32(len(chunk))) } done: // Send last EOF // TODO(dlc) - place hash in header - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) } // Request to create a durable consumer. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 16e7b7ea..43a951d8 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3891,7 +3891,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ mset.mu.RLock() canRespond := !mset.cfg.NoAck && len(reply) > 0 - s, jsa, st, rf, sendq := mset.srv, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.sendq + s, jsa, st, rf, outq := mset.srv, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq maxMsgSize := int(mset.cfg.MaxMsgSize) msetName := mset.cfg.Name mset.mu.RUnlock() @@ -3920,7 +3920,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}} resp.Error = &ApiError{Code: 400, Description: "resource limits exceeded for account"} response, _ = json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } return err } @@ -3933,7 +3933,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}} resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"} response, _ = json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } return err } @@ -3962,7 +3962,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // If we errored out respond here. if err != nil && canRespond { - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } if err != nil && isOutOfSpaceErr(err) { diff --git a/server/sendq.go b/server/sendq.go index 1e16066b..66109062 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -108,19 +108,19 @@ func (sq *sendq) send(subj, rply string, hdr, msg []byte) { msg = append(msg[:0:0], msg...) out.msg = msg } - sq.mu.Lock() - var doKick bool + sq.mu.Lock() + var notify bool if sq.head == nil { sq.head = out - doKick = true + notify = true } else { sq.tail.next = out } sq.tail = out sq.mu.Unlock() - if doKick { + if notify { select { case sq.mch <- struct{}{}: default: diff --git a/server/stream.go b/server/stream.go index 838fa593..afe58809 100644 --- a/server/stream.go +++ b/server/stream.go @@ -135,7 +135,7 @@ type stream struct { sysc *client sid int pubAck []byte - sendq chan *jsPubMsg + outq *jsOutQ mch chan struct{} msgs *inbound store StreamStore @@ -576,10 +576,10 @@ func (mset *stream) sendCreateAdvisory() { mset.mu.Lock() name := mset.cfg.Name template := mset.cfg.Template - sendq := mset.sendq + outq := mset.outq mset.mu.Unlock() - if sendq == nil { + if outq == nil { return } @@ -601,11 +601,11 @@ func (mset *stream) sendCreateAdvisory() { } subj := JSAdvisoryStreamCreatedPre + "." + name - sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} + outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil}) } func (mset *stream) sendDeleteAdvisoryLocked() { - if mset.sendq == nil { + if mset.outq == nil { return } @@ -623,12 +623,12 @@ func (mset *stream) sendDeleteAdvisoryLocked() { j, err := json.Marshal(m) if err == nil { subj := JSAdvisoryStreamDeletedPre + "." + mset.cfg.Name - mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} + mset.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil}) } } func (mset *stream) sendUpdateAdvisoryLocked() { - if mset.sendq == nil { + if mset.outq == nil { return } @@ -645,7 +645,7 @@ func (mset *stream) sendUpdateAdvisoryLocked() { j, err := json.Marshal(m) if err == nil { subj := JSAdvisoryStreamUpdatedPre + "." + mset.cfg.Name - mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} + mset.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0, nil}) } } @@ -1117,8 +1117,8 @@ func (mset *stream) mirrorDurable() string { // Setup our mirror consumer. // Lock should be held. func (mset *stream) setupMirrorConsumer() error { - if mset.sendq == nil { - return errors.New("sendq required") + if mset.outq == nil { + return errors.New("outq required") } // Reset @@ -1176,7 +1176,7 @@ func (mset *stream) setupMirrorConsumer() error { subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) req := &CreateConsumerRequest{ Stream: mset.cfg.Mirror.Name, @@ -1227,7 +1227,7 @@ func (mset *stream) setupMirrorConsumer() error { subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil}) go func() { var shouldRetry bool @@ -1318,7 +1318,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1) subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) if ext != nil { deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".") @@ -1396,7 +1396,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { subject = strings.ReplaceAll(subject, "..", ".") } - mset.sendq <- &jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil}) go func() { var shouldRetry bool @@ -1619,8 +1619,8 @@ func (mset *stream) startingSequenceForSources() { // Setup our source consumers. // Lock should be held. func (mset *stream) setupSourceConsumers() error { - if mset.sendq == nil { - return errors.New("sendq required") + if mset.outq == nil { + return errors.New("outq required") } // Reset if needed. for _, si := range mset.sources { @@ -1676,7 +1676,7 @@ func (mset *stream) stopSourceConsumers() { } // Need to delete the old one. subject := fmt.Sprintf(JSApiConsumerDeleteT, si.name, mset.sourceDurable(si.name)) - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) } } @@ -1692,7 +1692,7 @@ func (mset *stream) unsubscribeToStream() error { } durable := mset.mirrorDurable() subject := fmt.Sprintf(JSApiConsumerDeleteT, mset.cfg.Mirror.Name, durable) - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) mset.mirror = nil } @@ -1949,17 +1949,17 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) { } mset.mu.Lock() - var doKick bool + var notify bool if mset.msgs.head == nil { mset.msgs.head = m - doKick = true + notify = true } else { mset.msgs.tail.next = m } mset.msgs.tail = m mset.mu.Unlock() - if doKick { + if notify { select { case mset.mch <- struct{}{}: default: @@ -2037,13 +2037,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } } if isMisMatch { - sendq := mset.sendq + outq := mset.outq mset.mu.Unlock() - if canRespond && sendq != nil { + if canRespond && outq != nil { resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 503, Description: "expected stream sequence does not match"} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return errLastSeqMismatch } @@ -2059,14 +2059,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, var msgId string if len(hdr) > 0 { msgId = getMsgId(hdr) - sendq := mset.sendq + outq := mset.outq if dde := mset.checkMsgId(msgId); dde != nil { mset.clfs++ mset.mu.Unlock() if canRespond { response := append(pubAck, strconv.FormatUint(dde.seq, 10)...) response = append(response, ",\"duplicate\": true}"...) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } return errors.New("msgid is duplicate") } @@ -2079,7 +2079,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: "expected stream does not match"} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return errors.New("expected stream does not match") } @@ -2092,7 +2092,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last sequence: %d", mlseq)} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq) } @@ -2105,7 +2105,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("wrong last msg ID: %s", last)} b, _ := json.Marshal(resp) - sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return fmt.Errorf("last msgid mismatch: %q vs %q", lmsgId, last) } @@ -2126,7 +2126,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"} b, _ := json.Marshal(resp) - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil}) } return ErrMaxPayload } @@ -2163,7 +2163,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if canRespond { response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...) response = append(response, '}') - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } // If we have a msgId make sure to save. if msgId != _EMPTY_ { @@ -2233,7 +2233,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Send response here. if canRespond { - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} + mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } if err == nil && seq > 0 && numConsumers > 0 { @@ -2263,6 +2263,43 @@ type jsPubMsg struct { msg []byte o *consumer seq uint64 + next *jsPubMsg +} + +// Forms a linked list for sending internal system messages. +type jsOutQ struct { + mu sync.Mutex + mch chan struct{} + head *jsPubMsg + tail *jsPubMsg +} + +func (q *jsOutQ) pending() *jsPubMsg { + q.mu.Lock() + head := q.head + q.head, q.tail = nil, nil + q.mu.Unlock() + return head +} + +func (q *jsOutQ) send(msg *jsPubMsg) { + q.mu.Lock() + var notify bool + if q.head == nil { + q.head = msg + notify = true + } else { + q.tail.next = msg + } + q.tail = msg + q.mu.Unlock() + + if notify { + select { + case q.mch <- struct{}{}: + default: + } + } } // StoredMsg is for raw access to messages in a stream. @@ -2274,18 +2311,15 @@ type StoredMsg struct { Time time.Time `json:"time"` } -// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering -const msetSendQSize = 65536 - // This is similar to system semantics but did not want to overload the single system sendq, // or require system account when doing simple setup with jetstream. func (mset *stream) setupSendCapabilities() { mset.mu.Lock() defer mset.mu.Unlock() - if mset.sendq != nil { + if mset.outq != nil { return } - mset.sendq = make(chan *jsPubMsg, msetSendQSize) + mset.outq = &jsOutQ{mch: make(chan struct{}, 1)} go mset.internalLoop() } @@ -2305,53 +2339,48 @@ func (mset *stream) internalLoop() { c := s.createInternalJetStreamClient() c.registerWithAccount(mset.acc) defer c.closeConnection(ClientClosed) - sendq, mch := mset.sendq, mset.mch - name := mset.cfg.Name + outq, qch, mch := mset.outq, mset.qch, mset.mch isClustered := mset.node != nil mset.mu.RUnlock() - // Warn when internal send queue is backed up past 75% - warnThresh := 3 * msetSendQSize / 4 - warnFreq := time.Second - last := time.Now().Add(-warnFreq) - for { - if len(sendq) > warnThresh && time.Since(last) >= warnFreq { - s.Warnf("Jetstream internal send queue > 75%% for account: %q stream: %q", c.acc.Name, name) - last = time.Now() - } select { - case pm := <-sendq: - if pm == nil { - return - } - c.pa.subject = []byte(pm.subj) - c.pa.deliver = []byte(pm.dsubj) - c.pa.size = len(pm.msg) + len(pm.hdr) - c.pa.szb = []byte(strconv.Itoa(c.pa.size)) - c.pa.reply = []byte(pm.reply) + case <-outq.mch: + for pm := outq.pending(); pm != nil; { + c.pa.subject = []byte(pm.subj) + c.pa.deliver = []byte(pm.dsubj) + c.pa.size = len(pm.msg) + len(pm.hdr) + c.pa.szb = []byte(strconv.Itoa(c.pa.size)) + c.pa.reply = []byte(pm.reply) - var msg []byte - if len(pm.hdr) > 0 { - c.pa.hdr = len(pm.hdr) - c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) - msg = append(pm.hdr, pm.msg...) - msg = append(msg, _CRLF_...) - } else { - c.pa.hdr = -1 - c.pa.hdb = nil - msg = append(pm.msg, _CRLF_...) - } + var msg []byte + if len(pm.hdr) > 0 { + c.pa.hdr = len(pm.hdr) + c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) + msg = append(pm.hdr, pm.msg...) + msg = append(msg, _CRLF_...) + } else { + c.pa.hdr = -1 + c.pa.hdb = nil + msg = append(pm.msg, _CRLF_...) + } - didDeliver, _ := c.processInboundClientMsg(msg) - c.pa.szb = nil - c.flushClients(0) + didDeliver, _ := c.processInboundClientMsg(msg) + c.pa.szb = nil - // Check to see if this is a delivery for an observable and - // we failed to deliver the message. If so alert the observable. - if pm.o != nil && pm.seq > 0 && !didDeliver { - pm.o.didNotDeliver(pm.seq) + // Check to see if this is a delivery for an observable and + // we failed to deliver the message. If so alert the observable. + if pm.o != nil && pm.seq > 0 && !didDeliver { + pm.o.didNotDeliver(pm.seq) + } + + // Do this here to nil out below vs up in for loop. + next := pm.next + pm.next, pm.hdr, pm.msg = nil, nil, nil + pm = next } + c.flushClients(10 * time.Millisecond) + case <-mch: for im := mset.pending(); im != nil; { // If we are clustered we need to propose this message to the underlying raft group. @@ -2365,6 +2394,8 @@ func (mset *stream) internalLoop() { im.next, im.hdr, im.msg = nil, nil, nil im = next } + case <-qch: + return case <-s.quitCh: return } @@ -2432,7 +2463,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { if deleteFlag { for _, ssi := range mset.cfg.Sources { subject := fmt.Sprintf(JSApiConsumerDeleteT, ssi.Name, mset.sourceDurable(ssi.Name)) - mset.sendq <- &jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0} + mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) } } @@ -2441,10 +2472,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { mset.sendDeleteAdvisoryLocked() } - if mset.sendq != nil { - mset.sendq <- nil - } - c := mset.client mset.client = nil if c == nil { From 4cdfb0ab6ee7773c9f28fcd13d7a4b9eb94dbd7c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 4 Mar 2021 17:58:01 -0800 Subject: [PATCH 5/8] Don't need to release locks now with outq. Also borrow once Signed-off-by: Derek Collison --- server/consumer.go | 32 +++++++------------------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 1baca1ac..fa7c3528 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -428,6 +428,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri sysc: s.createInternalJetStreamClient(), cfg: *config, dsubj: config.DeliverSubject, + outq: mset.outq, active: true, qch: make(chan struct{}), mch: make(chan struct{}, 1), @@ -623,7 +624,7 @@ func (o *consumer) setLeader(isLeader bool) { } mset.mu.RLock() - s, jsa, stream, outq := mset.srv, mset.jsa, mset.cfg.Name, mset.outq + s, jsa, stream := mset.srv, mset.jsa, mset.cfg.Name mset.mu.RUnlock() o.mu.Lock() @@ -670,9 +671,6 @@ func (o *consumer) setLeader(isLeader bool) { o.replay = true } - // We borrow this from the mset. - o.outq = outq - // Recreate quit channel. o.qch = make(chan struct{}) qch := o.qch @@ -692,7 +690,6 @@ func (o *consumer) setLeader(isLeader bool) { o.srv.sysUnsubscribe(o.infoSub) o.infoSub = nil } - o.outq = nil if o.qch != nil { close(o.qch) o.qch = nil @@ -738,14 +735,8 @@ func (o *consumer) unsubscribe(sub *subscription) { // We need to make sure we protect access to the outq. // Do all advisory sends here. -// Lock should be held on entry but will be released. func (o *consumer) sendAdvisory(subj string, msg []byte) { - outq := o.outq - o.mu.Unlock() - if outq != nil { - outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0, nil}) - } - o.mu.Lock() + o.outq.send(&jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0, nil}) } func (o *consumer) sendDeleteAdvisoryLocked() { @@ -1580,14 +1571,10 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _, reply string if mset == nil { return } - outq := o.outq sendErr := func(status int, description string) { - // Needs to be unlocked to send err. - o.mu.Unlock() - defer o.mu.Lock() hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description)) - outq.send(&jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0, nil}) + o.outq.send(&jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0, nil}) } if o.isPushMode() { @@ -1777,7 +1764,7 @@ func (o *consumer) forceExpireFirstWaiting() *waitingRequest { return wr } // If we are expiring this and we think there is still interest, alert. - if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil && o.outq != nil { + if rr := o.acc.sl.Match(wr.reply); len(rr.psubs)+len(rr.qsubs) > 0 && o.mset != nil { // We still appear to have interest, so send alert as courtesy. hdr := []byte("NATS/1.0 408 Request Timeout\r\n\r\n") o.outq.send(&jsPubMsg{wr.reply, wr.reply, _EMPTY_, hdr, nil, nil, 0, nil}) @@ -1950,7 +1937,7 @@ func (o *consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) str // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) { - if o.mset == nil || o.outq == nil { + if o.mset == nil { return } // Update pending on first attempt @@ -1971,19 +1958,14 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil} mset := o.mset ap := o.cfg.AckPolicy - outq := o.outq - // This needs to be unlocked since the other side may need this lock on a failed delivery. - o.mu.Unlock() // Send message. - outq.send(pmsg) + o.outq.send(pmsg) // If we are ack none and mset is interest only we should make sure stream removes interest. if ap == AckNone && mset.cfg.Retention == InterestPolicy && !mset.checkInterest(seq, o) { mset.store.RemoveMsg(seq) } - // Re-acquire lock. - o.mu.Lock() if ap == AckExplicit || ap == AckAll { o.trackPending(seq, dseq) From 038a5cadc47503127abdc01331bb53834705c16b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 4 Mar 2021 18:39:32 -0800 Subject: [PATCH 6/8] LQ is longer now Signed-off-by: Derek Collison --- server/jetstream_cluster_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 11229ae5..0940c57e 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3118,7 +3118,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { c.randomNonStreamLeader("$G", "NO-Q").Shutdown() // This should eventually have us stepdown as leader since we would have lost quorum with R=2. - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 500*time.Millisecond, func() error { if sl := c.streamLeader("$G", "NO-Q"); sl == nil { return nil } From 7b1b9a7946f5a22d86f6752690b5d230d501dec9 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 4 Mar 2021 18:52:57 -0800 Subject: [PATCH 7/8] Snapshot on peer state change, e.g. removal Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 43a951d8..73652b81 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -756,7 +756,7 @@ func (js *jetStream) monitorCluster() { // FIXME(dlc) - Deal with errors. if _, didRemoval, err := js.applyMetaEntries(ce.Entries, isRecovering); err == nil { n.Applied(ce.Index) - if didRemoval && time.Since(lastSnapTime) > 2*time.Second { + if js.hasPeerEntries(ce.Entries) || (didRemoval && time.Since(lastSnapTime) > 2*time.Second) { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. doSnapshot() @@ -1029,6 +1029,16 @@ func (js *jetStream) removePeerFromStream(sa *streamAssignment, peer string) { } } +// Check if we have peer related entries. +func (js *jetStream) hasPeerEntries(entries []*Entry) bool { + for _, e := range entries { + if e.Type == EntryRemovePeer || e.Type == EntryAddPeer { + return true + } + } + return false +} + func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool, bool, error) { var didSnap, didRemove bool for _, e := range entries { @@ -1038,7 +1048,6 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool } else if e.Type == EntryRemovePeer { if !isRecovering { js.processRemovePeer(string(e.Data)) - didRemove = true } } else { buf := e.Data From 4bfd6485f6de7e15b19b02bb89fe27ff8364b6c6 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 4 Mar 2021 19:11:24 -0800 Subject: [PATCH 8/8] Can't remove based on interest directly Signed-off-by: Derek Collison --- server/consumer.go | 3 ++- server/stream.go | 7 +++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index fa7c3528..1cd044a8 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1964,7 +1964,7 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 // If we are ack none and mset is interest only we should make sure stream removes interest. if ap == AckNone && mset.cfg.Retention == InterestPolicy && !mset.checkInterest(seq, o) { - mset.store.RemoveMsg(seq) + mset.rmch <- seq } if ap == AckExplicit || ap == AckAll { @@ -2460,6 +2460,7 @@ func (o *consumer) stopWithFlags(dflag, doSignal, advisory bool) error { seqs = append(seqs, seq) } o.mu.Unlock() + // Sort just to keep pending sparse array state small. sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) for _, seq := range seqs { diff --git a/server/stream.go b/server/stream.go index afe58809..60f63f01 100644 --- a/server/stream.go +++ b/server/stream.go @@ -139,6 +139,7 @@ type stream struct { mch chan struct{} msgs *inbound store StreamStore + rmch chan uint64 lseq uint64 lmsgId string consumers map[string]*consumer @@ -305,6 +306,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt consumers: make(map[string]*consumer), mch: make(chan struct{}, 1), msgs: &inbound{}, + rmch: make(chan uint64, 8192), qch: make(chan struct{}), } @@ -2339,7 +2341,7 @@ func (mset *stream) internalLoop() { c := s.createInternalJetStreamClient() c.registerWithAccount(mset.acc) defer c.closeConnection(ClientClosed) - outq, qch, mch := mset.outq, mset.qch, mset.mch + outq, qch, mch, rmch := mset.outq, mset.qch, mset.mch, mset.rmch isClustered := mset.node != nil mset.mu.RUnlock() @@ -2380,7 +2382,6 @@ func (mset *stream) internalLoop() { pm = next } c.flushClients(10 * time.Millisecond) - case <-mch: for im := mset.pending(); im != nil; { // If we are clustered we need to propose this message to the underlying raft group. @@ -2394,6 +2395,8 @@ func (mset *stream) internalLoop() { im.next, im.hdr, im.msg = nil, nil, nil im = next } + case seq := <-rmch: + mset.store.RemoveMsg(seq) case <-qch: return case <-s.quitCh: