diff --git a/server/consumer.go b/server/consumer.go index 84474370..ba1a0456 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -492,8 +492,12 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if c == nil { return nil, NewJSStreamInvalidError() } + var accName string c.mu.Lock() s, a := c.srv, c.acc + if a != nil { + accName = a.Name + } c.mu.Unlock() // Hold mset lock here. @@ -570,7 +574,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri maxdc: uint64(config.MaxDeliver), maxp: config.MaxAckPending, created: time.Now().UTC(), - ackMsgs: newIPQueue(), } // Bind internal client to the user account. @@ -595,6 +598,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } } } + // Create ackMsgs queue now that we have a consumer name + o.ackMsgs = s.newIPQueue(fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name)) + // Create our request waiting queue. if o.isPullMode() { o.waiting = newWaitQueue(config.MaxWaiting) @@ -3645,6 +3651,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { } n := o.node qgroup := o.cfg.DeliverGroup + o.ackMsgs.unregister() o.mu.Unlock() if c != nil { diff --git a/server/events.go b/server/events.go index e519b1e7..b490a2cb 100644 --- a/server/events.go +++ b/server/events.go @@ -433,6 +433,7 @@ RESET: // there is a chance that the process will exit before the // writeLoop has a chance to send it. c.flushClients(time.Second) + sendq.recycle(&msgs) return } pm.returnToPool() diff --git a/server/ipqueue.go b/server/ipqueue.go index 2e514513..c002bd60 100644 --- a/server/ipqueue.go +++ b/server/ipqueue.go @@ -15,35 +15,26 @@ package server import ( "sync" + "sync/atomic" ) const ipQueueDefaultMaxRecycleSize = 4 * 1024 -const ipQueueDefaultWarnThreshold = 32 * 1024 - -type ipQueueLogger interface { - // The ipQueue will invoke this function with the queue's name and the number - // of pending elements. This call CANNOT block. It is ok to drop the logging - // if desired, but not block. - log(name string, pending int) -} // This is a generic intra-process queue. type ipQueue struct { + inprogress int64 sync.RWMutex - ch chan struct{} - elts []interface{} - pos int - pool *sync.Pool - mrs int - name string - logger ipQueueLogger - lt int + ch chan struct{} + elts []interface{} + pos int + pool *sync.Pool + mrs int + name string + m *sync.Map } type ipQueueOpts struct { maxRecycleSize int - name string - logger ipQueueLogger } type ipQueueOpt func(*ipQueueOpts) @@ -56,27 +47,19 @@ func ipQueue_MaxRecycleSize(max int) ipQueueOpt { } } -// This option provides the logger to be used by this queue to log -// when the number of pending elements reaches a certain threshold. 
-func ipQueue_Logger(name string, l ipQueueLogger) ipQueueOpt {
-	return func(o *ipQueueOpts) {
-		o.name, o.logger = name, l
-	}
-}
-
-func newIPQueue(opts ...ipQueueOpt) *ipQueue {
+func (s *Server) newIPQueue(name string, opts ...ipQueueOpt) *ipQueue {
 	qo := ipQueueOpts{maxRecycleSize: ipQueueDefaultMaxRecycleSize}
 	for _, o := range opts {
 		o(&qo)
 	}
 	q := &ipQueue{
-		ch:     make(chan struct{}, 1),
-		mrs:    qo.maxRecycleSize,
-		pool:   &sync.Pool{},
-		name:   qo.name,
-		logger: qo.logger,
-		lt:     ipQueueDefaultWarnThreshold,
+		ch:   make(chan struct{}, 1),
+		mrs:  qo.maxRecycleSize,
+		pool: &sync.Pool{},
+		name: name,
+		m:    &s.ipQueues,
 	}
+	s.ipQueues.Store(name, q)
 	return q
 }
 
@@ -101,9 +84,6 @@ func (q *ipQueue) push(e interface{}) int {
 	}
 	q.elts = append(q.elts, e)
 	l++
-	if l >= q.lt && q.logger != nil && (l <= q.lt+10 || q.lt%10000 == 0) {
-		q.logger.log(q.name, l)
-	}
 	q.Unlock()
 	if signal {
 		select {
@@ -132,6 +112,7 @@ func (q *ipQueue) pop() []interface{} {
 		elts = q.elts[q.pos:]
 	}
 	q.elts, q.pos = nil, 0
+	atomic.AddInt64(&q.inprogress, int64(len(elts)))
 	q.Unlock()
 	return elts
 }
@@ -174,13 +155,24 @@ func (q *ipQueue) popOne() interface{} {
 
 // After a pop(), the slice can be recycled for the next push() when
 // a first element is added to the queue.
+// This will also decrement the "in progress" count by the length
+// of the slice.
 // Reason we use pointer to slice instead of slice is explained
 // here: https://staticcheck.io/docs/checks#SA6002
 func (q *ipQueue) recycle(elts *[]interface{}) {
-	// If invoked with an nil list, don't recyle.
+	// If invoked with a nil list, nothing to do.
+	if elts == nil || *elts == nil {
+		return
+	}
+	// Update the in progress count.
+	if len(*elts) > 0 {
+		if atomic.AddInt64(&q.inprogress, int64(-(len(*elts)))) < 0 {
+			atomic.StoreInt64(&q.inprogress, 0)
+		}
+	}
 	// We also don't want to recycle huge slices, so check against the max.
 	// q.mrs is normally immutable but can be changed, in a safe way, in some tests.
-	if elts == nil || *elts == nil || cap(*elts) > q.mrs {
+	if cap(*elts) > q.mrs {
 		return
 	}
 	q.resetAndReturnToPool(elts)
@@ -212,3 +204,21 @@ func (q *ipQueue) drain() {
 	}
 	q.Unlock()
 }
+
+// Since the length of the queue drops to 0 after a pop(), it is useful to
+// have insight into how many popped elements are still being processed.
+// For that reason, the queue maintains a count of elements returned through
+// the pop() API; when the caller invokes q.recycle(), this count is reduced
+// by the size of the recycled slice.
+func (q *ipQueue) inProgress() int64 {
+	return atomic.LoadInt64(&q.inprogress)
+}
+
+// Remove this queue from the server's map of ipQueues.
+// All ipQueue operations (such as push/pop/etc.) are still possible.
+func (q *ipQueue) unregister() { + if q == nil { + return + } + q.m.Delete(q.name) +} diff --git a/server/ipqueue_test.go b/server/ipqueue_test.go index d339536a..acf6f088 100644 --- a/server/ipqueue_test.go +++ b/server/ipqueue_test.go @@ -14,14 +14,14 @@ package server import ( - "fmt" "sync" "testing" "time" ) func TestIPQueueBasic(t *testing.T) { - q := newIPQueue() + s := &Server{} + q := s.newIPQueue("test") // Check that pool has been created if q.pool == nil { t.Fatal("Expected pool to have been created") @@ -42,14 +42,54 @@ func TestIPQueueBasic(t *testing.T) { } // Try to change the max recycle size - q = newIPQueue(ipQueue_MaxRecycleSize(10)) - if q.mrs != 10 { - t.Fatalf("Expected max recycle size to be 10, got %v", q.mrs) + q2 := s.newIPQueue("test2", ipQueue_MaxRecycleSize(10)) + if q2.mrs != 10 { + t.Fatalf("Expected max recycle size to be 10, got %v", q2.mrs) + } + + // Check that those 2 queues are registered + var gotFirst bool + var gotSecond bool + s.ipQueues.Range(func(k, v interface{}) bool { + switch k.(string) { + case "test": + gotFirst = true + case "test2": + gotSecond = true + default: + t.Fatalf("Unknown queue: %q", k.(string)) + } + return true + }) + if !gotFirst { + t.Fatalf("Did not find queue %q", "test") + } + if !gotSecond { + t.Fatalf("Did not find queue %q", "test2") + } + // Unregister them + q.unregister() + q2.unregister() + // They should have been removed from the map + s.ipQueues.Range(func(k, v interface{}) bool { + t.Fatalf("Got queue %q", k.(string)) + return false + }) + // But verify that we can still push/pop + q.push(1) + elts := q.pop() + if len(elts) != 1 { + t.Fatalf("Should have gotten 1 element, got %v", len(elts)) + } + q2.push(2) + if e := q2.popOne(); e.(int) != 2 { + t.Fatalf("popOne failed: %+v", e) } } func TestIPQueuePush(t *testing.T) { - q := newIPQueue() + s := &Server{} + q := s.newIPQueue("test") q.push(1) if l := q.len(); l != 1 { t.Fatalf("Expected len to be 1, got %v", l) @@ -74,7 +114,8 @@ func TestIPQueuePush(t *testing.T) { } func TestIPQueuePop(t *testing.T) { - q := newIPQueue() + s := &Server{} + q := s.newIPQueue("test") q.push(1) <-q.ch elts := q.pop() @@ -91,14 +132,29 @@ func TestIPQueuePop(t *testing.T) { default: // OK } + // Since pop() brings the number of pending to 0, we keep track of the + // number of "in progress" elements. Check that the value is 1 here. + if n := q.inProgress(); n != 1 { + t.Fatalf("Expected count to be 1, got %v", n) + } + // Recycling will bring it down to 0. + q.recycle(&elts) + if n := q.inProgress(); n != 0 { + t.Fatalf("Expected count to be 0, got %v", n) + } // If we call pop() now, we should get an empty list. 
 	if elts = q.pop(); elts != nil {
 		t.Fatalf("Expected nil, got %v", elts)
 	}
+	// The in progress count should still be 0.
+	if n := q.inProgress(); n != 0 {
+		t.Fatalf("Expected count to be 0, got %v", n)
+	}
 }
 
 func TestIPQueuePopOne(t *testing.T) {
-	q := newIPQueue()
+	s := &Server{}
+	q := s.newIPQueue("test")
 	q.push(1)
 	<-q.ch
 	e := q.popOne()
@@ -111,6 +167,10 @@ func TestIPQueuePopOne(t *testing.T) {
 	if l := q.len(); l != 0 {
 		t.Fatalf("Expected len to be 0, got %v", l)
 	}
+	// popOne() does not affect the in-progress count.
+	if n := q.inProgress(); n != 0 {
+		t.Fatalf("Expected count to be 0, got %v", n)
+	}
 	select {
 	case <-q.ch:
 		t.Fatalf("Should not have been notified of addition")
@@ -157,7 +217,7 @@ func TestIPQueuePopOne(t *testing.T) {
 		t.Fatalf("Expected nil, got %v", e)
 	}
 
-	q = newIPQueue()
+	q = s.newIPQueue("test2")
 	q.push(1)
 	q.push(2)
 	// Capture current capacity
@@ -186,7 +246,8 @@ func TestIPQueuePopOne(t *testing.T) {
 }
 
 func TestIPQueueMultiProducers(t *testing.T) {
-	q := newIPQueue()
+	s := &Server{}
+	q := s.newIPQueue("test")
 
 	wg := sync.WaitGroup{}
 	wg.Add(3)
@@ -211,6 +272,9 @@ func TestIPQueueMultiProducers(t *testing.T) {
 				m[v.(int)] = struct{}{}
 			}
 			q.recycle(&values)
+			if n := q.inProgress(); n != 0 {
+				t.Fatalf("Expected count to be 0, got %v", n)
+			}
 			done = len(m) == 300
 		case <-tm.C:
 			t.Fatalf("Did not receive all elements: %v", m)
@@ -220,7 +284,8 @@ func TestIPQueueMultiProducers(t *testing.T) {
 }
 
 func TestIPQueueRecycle(t *testing.T) {
-	q := newIPQueue()
+	s := &Server{}
+	q := s.newIPQueue("test")
 	total := 1000
 	for iter := 0; iter < 5; iter++ {
 		var sz int
@@ -252,7 +317,7 @@ func TestIPQueueRecycle(t *testing.T) {
 		}
 	}
 
-	q = newIPQueue(ipQueue_MaxRecycleSize(10))
+	q = s.newIPQueue("test2", ipQueue_MaxRecycleSize(10))
 	for i := 0; i < 100; i++ {
 		q.push(i)
 	}
@@ -291,7 +356,8 @@ func TestIPQueueRecycle(t *testing.T) {
 }
 
 func TestIPQueueDrain(t *testing.T) {
-	q := newIPQueue()
+	s := &Server{}
+	q := s.newIPQueue("test")
 	for iter, recycled := 0, false; iter < 5 && !recycled; iter++ {
 		for i := 0; i < 100; i++ {
 			q.push(i + 1)
@@ -323,55 +389,3 @@ func TestIPQueueDrain(t *testing.T) {
 		}
 	}
 }
-
-type testIPQLog struct {
-	msgs []string
-}
-
-func (l *testIPQLog) log(name string, pending int) {
-	l.msgs = append(l.msgs, fmt.Sprintf("%s: %d pending", name, pending))
-}
-
-func TestIPQueueLogger(t *testing.T) {
-	l := &testIPQLog{}
-	q := newIPQueue(ipQueue_Logger("test_logger", l))
-	q.lt = 2
-	q.push(1)
-	q.push(2)
-	if len(l.msgs) != 1 {
-		t.Fatalf("Unexpected logging: %v", l.msgs)
-	}
-	if l.msgs[0] != "test_logger: 2 pending" {
-		t.Fatalf("Unexpected content: %v", l.msgs[0])
-	}
-	l.msgs = nil
-	q.push(3)
-	if len(l.msgs) != 1 {
-		t.Fatalf("Unexpected logging: %v", l.msgs)
-	}
-	if l.msgs[0] != "test_logger: 3 pending" {
-		t.Fatalf("Unexpected content: %v", l.msgs[0])
-	}
-	l.msgs = nil
-	q.popOne()
-	q.push(4)
-	if len(l.msgs) != 1 {
-		t.Fatalf("Unexpected logging: %v", l.msgs)
-	}
-	if l.msgs[0] != "test_logger: 3 pending" {
-		t.Fatalf("Unexpected content: %v", l.msgs[0])
-	}
-	l.msgs = nil
-	q.pop()
-	q.push(5)
-	if len(l.msgs) != 0 {
-		t.Fatalf("Unexpected logging: %v", l.msgs)
-	}
-	q.push(6)
-	if len(l.msgs) != 1 {
-		t.Fatalf("Unexpected logging: %v", l.msgs)
-	}
-	if l.msgs[0] != "test_logger: 2 pending" {
-		t.Fatalf("Unexpected content: %v", l.msgs[0])
-	}
-}
diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index f83f1053..211cbf74 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -2787,7 +2787,7 @@ func (s *Server)
processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC // For signaling to upper layers. resultCh := make(chan result, 1) - activeQ := newIPQueue() // of int + activeQ := s.newIPQueue(fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName)) // of int var total int @@ -2866,6 +2866,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC tfile.Close() os.Remove(tfile.Name()) sub.client.processUnsub(sub.sid) + activeQ.unregister() }() const activityInterval = 5 * time.Second diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index eaf46302..b0b74813 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -557,7 +557,7 @@ func (js *jetStream) setupMetaGroup() error { } // Start up our meta node. - n, err := s.startRaftNode(cfg) + n, err := s.startRaftNode(sysAcc.GetName(), cfg) if err != nil { s.Warnf("Could not start metadata controller: %v", err) return err @@ -1378,7 +1378,7 @@ func (rg *raftGroup) setPreferred() { } // createRaftGroup is called to spin up this raft group if needed. -func (js *jetStream) createRaftGroup(rg *raftGroup, storage StorageType) error { +func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType) error { js.mu.Lock() defer js.mu.Unlock() s, cc := js.srv, js.cluster @@ -1434,7 +1434,7 @@ func (js *jetStream) createRaftGroup(rg *raftGroup, storage StorageType) error { s.bootstrapRaftNode(cfg, rg.Peers, true) } - n, err := s.startRaftNode(cfg) + n, err := s.startRaftNode(accName, cfg) if err != nil || n == nil { s.Debugf("Error creating raft group: %v", err) return err @@ -2316,7 +2316,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss var needsSetLeader bool if !alreadyRunning && numReplicas > 1 { if needsNode { - js.createRaftGroup(rg, storage) + js.createRaftGroup(acc.GetName(), rg, storage) } s.startGoRoutine(func() { js.monitorStream(mset, sa) }) } else if numReplicas == 1 && alreadyRunning { @@ -2413,7 +2413,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme js.mu.RUnlock() // Process the raft group and make sure it's running if needed. - err := js.createRaftGroup(rg, storage) + err := js.createRaftGroup(acc.GetName(), rg, storage) // If we are restoring, create the stream if we are R>1 and not the preferred who handles the // receipt of the snapshot itself. @@ -2813,7 +2813,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state if !alreadyRunning { // Process the raft group and make sure its running if needed. - js.createRaftGroup(rg, mset.config().Storage) + js.createRaftGroup(acc.GetName(), rg, mset.config().Storage) } // Check if we already have this consumer running. @@ -5215,7 +5215,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) error { mset.store.FastState(&state) sreq := mset.calculateSyncRequest(&state, snap) s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node - qname := fmt.Sprintf("Stream %q snapshot", mset.cfg.Name) + qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name) mset.mu.Unlock() // Make sure our state's first sequence is <= the leader's snapshot. @@ -5290,7 +5290,8 @@ RETRY: reply string } - msgsQ := newIPQueue(ipQueue_Logger(qname, s.ipqLog)) // of *im + msgsQ := s.newIPQueue(qname) // of *im + defer msgsQ.unregister() // Send our catchup request here. reply := syncReplySubject() @@ -5324,10 +5325,12 @@ RETRY: // Check for eof signaling. 
if len(msg) == 0 { + msgsQ.recycle(&mrecs) return nil } if lseq, err := mset.processCatchupMsg(msg); err == nil { if lseq >= last { + msgsQ.recycle(&mrecs) return nil } } else if isOutOfSpaceErr(err) { @@ -5338,9 +5341,11 @@ RETRY: } else { s.Warnf("Catchup for stream '%s > %s' errored, account resources exceeded: %v", mset.account(), mset.name(), err) } + msgsQ.recycle(&mrecs) return err } else { s.Warnf("Catchup for stream '%s > %s' errored, will retry: %v", mset.account(), mset.name(), err) + msgsQ.recycle(&mrecs) goto RETRY } if mrec.reply != _EMPTY_ { diff --git a/server/monitor.go b/server/monitor.go index 02e88850..0cf1d6ad 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1077,6 +1077,38 @@ func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request) { ResponseHandler(w, r, buf[:n]) } +type monitorIPQueue struct { + Pending int `json:"pending"` + InProgress int `json:"in_progress,omitempty"` +} + +func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { + all, err := decodeBool(w, r, "all") + if err != nil { + return + } + qfilter := r.URL.Query().Get("queues") + + queues := map[string]monitorIPQueue{} + + s.ipQueues.Range(func(k, v interface{}) bool { + name := k.(string) + queue := v.(*ipQueue) + pending := queue.len() + inProgress := int(queue.inProgress()) + if !all && (pending == 0 && inProgress == 0) { + return true + } else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) { + return true + } + queues[name] = monitorIPQueue{Pending: pending, InProgress: inProgress} + return true + }) + + b, _ := json.MarshalIndent(queues, "", " ") + ResponseHandler(w, r, b) +} + // Varz will output server information on the monitoring port at /varz. type Varz struct { ID string `json:"server_id"` diff --git a/server/mqtt.go b/server/mqtt.go index ce1cec9a..25d96cb0 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -921,7 +921,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc id := string(getHash(s.Name())) replicas := s.mqttDetermineReplicas() - qname := fmt.Sprintf("MQTT account %q send", accName) + qname := fmt.Sprintf("[ACC:%s] MQTT ", accName) as := &mqttAccountSessionManager{ sessions: make(map[string]*mqttSession), sessByHash: make(map[string]*mqttSession), @@ -932,11 +932,11 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc id: id, c: c, rplyr: mqttJSARepliesPrefix + id + ".", - sendq: newIPQueue(ipQueue_Logger(qname, s.ipqLog)), // of *mqttJSPubMsg + sendq: s.newIPQueue(qname + "send"), // of *mqttJSPubMsg nuid: nuid.New(), quitCh: quitCh, }, - sp: newIPQueue(), // of uint64 + sp: s.newIPQueue(qname + "sp"), // of uint64 } // TODO record domain name in as here diff --git a/server/raft.go b/server/raft.go index 19bf6533..104d21e3 100644 --- a/server/raft.go +++ b/server/raft.go @@ -197,6 +197,9 @@ type raft struct { leadc chan bool quit chan struct{} + // Account name of the asset this raft group is for + accName string + // Random generator, used to generate inboxes for instance prand *rand.Rand } @@ -330,7 +333,7 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer } // startRaftNode will start the raft node. 
-func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
+func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error) {
 	if cfg == nil {
 		return nil, errNilCfg
 	}
@@ -353,7 +356,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
 		return nil, errNoPeerState
 	}
 
-	qpfx := fmt.Sprintf("RAFT [%s - %s] ", hash[:idLen], cfg.Name)
+	qpfx := fmt.Sprintf("[ACC:%s] RAFT '%s' ", accName, cfg.Name)
 	rsrc := time.Now().UnixNano()
 	if len(pub) >= 32 {
 		if h, _ := highwayhash.New64([]byte(pub[:32])); h != nil {
@@ -382,13 +385,14 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
 		quit:     make(chan struct{}),
 		wtvch:    make(chan struct{}, 1),
 		wpsch:    make(chan struct{}, 1),
-		reqs:     newIPQueue(), // of *voteRequest
-		votes:    newIPQueue(), // of *voteResponse
-		prop:     newIPQueue(ipQueue_Logger(qpfx+"Entry", s.ipqLog)),               // of *Entry
-		entry:    newIPQueue(ipQueue_Logger(qpfx+"AppendEntry", s.ipqLog)),         // of *appendEntry
-		resp:     newIPQueue(ipQueue_Logger(qpfx+"AppendEntryResponse", s.ipqLog)), // of *appendEntryResponse
-		apply:    newIPQueue(ipQueue_Logger(qpfx+"CommittedEntry", s.ipqLog)),      // of *CommittedEntry
-		stepdown: newIPQueue(), // of string
+		reqs:     s.newIPQueue(qpfx + "vreq"),                // of *voteRequest
+		votes:    s.newIPQueue(qpfx + "vresp"),               // of *voteResponse
+		prop:     s.newIPQueue(qpfx + "entry"),               // of *Entry
+		entry:    s.newIPQueue(qpfx + "appendEntry"),         // of *appendEntry
+		resp:     s.newIPQueue(qpfx + "appendEntryResponse"), // of *appendEntryResponse
+		apply:    s.newIPQueue(qpfx + "committedEntry"),      // of *CommittedEntry
+		stepdown: s.newIPQueue(qpfx + "stepdown"),            // of string
+		accName:  accName,
 		leadc:    make(chan bool, 1),
 		observer: cfg.Observer,
 		extSt:    ps.domainExt,
@@ -1337,6 +1341,12 @@ func (n *raft) shutdown(shouldDelete bool) {
 		os.Remove(filepath.Join(n.sd, termVoteFile))
 		os.RemoveAll(filepath.Join(n.sd, snapshotsDir))
 	}
+	// Unregistering an ipQueue does not prevent further push/pop operations;
+	// it only removes the queue from the server's central monitoring map.
+	queues := []*ipQueue{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown}
+	for _, q := range queues {
+		q.unregister()
+	}
 	n.Unlock()
 
 	s.unregisterRaftNode(g)
@@ -1973,6 +1983,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue /* of
 			n.debug("Catchup done for %q, will add into peers", peer)
 			n.ProposeAddPeer(peer)
 		}
+		indexUpdatesQ.unregister()
 	}()
 
 	n.debug("Running catchup for %q", peer)
@@ -2114,8 +2125,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
 		n.debug("Our first entry does not match request from follower")
 	}
 	// Create a queue for delivering updates from responses.
- qname := fmt.Sprintf("RAFT [%s - %s] Index updates", n.id, n.group) - indexUpdates := newIPQueue(ipQueue_Logger(qname, n.s.ipqLog)) // of uint64 + indexUpdates := n.s.newIPQueue(fmt.Sprintf("[ACC:%s] RAFT '%s' indexUpdates", n.accName, n.group)) // of uint64 indexUpdates.push(ae.pindex) n.progress[ar.peer] = indexUpdates n.Unlock() diff --git a/server/sendq.go b/server/sendq.go index 35ff9456..49fcfb19 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -32,7 +32,7 @@ type sendq struct { } func (s *Server) newSendQ() *sendq { - sq := &sendq{s: s, q: newIPQueue(ipQueue_Logger("Send", s.ipqLog))} + sq := &sendq{s: s, q: s.newIPQueue("SendQ")} s.startGoRoutine(sq.internalLoop) return sq } diff --git a/server/server.go b/server/server.go index b9776532..e79c5b95 100644 --- a/server/server.go +++ b/server/server.go @@ -269,15 +269,8 @@ type Server struct { // Keep track of what that user name is for config reload purposes. sysAccOnlyNoAuthUser string - // This is a central logger for IPQueues when the number of pending - // messages reaches a certain thresold (per queue) - ipqLog *srvIPQueueLogger -} - -type srvIPQueueLogger struct { - ch chan string - done chan struct{} - s *Server + // IPQueues map + ipQueues sync.Map } // For tracking JS nodes. @@ -1265,7 +1258,7 @@ func (s *Server) setSystemAccount(acc *Account) error { sid: 1, servers: make(map[string]*serverUpdate), replies: make(map[string]msgHandler), - sendq: newIPQueue(ipQueue_Logger("System send", s.ipqLog)), // of *pubMsg + sendq: s.newIPQueue("System sendQ"), // of *pubMsg resetCh: make(chan struct{}), sq: s.newSendQ(), statsz: eventsHBInterval, @@ -1641,8 +1634,6 @@ func (s *Server) Start() { s.grRunning = true s.grMu.Unlock() - s.startIPQLogger() - // Pprof http endpoint for the profiler. if opts.ProfPort != 0 { s.StartProfiler() @@ -2011,11 +2002,6 @@ func (s *Server) Shutdown() { doneExpected-- } - // Stop the IPQueue logger (before the grWG.Wait() call) - if s.ipqLog != nil { - s.ipqLog.stop() - } - // Wait for go routines to be done. 
s.grWG.Wait() @@ -2267,6 +2253,7 @@ const ( AccountzPath = "/accountz" JszPath = "/jsz" HealthzPath = "/healthz" + IPQueuesPath = "/ipqueuesz" ) func (s *Server) basePath(p string) string { @@ -2371,6 +2358,8 @@ func (s *Server) startMonitoring(secure bool) error { mux.HandleFunc(s.basePath(JszPath), s.HandleJsz) // Healthz mux.HandleFunc(s.basePath(HealthzPath), s.HandleHealthz) + // IPQueuesz + mux.HandleFunc(s.basePath(IPQueuesPath), s.HandleIPQueuesz) // Do not set a WriteTimeout because it could cause cURL/browser // to return empty response or unable to display page if the @@ -3633,35 +3622,3 @@ func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta s.updateLeafNodes(acc, sub, delta) } - -func (s *Server) startIPQLogger() { - s.ipqLog = &srvIPQueueLogger{ - ch: make(chan string, 128), - done: make(chan struct{}), - s: s, - } - s.startGoRoutine(s.ipqLog.run) -} - -func (l *srvIPQueueLogger) stop() { - close(l.done) -} - -func (l *srvIPQueueLogger) log(name string, pending int) { - select { - case l.ch <- fmt.Sprintf("%s queue pending size: %v", name, pending): - default: - } -} - -func (l *srvIPQueueLogger) run() { - defer l.s.grWG.Done() - for { - select { - case w := <-l.ch: - l.s.Warnf("%s", w) - case <-l.done: - return - } - } -} diff --git a/server/server_test.go b/server/server_test.go index 9d6a19f0..501746bd 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1279,9 +1279,6 @@ func TestServerShutdownDuringStart(t *testing.T) { ch := make(chan struct{}, 1) go func() { s.Start() - // Since the server has already been shutdown and we don't want to leave - // the ipqLog run() routine running, stop it now. - s.ipqLog.stop() close(ch) }() select { @@ -1964,36 +1961,3 @@ func TestServerLogsConfigurationFile(t *testing.T) { t.Fatalf("Config file location was not reported in log: %s", log) } } - -func TestServerIPQueueLogger(t *testing.T) { - o := DefaultOptions() - s := RunServer(o) - defer s.Shutdown() - - l := &captureWarnLogger{warn: make(chan string, 100)} - s.SetLogger(l, false, false) - - q := newIPQueue(ipQueue_Logger("test", s.ipqLog)) - // Normally, lt is immutable and set to ipQueueDefaultWarnThreshold, but - // for test, we set it to a low value. - q.lt = 2 - q.push(1) - // This one should case a warning - q.push(2) - - for { - select { - case w := <-l.warn: - // In case we get other warnings a runtime, just check that we - // get the one we expect and be done. - if strings.Contains(w, "test queue") { - if strings.Contains(w, "test queue pending size: 2") { - return - } - t.Fatalf("Invalid warning: %v", w) - } - case <-time.After(time.Second): - t.Fatal("Did not get warning") - } - } -} diff --git a/server/stream.go b/server/stream.go index 2f2164e8..697e0d5c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -367,7 +367,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt c := s.createInternalJetStreamClient() ic := s.createInternalJetStreamClient() - qname := fmt.Sprintf("Stream %s > %s messages", a.Name, config.Name) + qpfx := fmt.Sprintf("[ACC:%s] stream '%s' ", a.Name, config.Name) mset := &stream{ acc: a, jsa: jsa, @@ -378,13 +378,13 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt sysc: ic, stype: cfg.Storage, consumers: make(map[string]*consumer), - msgs: newIPQueue(ipQueue_Logger(qname, s.ipqLog)), // of *inMsg + msgs: s.newIPQueue(qpfx + "messages"), // of *inMsg qch: make(chan struct{}), } // For no-ack consumers when we are interest retention. 
 	if cfg.Retention != LimitsPolicy {
-		mset.ackq = newIPQueue() // of uint64
+		mset.ackq = s.newIPQueue(qpfx + "acks") // of uint64
 	}
 
 	jsa.streams[cfg.Name] = mset
@@ -1057,7 +1057,8 @@ func (mset *stream) update(config *StreamConfig) error {
 				mset.sources = make(map[string]*sourceInfo)
 			}
 			mset.cfg.Sources = append(mset.cfg.Sources, s)
-			si := &sourceInfo{name: s.Name, iname: s.iname, msgs: newIPQueue() /* of *inMsg */}
+			qname := fmt.Sprintf("[ACC:%s] stream source '%s' from '%s' msgs", mset.acc.Name, mset.cfg.Name, s.Name)
+			si := &sourceInfo{name: s.Name, iname: s.iname, msgs: mset.srv.newIPQueue(qname) /* of *inMsg */}
 			mset.sources[s.iname] = si
 			mset.setStartingSequenceForSource(s.iname)
 			mset.setSourceConsumer(s.iname, si.sseq+1)
@@ -1655,7 +1656,8 @@ func (mset *stream) setupMirrorConsumer() error {
 	}
 
 	if !isReset {
-		mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: newIPQueue() /* of *inMsg */}
+		qname := fmt.Sprintf("[ACC:%s] stream mirror '%s' of '%s' msgs", mset.acc.Name, mset.cfg.Name, mset.cfg.Mirror.Name)
+		mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: mset.srv.newIPQueue(qname) /* of *inMsg */}
 	}
 
 	if !mset.mirror.grr {
@@ -2273,7 +2275,8 @@ func (mset *stream) startingSequenceForSources() {
 		if ssi.iname == _EMPTY_ {
 			ssi.setIndexName()
 		}
-		si := &sourceInfo{name: ssi.Name, iname: ssi.iname, msgs: newIPQueue() /* of *inMsg */}
+		qname := fmt.Sprintf("[ACC:%s] stream source '%s' from '%s' msgs", mset.acc.Name, mset.cfg.Name, ssi.Name)
+		si := &sourceInfo{name: ssi.Name, iname: ssi.iname, msgs: mset.srv.newIPQueue(qname) /* of *inMsg */}
 		mset.sources[ssi.iname] = si
 	}
 
@@ -2387,6 +2390,7 @@ func (mset *stream) stopSourceConsumers() {
 			close(si.qch)
 			si.qch = nil
 		}
+		si.msgs.unregister()
 	}
 }
 
@@ -2413,6 +2417,7 @@ func (mset *stream) unsubscribeToStream() error {
 		if mset.mirror.qch != nil {
 			close(mset.mirror.qch)
 		}
+		mset.mirror.msgs.unregister()
 		mset.mirror = nil
 	}
 
@@ -3186,8 +3191,8 @@ func (mset *stream) setupSendCapabilities() {
 	if mset.outq != nil {
 		return
 	}
-	qname := fmt.Sprintf("Stream %q send", mset.cfg.Name)
-	mset.outq = &jsOutQ{newIPQueue(ipQueue_Logger(qname, mset.srv.ipqLog))} // of *jsPubMsg
+	qname := fmt.Sprintf("[ACC:%s] stream '%s' sendQ", mset.acc.Name, mset.cfg.Name)
+	mset.outq = &jsOutQ{mset.srv.newIPQueue(qname)} // of *jsPubMsg
 	go mset.internalLoop()
 }
@@ -3413,6 +3418,14 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 	sysc := mset.sysc
 	mset.sysc = nil
 
+	if deleteFlag {
+		// Unregistering an ipQueue does not prevent further push/pop operations;
+		// it only removes the queue from the server's central monitoring map.
+		mset.msgs.unregister()
+		mset.ackq.unregister()
+		mset.outq.unregister()
+	}
+
 	// Clustered cleanup.
 	mset.mu.Unlock()
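For reviewers, a minimal standalone sketch of the pop()/recycle() accounting
contract this change introduces: pop() transfers all pending elements to an
"in progress" count, and recycle() both returns the slice and decrements that
count. The miniQueue type below is an illustrative toy, not the server's
actual ipQueue.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type miniQueue struct {
	inprogress int64
	sync.Mutex
	elts []interface{}
}

func (q *miniQueue) push(e interface{}) {
	q.Lock()
	q.elts = append(q.elts, e)
	q.Unlock()
}

// pop returns all pending elements and accounts for them as "in progress"
// until the caller recycles the returned slice.
func (q *miniQueue) pop() []interface{} {
	q.Lock()
	elts := q.elts
	q.elts = nil
	atomic.AddInt64(&q.inprogress, int64(len(elts)))
	q.Unlock()
	return elts
}

// recycle signals that the popped elements have been processed and
// decrements the in-progress count by the slice length.
func (q *miniQueue) recycle(elts *[]interface{}) {
	if elts == nil || *elts == nil {
		return
	}
	atomic.AddInt64(&q.inprogress, int64(-len(*elts)))
	*elts = nil
}

func main() {
	q := &miniQueue{}
	q.push(1)
	q.push(2)
	elts := q.pop()
	fmt.Println(atomic.LoadInt64(&q.inprogress)) // 2: popped but not yet recycled
	q.recycle(&elts)
	fmt.Println(atomic.LoadInt64(&q.inprogress)) // 0: processing complete
}

This is why the catchup and sendq paths above gain explicit recycle() calls on
every early return: without them, the in-progress count reported by the
monitoring endpoint would never drop back to zero.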
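And a hedged sketch of querying the new /ipqueuesz endpoint, assuming a
nats-server with its HTTP monitoring port on localhost:8222; the "all" and
"queues" query parameters mirror HandleIPQueuesz above, and ipQueueInfo here
is a local copy of the monitorIPQueue shape for decoding.

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

type ipQueueInfo struct {
	Pending    int `json:"pending"`
	InProgress int `json:"in_progress,omitempty"`
}

func main() {
	// "all=1" includes empty queues; "queues=RAFT" filters by substring.
	resp, err := http.Get("http://localhost:8222/ipqueuesz?all=1&queues=RAFT")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	queues := map[string]ipQueueInfo{}
	if err := json.Unmarshal(body, &queues); err != nil {
		panic(err)
	}
	for name, q := range queues {
		fmt.Printf("%s: pending=%d in_progress=%d\n", name, q.Pending, q.InProgress)
	}
}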