From c3da39283299c1d6789c6e1c9b7850f029ab2cd3 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 17 Mar 2022 17:53:06 -0600 Subject: [PATCH] Changes to IPQueues Removed the warnings, instead have a sync.Map where they are registered/unregistered and can be inspected with an undocumented monitor page. Added the notion of "in progress" which is the number of messages that have been pop()'ed. When recycle() is invoked this count goes down. Signed-off-by: Ivan Kozlovic --- server/consumer.go | 9 ++- server/events.go | 1 + server/ipqueue.go | 86 +++++++++++---------- server/ipqueue_test.go | 144 ++++++++++++++++++++---------------- server/jetstream_api.go | 3 +- server/jetstream_cluster.go | 21 ++++-- server/monitor.go | 32 ++++++++ server/mqtt.go | 6 +- server/raft.go | 32 +++++--- server/sendq.go | 2 +- server/server.go | 55 ++------------ server/server_test.go | 36 --------- server/stream.go | 29 ++++++-- 13 files changed, 235 insertions(+), 221 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 84474370..ba1a0456 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -492,8 +492,12 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if c == nil { return nil, NewJSStreamInvalidError() } + var accName string c.mu.Lock() s, a := c.srv, c.acc + if a != nil { + accName = a.Name + } c.mu.Unlock() // Hold mset lock here. @@ -570,7 +574,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri maxdc: uint64(config.MaxDeliver), maxp: config.MaxAckPending, created: time.Now().UTC(), - ackMsgs: newIPQueue(), } // Bind internal client to the user account. @@ -595,6 +598,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } } } + // Create ackMsgs queue now that we have a consumer name + o.ackMsgs = s.newIPQueue(fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name)) + // Create our request waiting queue. 
if o.isPullMode() { o.waiting = newWaitQueue(config.MaxWaiting) @@ -3645,6 +3651,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { } n := o.node qgroup := o.cfg.DeliverGroup + o.ackMsgs.unregister() o.mu.Unlock() if c != nil { diff --git a/server/events.go b/server/events.go index e519b1e7..b490a2cb 100644 --- a/server/events.go +++ b/server/events.go @@ -433,6 +433,7 @@ RESET: // there is a chance that the process will exit before the // writeLoop has a chance to send it. c.flushClients(time.Second) + sendq.recycle(&msgs) return } pm.returnToPool() diff --git a/server/ipqueue.go b/server/ipqueue.go index 2e514513..c002bd60 100644 --- a/server/ipqueue.go +++ b/server/ipqueue.go @@ -15,35 +15,26 @@ package server import ( "sync" + "sync/atomic" ) const ipQueueDefaultMaxRecycleSize = 4 * 1024 -const ipQueueDefaultWarnThreshold = 32 * 1024 - -type ipQueueLogger interface { - // The ipQueue will invoke this function with the queue's name and the number - // of pending elements. This call CANNOT block. It is ok to drop the logging - // if desired, but not block. - log(name string, pending int) -} // This is a generic intra-process queue. type ipQueue struct { + inprogress int64 sync.RWMutex - ch chan struct{} - elts []interface{} - pos int - pool *sync.Pool - mrs int - name string - logger ipQueueLogger - lt int + ch chan struct{} + elts []interface{} + pos int + pool *sync.Pool + mrs int + name string + m *sync.Map } type ipQueueOpts struct { maxRecycleSize int - name string - logger ipQueueLogger } type ipQueueOpt func(*ipQueueOpts) @@ -56,27 +47,19 @@ func ipQueue_MaxRecycleSize(max int) ipQueueOpt { } } -// This option provides the logger to be used by this queue to log -// when the number of pending elements reaches a certain threshold. 
-func ipQueue_Logger(name string, l ipQueueLogger) ipQueueOpt { - return func(o *ipQueueOpts) { - o.name, o.logger = name, l - } -} - -func newIPQueue(opts ...ipQueueOpt) *ipQueue { +func (s *Server) newIPQueue(name string, opts ...ipQueueOpt) *ipQueue { qo := ipQueueOpts{maxRecycleSize: ipQueueDefaultMaxRecycleSize} for _, o := range opts { o(&qo) } q := &ipQueue{ - ch: make(chan struct{}, 1), - mrs: qo.maxRecycleSize, - pool: &sync.Pool{}, - name: qo.name, - logger: qo.logger, - lt: ipQueueDefaultWarnThreshold, + ch: make(chan struct{}, 1), + mrs: qo.maxRecycleSize, + pool: &sync.Pool{}, + name: name, + m: &s.ipQueues, } + s.ipQueues.Store(name, q) return q } @@ -101,9 +84,6 @@ func (q *ipQueue) push(e interface{}) int { } q.elts = append(q.elts, e) l++ - if l >= q.lt && q.logger != nil && (l <= q.lt+10 || q.lt%10000 == 0) { - q.logger.log(q.name, l) - } q.Unlock() if signal { select { @@ -132,6 +112,7 @@ func (q *ipQueue) pop() []interface{} { elts = q.elts[q.pos:] } q.elts, q.pos = nil, 0 + atomic.AddInt64(&q.inprogress, int64(len(elts))) q.Unlock() return elts } @@ -174,13 +155,24 @@ func (q *ipQueue) popOne() interface{} { // After a pop(), the slice can be recycled for the next push() when // a first element is added to the queue. +// This will also decrement the "in progress" count with the length +// of the slice. // Reason we use pointer to slice instead of slice is explained // here: https://staticcheck.io/docs/checks#SA6002 func (q *ipQueue) recycle(elts *[]interface{}) { - // If invoked with an nil list, don't recyle. + // If invoked with a nil list, nothing to do. + if elts == nil || *elts == nil { + return + } + // Update the in progress count. + if len(*elts) > 0 { + if atomic.AddInt64(&q.inprogress, int64(-(len(*elts)))) < 0 { + atomic.StoreInt64(&q.inprogress, 0) + } + } // We also don't want to recycle huge slices, so check against the max. // q.mrs is normally immutable but can be changed, in a safe way, in some tests. 
- if elts == nil || *elts == nil || cap(*elts) > q.mrs { + if cap(*elts) > q.mrs { return } q.resetAndReturnToPool(elts) @@ -212,3 +204,21 @@ func (q *ipQueue) drain() { } q.Unlock() } + +// Since the length of the queue goes to 0 after a pop(), it is good to +// have an insight on how many elements are yet to be processed after a pop(). +// For that reason, the queue maintains a count of elements returned through +// the pop() API. When the caller will call q.recycle(), this count will +// be reduced by the size of the slice returned by pop(). +func (q *ipQueue) inProgress() int64 { + return atomic.LoadInt64(&q.inprogress) +} + +// Remove this queue from the server's map of ipQueues. +// All ipQueue operations (such as push/pop/etc..) are still possible. +func (q *ipQueue) unregister() { + if q == nil { + return + } + q.m.Delete(q.name) +} diff --git a/server/ipqueue_test.go b/server/ipqueue_test.go index d339536a..acf6f088 100644 --- a/server/ipqueue_test.go +++ b/server/ipqueue_test.go @@ -14,14 +14,14 @@ package server import ( - "fmt" "sync" "testing" "time" ) func TestIPQueueBasic(t *testing.T) { - q := newIPQueue() + s := &Server{} + q := s.newIPQueue("test") // Check that pool has been created if q.pool == nil { t.Fatal("Expected pool to have been created") @@ -42,14 +42,54 @@ func TestIPQueueBasic(t *testing.T) { } // Try to change the max recycle size - q = newIPQueue(ipQueue_MaxRecycleSize(10)) - if q.mrs != 10 { - t.Fatalf("Expected max recycle size to be 10, got %v", q.mrs) + q2 := s.newIPQueue("test2", ipQueue_MaxRecycleSize(10)) + if q2.mrs != 10 { + t.Fatalf("Expected max recycle size to be 10, got %v", q2.mrs) + } + + // Check that those 2 queues are registered + var gotFirst bool + var gotSecond bool + s.ipQueues.Range(func(k, v interface{}) bool { + switch k.(string) { + case "test": + gotFirst = true + case "test2": + gotSecond = true + default: + t.Fatalf("Unknown queue: %q", k.(string)) + } + return true + }) + if !gotFirst { + t.Fatalf("Did 
not find queue %q", "test") + } + if !gotSecond { + t.Fatalf("Did not find queue %q", "test2") + } + // Unregister them + q.unregister() + q2.unregister() + // They should have been removed from the map + s.ipQueues.Range(func(k, v interface{}) bool { + t.Fatalf("Got queue %q", k.(string)) + return false + }) + // But verify that we can still push/pop + q.push(1) + elts := q.pop() + if len(elts) != 1 { + t.Fatalf("Should have gotten 1 element, got %v", len(elts)) + } + q2.push(2) + if e := q2.popOne(); e.(int) != 2 { + t.Fatalf("popOne failed: %+v", e) } } func TestIPQueuePush(t *testing.T) { - q := newIPQueue() + s := &Server{} + q := s.newIPQueue("test") q.push(1) if l := q.len(); l != 1 { t.Fatalf("Expected len to be 1, got %v", l) @@ -74,7 +114,8 @@ func TestIPQueuePush(t *testing.T) { } func TestIPQueuePop(t *testing.T) { - q := newIPQueue() + s := &Server{} + q := s.newIPQueue("test") q.push(1) <-q.ch elts := q.pop() @@ -91,14 +132,29 @@ func TestIPQueuePop(t *testing.T) { default: // OK } + // Since pop() brings the number of pending to 0, we keep track of the + // number of "in progress" elements. Check that the value is 1 here. + if n := q.inProgress(); n != 1 { + t.Fatalf("Expected count to be 1, got %v", n) + } + // Recycling will bring it down to 0. + q.recycle(&elts) + if n := q.inProgress(); n != 0 { + t.Fatalf("Expected count to be 0, got %v", n) + } // If we call pop() now, we should get an empty list. 
if elts = q.pop(); elts != nil { t.Fatalf("Expected nil, got %v", elts) } + // The in progress count should still be 0 + if n := q.inProgress(); n != 0 { + t.Fatalf("Expected count to be 0, got %v", n) + } } func TestIPQueuePopOne(t *testing.T) { - q := newIPQueue() + s := &Server{} + q := s.newIPQueue("test") q.push(1) <-q.ch e := q.popOne() @@ -111,6 +167,10 @@ func TestIPQueuePopOne(t *testing.T) { if l := q.len(); l != 0 { t.Fatalf("Expected len to be 0, got %v", l) } + // That does not affect the number of notProcessed + if n := q.inProgress(); n != 0 { + t.Fatalf("Expected count to be 0, got %v", n) + } select { case <-q.ch: t.Fatalf("Should not have been notified of addition") @@ -157,7 +217,7 @@ func TestIPQueuePopOne(t *testing.T) { t.Fatalf("Expected nil, got %v", e) } - q = newIPQueue() + q = s.newIPQueue("test2") q.push(1) q.push(2) // Capture current capacity @@ -186,7 +246,8 @@ func TestIPQueuePopOne(t *testing.T) { } func TestIPQueueMultiProducers(t *testing.T) { - q := newIPQueue() + s := &Server{} + q := s.newIPQueue("test") wg := sync.WaitGroup{} wg.Add(3) @@ -211,6 +272,9 @@ func TestIPQueueMultiProducers(t *testing.T) { m[v.(int)] = struct{}{} } q.recycle(&values) + if n := q.inProgress(); n != 0 { + t.Fatalf("Expected count to be 0, got %v", n) + } done = len(m) == 300 case <-tm.C: t.Fatalf("Did not receive all elements: %v", m) @@ -220,7 +284,8 @@ func TestIPQueueMultiProducers(t *testing.T) { } func TestIPQueueRecycle(t *testing.T) { - q := newIPQueue() + s := &Server{} + q := s.newIPQueue("test") total := 1000 for iter := 0; iter < 5; iter++ { var sz int @@ -252,7 +317,7 @@ func TestIPQueueRecycle(t *testing.T) { } } - q = newIPQueue(ipQueue_MaxRecycleSize(10)) + q = s.newIPQueue("test2", ipQueue_MaxRecycleSize(10)) for i := 0; i < 100; i++ { q.push(i) } @@ -291,7 +356,8 @@ func TestIPQueueRecycle(t *testing.T) { } func TestIPQueueDrain(t *testing.T) { - q := newIPQueue() + s := &Server{} + q := s.newIPQueue("test") for iter, recycled := 0, 
false; iter < 5 && !recycled; iter++ { for i := 0; i < 100; i++ { q.push(i + 1) @@ -323,55 +389,3 @@ func TestIPQueueDrain(t *testing.T) { } } } - -type testIPQLog struct { - msgs []string -} - -func (l *testIPQLog) log(name string, pending int) { - l.msgs = append(l.msgs, fmt.Sprintf("%s: %d pending", name, pending)) -} - -func TestIPQueueLogger(t *testing.T) { - l := &testIPQLog{} - q := newIPQueue(ipQueue_Logger("test_logger", l)) - q.lt = 2 - q.push(1) - q.push(2) - if len(l.msgs) != 1 { - t.Fatalf("Unexpected logging: %v", l.msgs) - } - if l.msgs[0] != "test_logger: 2 pending" { - t.Fatalf("Unexpected content: %v", l.msgs[0]) - } - l.msgs = nil - q.push(3) - if len(l.msgs) != 1 { - t.Fatalf("Unexpected logging: %v", l.msgs) - } - if l.msgs[0] != "test_logger: 3 pending" { - t.Fatalf("Unexpected content: %v", l.msgs[0]) - } - l.msgs = nil - q.popOne() - q.push(4) - if len(l.msgs) != 1 { - t.Fatalf("Unexpected logging: %v", l.msgs) - } - if l.msgs[0] != "test_logger: 3 pending" { - t.Fatalf("Unexpected content: %v", l.msgs[0]) - } - l.msgs = nil - q.pop() - q.push(5) - if len(l.msgs) != 0 { - t.Fatalf("Unexpected logging: %v", l.msgs) - } - q.push(6) - if len(l.msgs) != 1 { - t.Fatalf("Unexpected logging: %v", l.msgs) - } - if l.msgs[0] != "test_logger: 2 pending" { - t.Fatalf("Unexpected content: %v", l.msgs[0]) - } -} diff --git a/server/jetstream_api.go b/server/jetstream_api.go index f83f1053..211cbf74 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2787,7 +2787,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC // For signaling to upper layers. 
resultCh := make(chan result, 1) - activeQ := newIPQueue() // of int + activeQ := s.newIPQueue(fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName)) // of int var total int @@ -2866,6 +2866,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC tfile.Close() os.Remove(tfile.Name()) sub.client.processUnsub(sub.sid) + activeQ.unregister() }() const activityInterval = 5 * time.Second diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index eaf46302..b0b74813 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -557,7 +557,7 @@ func (js *jetStream) setupMetaGroup() error { } // Start up our meta node. - n, err := s.startRaftNode(cfg) + n, err := s.startRaftNode(sysAcc.GetName(), cfg) if err != nil { s.Warnf("Could not start metadata controller: %v", err) return err @@ -1378,7 +1378,7 @@ func (rg *raftGroup) setPreferred() { } // createRaftGroup is called to spin up this raft group if needed. -func (js *jetStream) createRaftGroup(rg *raftGroup, storage StorageType) error { +func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType) error { js.mu.Lock() defer js.mu.Unlock() s, cc := js.srv, js.cluster @@ -1434,7 +1434,7 @@ func (js *jetStream) createRaftGroup(rg *raftGroup, storage StorageType) error { s.bootstrapRaftNode(cfg, rg.Peers, true) } - n, err := s.startRaftNode(cfg) + n, err := s.startRaftNode(accName, cfg) if err != nil || n == nil { s.Debugf("Error creating raft group: %v", err) return err @@ -2316,7 +2316,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss var needsSetLeader bool if !alreadyRunning && numReplicas > 1 { if needsNode { - js.createRaftGroup(rg, storage) + js.createRaftGroup(acc.GetName(), rg, storage) } s.startGoRoutine(func() { js.monitorStream(mset, sa) }) } else if numReplicas == 1 && alreadyRunning { @@ -2413,7 +2413,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa 
*streamAssignme js.mu.RUnlock() // Process the raft group and make sure it's running if needed. - err := js.createRaftGroup(rg, storage) + err := js.createRaftGroup(acc.GetName(), rg, storage) // If we are restoring, create the stream if we are R>1 and not the preferred who handles the // receipt of the snapshot itself. @@ -2813,7 +2813,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state if !alreadyRunning { // Process the raft group and make sure its running if needed. - js.createRaftGroup(rg, mset.config().Storage) + js.createRaftGroup(acc.GetName(), rg, mset.config().Storage) } // Check if we already have this consumer running. @@ -5215,7 +5215,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) error { mset.store.FastState(&state) sreq := mset.calculateSyncRequest(&state, snap) s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node - qname := fmt.Sprintf("Stream %q snapshot", mset.cfg.Name) + qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name) mset.mu.Unlock() // Make sure our state's first sequence is <= the leader's snapshot. @@ -5290,7 +5290,8 @@ RETRY: reply string } - msgsQ := newIPQueue(ipQueue_Logger(qname, s.ipqLog)) // of *im + msgsQ := s.newIPQueue(qname) // of *im + defer msgsQ.unregister() // Send our catchup request here. reply := syncReplySubject() @@ -5324,10 +5325,12 @@ RETRY: // Check for eof signaling. 
if len(msg) == 0 { + msgsQ.recycle(&mrecs) return nil } if lseq, err := mset.processCatchupMsg(msg); err == nil { if lseq >= last { + msgsQ.recycle(&mrecs) return nil } } else if isOutOfSpaceErr(err) { @@ -5338,9 +5341,11 @@ RETRY: } else { s.Warnf("Catchup for stream '%s > %s' errored, account resources exceeded: %v", mset.account(), mset.name(), err) } + msgsQ.recycle(&mrecs) return err } else { s.Warnf("Catchup for stream '%s > %s' errored, will retry: %v", mset.account(), mset.name(), err) + msgsQ.recycle(&mrecs) goto RETRY } if mrec.reply != _EMPTY_ { diff --git a/server/monitor.go b/server/monitor.go index 85604cf0..35719a89 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1077,6 +1077,38 @@ func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request) { ResponseHandler(w, r, buf[:n]) } +type monitorIPQueue struct { + Pending int `json:"pending"` + InProgress int `json:"in_progress,omitempty"` +} + +func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { + all, err := decodeBool(w, r, "all") + if err != nil { + return + } + qfilter := r.URL.Query().Get("queues") + + queues := map[string]monitorIPQueue{} + + s.ipQueues.Range(func(k, v interface{}) bool { + name := k.(string) + queue := v.(*ipQueue) + pending := queue.len() + inProgress := int(queue.inProgress()) + if !all && (pending == 0 && inProgress == 0) { + return true + } else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) { + return true + } + queues[name] = monitorIPQueue{Pending: pending, InProgress: inProgress} + return true + }) + + b, _ := json.MarshalIndent(queues, "", " ") + ResponseHandler(w, r, b) +} + // Varz will output server information on the monitoring port at /varz. 
type Varz struct { ID string `json:"server_id"` diff --git a/server/mqtt.go b/server/mqtt.go index ce1cec9a..25d96cb0 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -921,7 +921,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc id := string(getHash(s.Name())) replicas := s.mqttDetermineReplicas() - qname := fmt.Sprintf("MQTT account %q send", accName) + qname := fmt.Sprintf("[ACC:%s] MQTT ", accName) as := &mqttAccountSessionManager{ sessions: make(map[string]*mqttSession), sessByHash: make(map[string]*mqttSession), @@ -932,11 +932,11 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc id: id, c: c, rplyr: mqttJSARepliesPrefix + id + ".", - sendq: newIPQueue(ipQueue_Logger(qname, s.ipqLog)), // of *mqttJSPubMsg + sendq: s.newIPQueue(qname + "send"), // of *mqttJSPubMsg nuid: nuid.New(), quitCh: quitCh, }, - sp: newIPQueue(), // of uint64 + sp: s.newIPQueue(qname + "sp"), // of uint64 } // TODO record domain name in as here diff --git a/server/raft.go b/server/raft.go index 19bf6533..104d21e3 100644 --- a/server/raft.go +++ b/server/raft.go @@ -197,6 +197,9 @@ type raft struct { leadc chan bool quit chan struct{} + // Account name of the asset this raft group is for + accName string + // Random generator, used to generate inboxes for instance prand *rand.Rand } @@ -330,7 +333,7 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer } // startRaftNode will start the raft node. 
-func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { +func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error) { if cfg == nil { return nil, errNilCfg } @@ -353,7 +356,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { return nil, errNoPeerState } - qpfx := fmt.Sprintf("RAFT [%s - %s] ", hash[:idLen], cfg.Name) + qpfx := fmt.Sprintf("[ACC:%s] RAFT '%s' ", accName, cfg.Name) rsrc := time.Now().UnixNano() if len(pub) >= 32 { if h, _ := highwayhash.New64([]byte(pub[:32])); h != nil { @@ -382,13 +385,14 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { quit: make(chan struct{}), wtvch: make(chan struct{}, 1), wpsch: make(chan struct{}, 1), - reqs: newIPQueue(), // of *voteRequest - votes: newIPQueue(), // of *voteResponse - prop: newIPQueue(ipQueue_Logger(qpfx+"Entry", s.ipqLog)), // of *Entry - entry: newIPQueue(ipQueue_Logger(qpfx+"AppendEntry", s.ipqLog)), // of *appendEntry - resp: newIPQueue(ipQueue_Logger(qpfx+"AppendEntryResponse", s.ipqLog)), // of *appendEntryResponse - apply: newIPQueue(ipQueue_Logger(qpfx+"CommittedEntry", s.ipqLog)), // of *CommittedEntry - stepdown: newIPQueue(), // of string + reqs: s.newIPQueue(qpfx + "vreq"), // of *voteRequest + votes: s.newIPQueue(qpfx + "vresp"), // of *voteResponse + prop: s.newIPQueue(qpfx + "entry"), // of *Entry + entry: s.newIPQueue(qpfx + "appendEntry"), // of *appendEntry + resp: s.newIPQueue(qpfx + "appendEntryResponse"), // of *appendEntryResponse + apply: s.newIPQueue(qpfx + "committedEntry"), // of *CommittedEntry + stepdown: s.newIPQueue(qpfx + "stepdown"), // of string + accName: accName, leadc: make(chan bool, 1), observer: cfg.Observer, extSt: ps.domainExt, @@ -1337,6 +1341,12 @@ func (n *raft) shutdown(shouldDelete bool) { os.Remove(filepath.Join(n.sd, termVoteFile)) os.RemoveAll(filepath.Join(n.sd, snapshotsDir)) } + // Unregistering ipQueues do not prevent them from push/pop + // just will remove them from the 
central monitoring map + queues := []*ipQueue{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown} + for _, q := range queues { + q.unregister() + } n.Unlock() s.unregisterRaftNode(g) @@ -1973,6 +1983,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue /* of n.debug("Catchup done for %q, will add into peers", peer) n.ProposeAddPeer(peer) } + indexUpdatesQ.unregister() }() n.debug("Running catchup for %q", peer) @@ -2114,8 +2125,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.debug("Our first entry does not match request from follower") } // Create a queue for delivering updates from responses. - qname := fmt.Sprintf("RAFT [%s - %s] Index updates", n.id, n.group) - indexUpdates := newIPQueue(ipQueue_Logger(qname, n.s.ipqLog)) // of uint64 + indexUpdates := n.s.newIPQueue(fmt.Sprintf("[ACC:%s] RAFT '%s' indexUpdates", n.accName, n.group)) // of uint64 indexUpdates.push(ae.pindex) n.progress[ar.peer] = indexUpdates n.Unlock() diff --git a/server/sendq.go b/server/sendq.go index 35ff9456..49fcfb19 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -32,7 +32,7 @@ type sendq struct { } func (s *Server) newSendQ() *sendq { - sq := &sendq{s: s, q: newIPQueue(ipQueue_Logger("Send", s.ipqLog))} + sq := &sendq{s: s, q: s.newIPQueue("SendQ")} s.startGoRoutine(sq.internalLoop) return sq } diff --git a/server/server.go b/server/server.go index 96474680..b54a929a 100644 --- a/server/server.go +++ b/server/server.go @@ -269,15 +269,8 @@ type Server struct { // Keep track of what that user name is for config reload purposes. sysAccOnlyNoAuthUser string - // This is a central logger for IPQueues when the number of pending - // messages reaches a certain thresold (per queue) - ipqLog *srvIPQueueLogger -} - -type srvIPQueueLogger struct { - ch chan string - done chan struct{} - s *Server + // IPQueues map + ipQueues sync.Map } // For tracking JS nodes. 
@@ -1265,7 +1258,7 @@ func (s *Server) setSystemAccount(acc *Account) error { sid: 1, servers: make(map[string]*serverUpdate), replies: make(map[string]msgHandler), - sendq: newIPQueue(ipQueue_Logger("System send", s.ipqLog)), // of *pubMsg + sendq: s.newIPQueue("System sendQ"), // of *pubMsg resetCh: make(chan struct{}), sq: s.newSendQ(), statsz: eventsHBInterval, @@ -1641,8 +1634,6 @@ func (s *Server) Start() { s.grRunning = true s.grMu.Unlock() - s.startIPQLogger() - // Pprof http endpoint for the profiler. if opts.ProfPort != 0 { s.StartProfiler() @@ -2011,11 +2002,6 @@ func (s *Server) Shutdown() { doneExpected-- } - // Stop the IPQueue logger (before the grWG.Wait() call) - if s.ipqLog != nil { - s.ipqLog.stop() - } - // Wait for go routines to be done. s.grWG.Wait() @@ -2267,6 +2253,7 @@ const ( AccountzPath = "/accountz" JszPath = "/jsz" HealthzPath = "/healthz" + IPQueuesPath = "/ipqueuesz" ) func (s *Server) basePath(p string) string { @@ -2371,6 +2358,8 @@ func (s *Server) startMonitoring(secure bool) error { mux.HandleFunc(s.basePath(JszPath), s.HandleJsz) // Healthz mux.HandleFunc(s.basePath(HealthzPath), s.HandleHealthz) + // IPQueuesz + mux.HandleFunc(s.basePath(IPQueuesPath), s.HandleIPQueuesz) // Do not set a WriteTimeout because it could cause cURL/browser // to return empty response or unable to display page if the @@ -3633,35 +3622,3 @@ func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta s.updateLeafNodes(acc, sub, delta) } - -func (s *Server) startIPQLogger() { - s.ipqLog = &srvIPQueueLogger{ - ch: make(chan string, 128), - done: make(chan struct{}), - s: s, - } - s.startGoRoutine(s.ipqLog.run) -} - -func (l *srvIPQueueLogger) stop() { - close(l.done) -} - -func (l *srvIPQueueLogger) log(name string, pending int) { - select { - case l.ch <- fmt.Sprintf("%s queue pending size: %v", name, pending): - default: - } -} - -func (l *srvIPQueueLogger) run() { - defer l.s.grWG.Done() - for { - select { - case w := <-l.ch: - 
l.s.Warnf("%s", w) - case <-l.done: - return - } - } -} diff --git a/server/server_test.go b/server/server_test.go index 9d6a19f0..501746bd 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1279,9 +1279,6 @@ func TestServerShutdownDuringStart(t *testing.T) { ch := make(chan struct{}, 1) go func() { s.Start() - // Since the server has already been shutdown and we don't want to leave - // the ipqLog run() routine running, stop it now. - s.ipqLog.stop() close(ch) }() select { @@ -1964,36 +1961,3 @@ func TestServerLogsConfigurationFile(t *testing.T) { t.Fatalf("Config file location was not reported in log: %s", log) } } - -func TestServerIPQueueLogger(t *testing.T) { - o := DefaultOptions() - s := RunServer(o) - defer s.Shutdown() - - l := &captureWarnLogger{warn: make(chan string, 100)} - s.SetLogger(l, false, false) - - q := newIPQueue(ipQueue_Logger("test", s.ipqLog)) - // Normally, lt is immutable and set to ipQueueDefaultWarnThreshold, but - // for test, we set it to a low value. - q.lt = 2 - q.push(1) - // This one should case a warning - q.push(2) - - for { - select { - case w := <-l.warn: - // In case we get other warnings a runtime, just check that we - // get the one we expect and be done. 
- if strings.Contains(w, "test queue") { - if strings.Contains(w, "test queue pending size: 2") { - return - } - t.Fatalf("Invalid warning: %v", w) - } - case <-time.After(time.Second): - t.Fatal("Did not get warning") - } - } -} diff --git a/server/stream.go b/server/stream.go index 2f2164e8..697e0d5c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -367,7 +367,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt c := s.createInternalJetStreamClient() ic := s.createInternalJetStreamClient() - qname := fmt.Sprintf("Stream %s > %s messages", a.Name, config.Name) + qpfx := fmt.Sprintf("[ACC:%s] stream '%s' ", a.Name, config.Name) mset := &stream{ acc: a, jsa: jsa, @@ -378,13 +378,13 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt sysc: ic, stype: cfg.Storage, consumers: make(map[string]*consumer), - msgs: newIPQueue(ipQueue_Logger(qname, s.ipqLog)), // of *inMsg + msgs: s.newIPQueue(qpfx + "messages"), // of *inMsg qch: make(chan struct{}), } // For no-ack consumers when we are interest retention. 
if cfg.Retention != LimitsPolicy { - mset.ackq = newIPQueue() // of uint64 + mset.ackq = s.newIPQueue(qpfx + "acks") // of uint64 } jsa.streams[cfg.Name] = mset @@ -1057,7 +1057,8 @@ func (mset *stream) update(config *StreamConfig) error { mset.sources = make(map[string]*sourceInfo) } mset.cfg.Sources = append(mset.cfg.Sources, s) - si := &sourceInfo{name: s.Name, iname: s.iname, msgs: newIPQueue() /* of *inMsg */} + qname := fmt.Sprintf("[ACC:%s] stream source '%s' from '%s' msgs", mset.acc.Name, mset.cfg.Name, s.Name) + si := &sourceInfo{name: s.Name, iname: s.iname, msgs: mset.srv.newIPQueue(qname) /* of *inMsg */} mset.sources[s.iname] = si mset.setStartingSequenceForSource(s.iname) mset.setSourceConsumer(s.iname, si.sseq+1) @@ -1655,7 +1656,8 @@ func (mset *stream) setupMirrorConsumer() error { } if !isReset { - mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: newIPQueue() /* of *inMsg */} + qname := fmt.Sprintf("[ACC:%s] stream mirror '%s' of '%s' msgs", mset.acc.Name, mset.cfg.Name, mset.cfg.Mirror.Name) + mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: mset.srv.newIPQueue(qname) /* of *inMsg */} } if !mset.mirror.grr { @@ -2273,7 +2275,8 @@ func (mset *stream) startingSequenceForSources() { if ssi.iname == _EMPTY_ { ssi.setIndexName() } - si := &sourceInfo{name: ssi.Name, iname: ssi.iname, msgs: newIPQueue() /* of *inMsg */} + qname := fmt.Sprintf("[ACC:%s] stream source '%s' from '%s' msgs", mset.acc.Name, mset.cfg.Name, ssi.Name) + si := &sourceInfo{name: ssi.Name, iname: ssi.iname, msgs: mset.srv.newIPQueue(qname) /* of *inMsg */} mset.sources[ssi.iname] = si } @@ -2387,6 +2390,7 @@ func (mset *stream) stopSourceConsumers() { close(si.qch) si.qch = nil } + si.msgs.unregister() } } @@ -2413,6 +2417,7 @@ func (mset *stream) unsubscribeToStream() error { if mset.mirror.qch != nil { close(mset.mirror.qch) } + mset.mirror.msgs.unregister() mset.mirror = nil } @@ -3186,8 +3191,8 @@ func (mset *stream) setupSendCapabilities() { if 
mset.outq != nil { return } - qname := fmt.Sprintf("Stream %q send", mset.cfg.Name) - mset.outq = &jsOutQ{newIPQueue(ipQueue_Logger(qname, mset.srv.ipqLog))} // of *jsPubMsg + qname := fmt.Sprintf("[ACC:%s] stream '%s' sendQ", mset.acc.Name, mset.cfg.Name) + mset.outq = &jsOutQ{mset.srv.newIPQueue(qname)} // of *jsPubMsg go mset.internalLoop() } @@ -3413,6 +3418,14 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { sysc := mset.sysc mset.sysc = nil + if deleteFlag { + // Unregistering ipQueues does not prevent them from push/pop + // just will remove them from the central monitoring map + mset.msgs.unregister() + mset.ackq.unregister() + mset.outq.unregister() + } + // Clustered cleanup. mset.mu.Unlock()