From 29c40c874ce89ff92e785cd0f72ef62901dfebe6 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 5 Jan 2022 19:32:54 -0700 Subject: [PATCH] Adding logger for IPQueue Signed-off-by: Ivan Kozlovic --- server/ipqueue.go | 43 ++++++++++++++++++++++++------ server/ipqueue_test.go | 53 +++++++++++++++++++++++++++++++++++++ server/jetstream_cluster.go | 3 ++- server/mqtt.go | 3 ++- server/raft.go | 18 +++++++------ server/sendq.go | 2 +- server/server.go | 51 ++++++++++++++++++++++++++++++++++- server/server_test.go | 36 +++++++++++++++++++++++++ server/stream.go | 6 +++-- 9 files changed, 193 insertions(+), 22 deletions(-) diff --git a/server/ipqueue.go b/server/ipqueue.go index d8193f37..f496e24d 100644 --- a/server/ipqueue.go +++ b/server/ipqueue.go @@ -18,19 +18,32 @@ import ( ) const ipQueueDefaultMaxRecycleSize = 4 * 1024 +const ipQueueDefaultWarnThreshold = 32 * 1024 + +type ipQueueLogger interface { + // The ipQueue will invoke this function with the queue's name and the number + // of pending elements. This call CANNOT block. It is ok to drop the logging + // if desired, but not block. + log(name string, pending int) +} // This is a generic intra-process queue. type ipQueue struct { sync.RWMutex - ch chan struct{} - elts []interface{} - pos int - pool *sync.Pool - mrs int + ch chan struct{} + elts []interface{} + pos int + pool *sync.Pool + mrs int + name string + logger ipQueueLogger + lt int } type ipQueueOpts struct { maxRecycleSize int + name string + logger ipQueueLogger } type ipQueueOpt func(*ipQueueOpts) @@ -43,15 +56,26 @@ func ipQueue_MaxRecycleSize(max int) ipQueueOpt { } } +// This option provides the logger to be used by this queue to log +// when the number of pending elements reaches a certain threshold. +func ipQueue_Logger(name string, l ipQueueLogger) ipQueueOpt { + return func(o *ipQueueOpts) { + o.name, o.logger = name, l + } +} + func newIPQueue(opts ...ipQueueOpt) *ipQueue { qo := ipQueueOpts{maxRecycleSize: ipQueueDefaultMaxRecycleSize} for _, o := range opts { o(&qo) } q := &ipQueue{ - ch: make(chan struct{}, 1), - mrs: qo.maxRecycleSize, - pool: &sync.Pool{}, + ch: make(chan struct{}, 1), + mrs: qo.maxRecycleSize, + pool: &sync.Pool{}, + name: qo.name, + logger: qo.logger, + lt: ipQueueDefaultWarnThreshold, } return q } @@ -77,6 +101,9 @@ func (q *ipQueue) push(e interface{}) int { } q.elts = append(q.elts, e) l++ + if l >= q.lt && q.logger != nil { + q.logger.log(q.name, l) + } q.Unlock() if signal { select { diff --git a/server/ipqueue_test.go b/server/ipqueue_test.go index 6a2e074a..d339536a 100644 --- a/server/ipqueue_test.go +++ b/server/ipqueue_test.go @@ -14,6 +14,7 @@ package server import ( + "fmt" "sync" "testing" "time" @@ -322,3 +323,55 @@ func TestIPQueueDrain(t *testing.T) { } } } + +type testIPQLog struct { + msgs []string +} + +func (l *testIPQLog) log(name string, pending int) { + l.msgs = append(l.msgs, fmt.Sprintf("%s: %d pending", name, pending)) +} + +func TestIPQueueLogger(t *testing.T) { + l := &testIPQLog{} + q := newIPQueue(ipQueue_Logger("test_logger", l)) + q.lt = 2 + q.push(1) + q.push(2) + if len(l.msgs) != 1 { + t.Fatalf("Unexpected logging: %v", l.msgs) + } + if l.msgs[0] != "test_logger: 2 pending" { + t.Fatalf("Unexpected content: %v", l.msgs[0]) + } + l.msgs = nil + q.push(3) + if len(l.msgs) != 1 { + t.Fatalf("Unexpected logging: %v", l.msgs) + } + if l.msgs[0] != "test_logger: 3 pending" { + t.Fatalf("Unexpected content: %v", l.msgs[0]) + } + l.msgs = nil + q.popOne() + q.push(4) + if len(l.msgs) != 1 { + t.Fatalf("Unexpected logging: %v", l.msgs) + } + if l.msgs[0] != "test_logger: 3 pending" { + t.Fatalf("Unexpected content: %v", l.msgs[0]) + } + l.msgs = nil + q.pop() + q.push(5) + if len(l.msgs) != 0 { + t.Fatalf("Unexpected logging: %v", l.msgs) + } + q.push(6) + if len(l.msgs) != 1 { + t.Fatalf("Unexpected logging: %v", l.msgs) + } + if l.msgs[0] != "test_logger: 2 pending" { + t.Fatalf("Unexpected content: %v", l.msgs[0]) + } +} diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 5d84853d..d36508f5 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5000,6 +5000,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) error { mset.store.FastState(&state) sreq := mset.calculateSyncRequest(&state, snap) s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node + qname := fmt.Sprintf("Stream %q snapshot", mset.cfg.Name) mset.mu.Unlock() // Make sure our state's first sequence is <= the leader's snapshot. @@ -5074,7 +5075,7 @@ RETRY: reply string } - msgsQ := newIPQueue() // of *im + msgsQ := newIPQueue(ipQueue_Logger(qname, s.ipqLog)) // of *im // Send our catchup request here. reply := syncReplySubject() diff --git a/server/mqtt.go b/server/mqtt.go index 6bbf76b5..ce1cec9a 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -921,6 +921,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc id := string(getHash(s.Name())) replicas := s.mqttDetermineReplicas() + qname := fmt.Sprintf("MQTT account %q send", accName) as := &mqttAccountSessionManager{ sessions: make(map[string]*mqttSession), sessByHash: make(map[string]*mqttSession), @@ -931,7 +932,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc id: id, c: c, rplyr: mqttJSARepliesPrefix + id + ".", - sendq: newIPQueue(), // of *mqttJSPubMsg + sendq: newIPQueue(ipQueue_Logger(qname, s.ipqLog)), // of *mqttJSPubMsg nuid: nuid.New(), quitCh: quitCh, }, diff --git a/server/raft.go b/server/raft.go index 226aef37..2953c32c 100644 --- a/server/raft.go +++ b/server/raft.go @@ -350,6 +350,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { return nil, errNoPeerState } + qpfx := fmt.Sprintf("RAFT [%s - %s] ", hash[:idLen], cfg.Name) n := &raft{ created: time.Now(), id: hash[:idLen], @@ -372,13 +373,13 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { quit: make(chan struct{}), wtvch: make(chan struct{}, 1), wpsch: make(chan struct{}, 1), - reqs: newIPQueue(), // of *voteRequest - votes: newIPQueue(), // of *voteResponse - prop: newIPQueue(), // of *Entry - entry: newIPQueue(), // of *appendEntry - resp: newIPQueue(), // of *appendEntryResponse - apply: newIPQueue(), // of *CommittedEntry - stepdown: newIPQueue(), // of string + reqs: newIPQueue(), // of *voteRequest + votes: newIPQueue(), // of *voteResponse + prop: newIPQueue(ipQueue_Logger(qpfx+"Entry", s.ipqLog)), // of *Entry + entry: newIPQueue(ipQueue_Logger(qpfx+"AppendEntry", s.ipqLog)), // of *appendEntry + resp: newIPQueue(ipQueue_Logger(qpfx+"AppendEntryResponse", s.ipqLog)), // of *appendEntryResponse + apply: newIPQueue(ipQueue_Logger(qpfx+"CommittedEntry", s.ipqLog)), // of *CommittedEntry + stepdown: newIPQueue(), // of string leadc: make(chan bool, 1), observer: cfg.Observer, extSt: ps.domainExt, @@ -2094,7 +2095,8 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.debug("Our first entry does not match request from follower") } // Create a queue for delivering updates from responses. - indexUpdates := newIPQueue() // of uint64 + qname := fmt.Sprintf("RAFT [%s - %s] Index updates", n.id, n.group) + indexUpdates := newIPQueue(ipQueue_Logger(qname, n.s.ipqLog)) // of uint64 indexUpdates.push(ae.pindex) n.progress[ar.peer] = indexUpdates n.Unlock() diff --git a/server/sendq.go b/server/sendq.go index 0b358547..35ff9456 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -32,7 +32,7 @@ type sendq struct { } func (s *Server) newSendQ() *sendq { - sq := &sendq{s: s, q: newIPQueue()} + sq := &sendq{s: s, q: newIPQueue(ipQueue_Logger("Send", s.ipqLog))} s.startGoRoutine(sq.internalLoop) return sq } diff --git a/server/server.go b/server/server.go index ff685474..1a44935f 100644 --- a/server/server.go +++ b/server/server.go @@ -268,6 +268,16 @@ type Server struct { // How often user logon fails due to the issuer account not being pinned. pinnedAccFail uint64 + + // This is a central logger for IPQueues when the number of pending + // messages reaches a certain thresold (per queue) + ipqLog *srvIPQueueLogger +} + +type srvIPQueueLogger struct { + ch chan string + done chan struct{} + s *Server } // For tracking JS nodes. @@ -1225,7 +1235,7 @@ func (s *Server) setSystemAccount(acc *Account) error { sid: 1, servers: make(map[string]*serverUpdate), replies: make(map[string]msgHandler), - sendq: newIPQueue(), // of *pubMsg + sendq: newIPQueue(ipQueue_Logger("System send", s.ipqLog)), // of *pubMsg resetCh: make(chan struct{}), sq: s.newSendQ(), statsz: eventsHBInterval, @@ -1595,6 +1605,8 @@ func (s *Server) Start() { s.grRunning = true s.grMu.Unlock() + s.startIPQLogger() + // Pprof http endpoint for the profiler. if opts.ProfPort != 0 { s.StartProfiler() @@ -1963,6 +1975,11 @@ func (s *Server) Shutdown() { doneExpected-- } + // Stop the IPQueue logger (before the grWG.Wait() call) + if s.ipqLog != nil { + s.ipqLog.stop() + } + // Wait for go routines to be done. s.grWG.Wait() @@ -3601,3 +3618,35 @@ func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta s.updateLeafNodes(acc, sub, delta) } + +func (s *Server) startIPQLogger() { + s.ipqLog = &srvIPQueueLogger{ + ch: make(chan string, 128), + done: make(chan struct{}), + s: s, + } + s.startGoRoutine(s.ipqLog.run) +} + +func (l *srvIPQueueLogger) stop() { + close(l.done) +} + +func (l *srvIPQueueLogger) log(name string, pending int) { + select { + case l.ch <- fmt.Sprintf("%s queue pending size: %v", name, pending): + default: + } +} + +func (l *srvIPQueueLogger) run() { + defer l.s.grWG.Done() + for { + select { + case w := <-l.ch: + l.s.Warnf("%s", w) + case <-l.done: + return + } + } +} diff --git a/server/server_test.go b/server/server_test.go index 630a305c..f1dfcd69 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1279,6 +1279,9 @@ func TestServerShutdownDuringStart(t *testing.T) { ch := make(chan struct{}, 1) go func() { s.Start() + // Since the server has already been shutdown and we don't want to leave + // the ipqLog run() routine running, stop it now. + s.ipqLog.stop() close(ch) }() select { @@ -1961,3 +1964,36 @@ func TestServerLogsConfigurationFile(t *testing.T) { t.Fatalf("Config file location was not reported in log: %s", log) } } + +func TestServerIPQueueLogger(t *testing.T) { + o := DefaultOptions() + s := RunServer(o) + defer s.Shutdown() + + l := &captureWarnLogger{warn: make(chan string, 100)} + s.SetLogger(l, false, false) + + q := newIPQueue(ipQueue_Logger("test", s.ipqLog)) + // Normally, lt is immutable and set to ipQueueDefaultWarnThreshold, but + // for test, we set it to a low value. + q.lt = 2 + q.push(1) + // This one should case a warning + q.push(2) + + for { + select { + case w := <-l.warn: + // In case we get other warnings a runtime, just check that we + // get the one we expect and be done. + if strings.Contains(w, "test queue") { + if strings.Contains(w, "test queue pending size: 2") { + return + } + t.Fatalf("Invalid warning: %v", w) + } + case <-time.After(time.Second): + t.Fatal("Did not get warning") + } + } +} diff --git a/server/stream.go b/server/stream.go index f477ebc0..595b0bd9 100644 --- a/server/stream.go +++ b/server/stream.go @@ -368,6 +368,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt c := s.createInternalJetStreamClient() ic := s.createInternalJetStreamClient() + qname := fmt.Sprintf("Stream %s > %s messages", a.Name, config.Name) mset := &stream{ acc: a, jsa: jsa, @@ -378,7 +379,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt sysc: ic, stype: cfg.Storage, consumers: make(map[string]*consumer), - msgs: newIPQueue(), // of *inMsg + msgs: newIPQueue(ipQueue_Logger(qname, s.ipqLog)), // of *inMsg qch: make(chan struct{}), } @@ -3109,7 +3110,8 @@ func (mset *stream) setupSendCapabilities() { if mset.outq != nil { return } - mset.outq = &jsOutQ{newIPQueue()} // of *jsPubMsg + qname := fmt.Sprintf("Stream %q send", mset.cfg.Name) + mset.outq = &jsOutQ{newIPQueue(ipQueue_Logger(qname, mset.srv.ipqLog))} // of *jsPubMsg go mset.internalLoop() }