Adding logger for IPQueue

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2022-01-05 19:32:54 -07:00
parent 48fd559bfc
commit 29c40c874c
9 changed files with 193 additions and 22 deletions

View File

@@ -18,19 +18,32 @@ import (
)
const ipQueueDefaultMaxRecycleSize = 4 * 1024
const ipQueueDefaultWarnThreshold = 32 * 1024
type ipQueueLogger interface {
// The ipQueue will invoke this function with the queue's name and the number
// of pending elements. This call CANNOT block. It is ok to drop the logging
// if desired, but not block.
log(name string, pending int)
}
// This is a generic intra-process queue.
type ipQueue struct {
sync.RWMutex
ch chan struct{}
elts []interface{}
pos int
pool *sync.Pool
mrs int
ch chan struct{}
elts []interface{}
pos int
pool *sync.Pool
mrs int
name string
logger ipQueueLogger
lt int
}
type ipQueueOpts struct {
maxRecycleSize int
name string
logger ipQueueLogger
}
type ipQueueOpt func(*ipQueueOpts)
@@ -43,15 +56,26 @@ func ipQueue_MaxRecycleSize(max int) ipQueueOpt {
}
}
// This option provides the logger to be used by this queue to log
// when the number of pending elements reaches a certain threshold.
func ipQueue_Logger(name string, l ipQueueLogger) ipQueueOpt {
return func(o *ipQueueOpts) {
o.name, o.logger = name, l
}
}
func newIPQueue(opts ...ipQueueOpt) *ipQueue {
qo := ipQueueOpts{maxRecycleSize: ipQueueDefaultMaxRecycleSize}
for _, o := range opts {
o(&qo)
}
q := &ipQueue{
ch: make(chan struct{}, 1),
mrs: qo.maxRecycleSize,
pool: &sync.Pool{},
ch: make(chan struct{}, 1),
mrs: qo.maxRecycleSize,
pool: &sync.Pool{},
name: qo.name,
logger: qo.logger,
lt: ipQueueDefaultWarnThreshold,
}
return q
}
@@ -77,6 +101,9 @@ func (q *ipQueue) push(e interface{}) int {
}
q.elts = append(q.elts, e)
l++
if l >= q.lt && q.logger != nil {
q.logger.log(q.name, l)
}
q.Unlock()
if signal {
select {

View File

@@ -14,6 +14,7 @@
package server
import (
"fmt"
"sync"
"testing"
"time"
@@ -322,3 +323,55 @@ func TestIPQueueDrain(t *testing.T) {
}
}
}
type testIPQLog struct {
msgs []string
}
func (l *testIPQLog) log(name string, pending int) {
l.msgs = append(l.msgs, fmt.Sprintf("%s: %d pending", name, pending))
}
func TestIPQueueLogger(t *testing.T) {
l := &testIPQLog{}
q := newIPQueue(ipQueue_Logger("test_logger", l))
q.lt = 2
q.push(1)
q.push(2)
if len(l.msgs) != 1 {
t.Fatalf("Unexpected logging: %v", l.msgs)
}
if l.msgs[0] != "test_logger: 2 pending" {
t.Fatalf("Unexpected content: %v", l.msgs[0])
}
l.msgs = nil
q.push(3)
if len(l.msgs) != 1 {
t.Fatalf("Unexpected logging: %v", l.msgs)
}
if l.msgs[0] != "test_logger: 3 pending" {
t.Fatalf("Unexpected content: %v", l.msgs[0])
}
l.msgs = nil
q.popOne()
q.push(4)
if len(l.msgs) != 1 {
t.Fatalf("Unexpected logging: %v", l.msgs)
}
if l.msgs[0] != "test_logger: 3 pending" {
t.Fatalf("Unexpected content: %v", l.msgs[0])
}
l.msgs = nil
q.pop()
q.push(5)
if len(l.msgs) != 0 {
t.Fatalf("Unexpected logging: %v", l.msgs)
}
q.push(6)
if len(l.msgs) != 1 {
t.Fatalf("Unexpected logging: %v", l.msgs)
}
if l.msgs[0] != "test_logger: 2 pending" {
t.Fatalf("Unexpected content: %v", l.msgs[0])
}
}

View File

@@ -5000,6 +5000,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) error {
mset.store.FastState(&state)
sreq := mset.calculateSyncRequest(&state, snap)
s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
qname := fmt.Sprintf("Stream %q snapshot", mset.cfg.Name)
mset.mu.Unlock()
// Make sure our state's first sequence is <= the leader's snapshot.
@@ -5074,7 +5075,7 @@ RETRY:
reply string
}
msgsQ := newIPQueue() // of *im
msgsQ := newIPQueue(ipQueue_Logger(qname, s.ipqLog)) // of *im
// Send our catchup request here.
reply := syncReplySubject()

View File

@@ -921,6 +921,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
id := string(getHash(s.Name()))
replicas := s.mqttDetermineReplicas()
qname := fmt.Sprintf("MQTT account %q send", accName)
as := &mqttAccountSessionManager{
sessions: make(map[string]*mqttSession),
sessByHash: make(map[string]*mqttSession),
@@ -931,7 +932,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
id: id,
c: c,
rplyr: mqttJSARepliesPrefix + id + ".",
sendq: newIPQueue(), // of *mqttJSPubMsg
sendq: newIPQueue(ipQueue_Logger(qname, s.ipqLog)), // of *mqttJSPubMsg
nuid: nuid.New(),
quitCh: quitCh,
},

View File

@@ -350,6 +350,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
return nil, errNoPeerState
}
qpfx := fmt.Sprintf("RAFT [%s - %s] ", hash[:idLen], cfg.Name)
n := &raft{
created: time.Now(),
id: hash[:idLen],
@@ -372,13 +373,13 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
quit: make(chan struct{}),
wtvch: make(chan struct{}, 1),
wpsch: make(chan struct{}, 1),
reqs: newIPQueue(), // of *voteRequest
votes: newIPQueue(), // of *voteResponse
prop: newIPQueue(), // of *Entry
entry: newIPQueue(), // of *appendEntry
resp: newIPQueue(), // of *appendEntryResponse
apply: newIPQueue(), // of *CommittedEntry
stepdown: newIPQueue(), // of string
reqs: newIPQueue(), // of *voteRequest
votes: newIPQueue(), // of *voteResponse
prop: newIPQueue(ipQueue_Logger(qpfx+"Entry", s.ipqLog)), // of *Entry
entry: newIPQueue(ipQueue_Logger(qpfx+"AppendEntry", s.ipqLog)), // of *appendEntry
resp: newIPQueue(ipQueue_Logger(qpfx+"AppendEntryResponse", s.ipqLog)), // of *appendEntryResponse
apply: newIPQueue(ipQueue_Logger(qpfx+"CommittedEntry", s.ipqLog)), // of *CommittedEntry
stepdown: newIPQueue(), // of string
leadc: make(chan bool, 1),
observer: cfg.Observer,
extSt: ps.domainExt,
@@ -2094,7 +2095,8 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.debug("Our first entry does not match request from follower")
}
// Create a queue for delivering updates from responses.
indexUpdates := newIPQueue() // of uint64
qname := fmt.Sprintf("RAFT [%s - %s] Index updates", n.id, n.group)
indexUpdates := newIPQueue(ipQueue_Logger(qname, n.s.ipqLog)) // of uint64
indexUpdates.push(ae.pindex)
n.progress[ar.peer] = indexUpdates
n.Unlock()

View File

@@ -32,7 +32,7 @@ type sendq struct {
}
func (s *Server) newSendQ() *sendq {
sq := &sendq{s: s, q: newIPQueue()}
sq := &sendq{s: s, q: newIPQueue(ipQueue_Logger("Send", s.ipqLog))}
s.startGoRoutine(sq.internalLoop)
return sq
}

View File

@@ -268,6 +268,16 @@ type Server struct {
// How often user logon fails due to the issuer account not being pinned.
pinnedAccFail uint64
// This is a central logger for IPQueues when the number of pending
// messages reaches a certain thresold (per queue)
ipqLog *srvIPQueueLogger
}
type srvIPQueueLogger struct {
ch chan string
done chan struct{}
s *Server
}
// For tracking JS nodes.
@@ -1225,7 +1235,7 @@ func (s *Server) setSystemAccount(acc *Account) error {
sid: 1,
servers: make(map[string]*serverUpdate),
replies: make(map[string]msgHandler),
sendq: newIPQueue(), // of *pubMsg
sendq: newIPQueue(ipQueue_Logger("System send", s.ipqLog)), // of *pubMsg
resetCh: make(chan struct{}),
sq: s.newSendQ(),
statsz: eventsHBInterval,
@@ -1595,6 +1605,8 @@ func (s *Server) Start() {
s.grRunning = true
s.grMu.Unlock()
s.startIPQLogger()
// Pprof http endpoint for the profiler.
if opts.ProfPort != 0 {
s.StartProfiler()
@@ -1963,6 +1975,11 @@ func (s *Server) Shutdown() {
doneExpected--
}
// Stop the IPQueue logger (before the grWG.Wait() call)
if s.ipqLog != nil {
s.ipqLog.stop()
}
// Wait for go routines to be done.
s.grWG.Wait()
@@ -3601,3 +3618,35 @@ func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta
s.updateLeafNodes(acc, sub, delta)
}
func (s *Server) startIPQLogger() {
s.ipqLog = &srvIPQueueLogger{
ch: make(chan string, 128),
done: make(chan struct{}),
s: s,
}
s.startGoRoutine(s.ipqLog.run)
}
func (l *srvIPQueueLogger) stop() {
close(l.done)
}
func (l *srvIPQueueLogger) log(name string, pending int) {
select {
case l.ch <- fmt.Sprintf("%s queue pending size: %v", name, pending):
default:
}
}
func (l *srvIPQueueLogger) run() {
defer l.s.grWG.Done()
for {
select {
case w := <-l.ch:
l.s.Warnf("%s", w)
case <-l.done:
return
}
}
}

View File

@@ -1279,6 +1279,9 @@ func TestServerShutdownDuringStart(t *testing.T) {
ch := make(chan struct{}, 1)
go func() {
s.Start()
// Since the server has already been shutdown and we don't want to leave
// the ipqLog run() routine running, stop it now.
s.ipqLog.stop()
close(ch)
}()
select {
@@ -1961,3 +1964,36 @@ func TestServerLogsConfigurationFile(t *testing.T) {
t.Fatalf("Config file location was not reported in log: %s", log)
}
}
func TestServerIPQueueLogger(t *testing.T) {
o := DefaultOptions()
s := RunServer(o)
defer s.Shutdown()
l := &captureWarnLogger{warn: make(chan string, 100)}
s.SetLogger(l, false, false)
q := newIPQueue(ipQueue_Logger("test", s.ipqLog))
// Normally, lt is immutable and set to ipQueueDefaultWarnThreshold, but
// for test, we set it to a low value.
q.lt = 2
q.push(1)
// This one should case a warning
q.push(2)
for {
select {
case w := <-l.warn:
// In case we get other warnings a runtime, just check that we
// get the one we expect and be done.
if strings.Contains(w, "test queue") {
if strings.Contains(w, "test queue pending size: 2") {
return
}
t.Fatalf("Invalid warning: %v", w)
}
case <-time.After(time.Second):
t.Fatal("Did not get warning")
}
}
}

View File

@@ -368,6 +368,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
c := s.createInternalJetStreamClient()
ic := s.createInternalJetStreamClient()
qname := fmt.Sprintf("Stream %s > %s messages", a.Name, config.Name)
mset := &stream{
acc: a,
jsa: jsa,
@@ -378,7 +379,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
sysc: ic,
stype: cfg.Storage,
consumers: make(map[string]*consumer),
msgs: newIPQueue(), // of *inMsg
msgs: newIPQueue(ipQueue_Logger(qname, s.ipqLog)), // of *inMsg
qch: make(chan struct{}),
}
@@ -3109,7 +3110,8 @@ func (mset *stream) setupSendCapabilities() {
if mset.outq != nil {
return
}
mset.outq = &jsOutQ{newIPQueue()} // of *jsPubMsg
qname := fmt.Sprintf("Stream %q send", mset.cfg.Name)
mset.outq = &jsOutQ{newIPQueue(ipQueue_Logger(qname, mset.srv.ipqLog))} // of *jsPubMsg
go mset.internalLoop()
}