From 9f241f3322ce069c8e7a994b45392548d6de6677 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Tue, 15 Nov 2022 11:25:07 -0800
Subject: [PATCH] Offload signaling to consumers when number is large.

Signed-off-by: Derek Collison
---
 server/norace_test.go |  58 ++++++++++++++++++++++
 server/stream.go      | 113 ++++++++++++++++++++++++++++++++++++++----
 2 files changed, 161 insertions(+), 10 deletions(-)

diff --git a/server/norace_test.go b/server/norace_test.go
index 9f26f311..7cca0fb2 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -5784,3 +5784,61 @@ func TestNoRaceJetStreamDeleteConsumerWithInterestStreamAndHighSeqs(t *testing.T
 		t.Fatalf("Consumer delete took too long: %v vs baseline %v", e, elapsed)
 	}
 }
+
+// Performance impact on stream ingress with large number of consumers.
+// Timings are printed rather than asserted on.
+func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
+	skip(t)
+
+	s := RunBasicJetStreamServer()
+	if config := s.JetStreamConfig(); config != nil {
+		defer removeDir(t, config.StoreDir)
+	}
+	defer s.Shutdown()
+
+	// Client for API requests.
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+	})
+	require_NoError(t, err)
+
+	const toSend = 1_000_000
+
+	// Publishes toSend msgs and prints the elapsed time and rate.
+	// The label is printed as is in front of " time is".
+	publishAndReport := func(label string) {
+		start := time.Now()
+		for i := 0; i < toSend; i++ {
+			js.PublishAsync("foo", []byte("OK"))
+		}
+		<-js.PublishAsyncComplete()
+		tt := time.Since(start)
+		fmt.Printf("%s time is %v\n", label, tt)
+		fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
+	}
+
+	// Baseline with no consumers.
+	publishAndReport("Base")
+
+	// Now grow the number of idle consumers to each total in turn,
+	// purging the stream before each run.
+	numConsumers := 0
+	for _, total := range []int{10, 100, 1000} {
+		err = js.PurgeStream("TEST")
+		require_NoError(t, err)
+
+		for numConsumers < total {
+			numConsumers++
+			_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
+				Durable:   fmt.Sprintf("d-%d", numConsumers),
+				AckPolicy: nats.AckExplicitPolicy,
+			})
+			require_NoError(t, err)
+		}
+		publishAndReport(fmt.Sprintf("\n%d consumers", total))
+	}
+}
diff --git a/server/stream.go b/server/stream.go
index 09e10b9a..9bbcd341 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -225,6 +225,8 @@ type stream struct {
 	// For processing consumers as a list without main stream lock.
 	clsMu sync.RWMutex
 	cList []*consumer
+	sch   chan struct{}
+	sigq  *ipQueue // of *cMsg
 
 	// TODO(dlc) - Hide everything below behind two pointers.
 	// Clustered mode.
@@ -438,8 +440,13 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
 		msgs:      s.newIPQueue(qpfx + "messages"), // of *inMsg
 		qch:       make(chan struct{}),
 		uch:       make(chan struct{}, 4),
+		sch:       make(chan struct{}, 1),
 	}
 
+	// Start our signaling routine to process consumers.
+	mset.sigq = s.newIPQueue(qpfx + "obs") // of *cMsg
+	go mset.signalConsumersLoop()
+
 	// For no-ack consumers when we are interest retention.
 	if cfg.Retention != LimitsPolicy {
 		mset.ackq = s.newIPQueue(qpfx + "acks") // of uint64
@@ -4039,23 +4046,108 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
 
 	// Signal consumers for new messages.
 	if numConsumers > 0 {
-		mset.clsMu.RLock()
-		for _, o := range mset.cList {
-			o.mu.Lock()
-			if o.isLeader() && o.isFilteredMatch(subject) {
-				if seq > o.npcm {
-					o.npc++
-				}
-				o.signalNewMessages()
+		if numConsumers > consumerSignalThreshold {
+			// Offload to signalConsumersLoop. Queue first, then wake it
+			// without blocking; a wakeup that is still pending covers us.
+			mset.sigq.push(newCMsg(subject, seq))
+			select {
+			case mset.sch <- struct{}{}:
+			default:
 			}
-			o.mu.Unlock()
+		} else {
+			mset.signalConsumers(subject, seq)
 		}
-		mset.clsMu.RUnlock()
 	}
 
 	return nil
 }
 
+// consumerSignalThreshold is the number of consumers above which signaling
+// is offloaded from stream msg processing to signalConsumersLoop.
+const consumerSignalThreshold = 10
+
+// Used to signal inbound message to registered consumers.
+type cMsg struct {
+	seq  uint64
+	subj string
+}
+
+// Pool to recycle consumer bound msgs.
+var cMsgPool sync.Pool
+
+// newCMsg returns a consumer bound msg to queue up for signaling,
+// reusing one from cMsgPool when available.
+func newCMsg(subj string, seq uint64) *cMsg {
+	// cMsgPool has no New func, so Get can return nil and ok is false.
+	m, ok := cMsgPool.Get().(*cMsg)
+	if !ok {
+		m = new(cMsg)
+	}
+	m.subj, m.seq = subj, seq
+	return m
+}
+
+// returnToPool clears m and puts it back into cMsgPool. A nil m is a no-op.
+// Copy out any fields needed before calling this.
+func (m *cMsg) returnToPool() {
+	if m == nil {
+		return
+	}
+	m.subj, m.seq = _EMPTY_, 0
+	cMsgPool.Put(m)
+}
+
+// signalConsumersLoop is the goroutine that signals consumers, offloaded
+// from stream msg processing. Each time sch fires it pops what is queued on
+// sigq and signals for each entry. Returns when s.quitCh or qch fires.
+func (mset *stream) signalConsumersLoop() {
+	mset.mu.RLock()
+	s, qch, sch, msgs := mset.srv, mset.qch, mset.sch, mset.sigq
+	mset.mu.RUnlock()
+
+	for {
+		select {
+		case <-s.quitCh:
+			return
+		case <-qch:
+			return
+		case <-sch:
+			cms := msgs.pop()
+			for _, cm := range cms {
+				m := cm.(*cMsg)
+				// Copy out before recycling, returnToPool resets m.
+				seq, subj := m.seq, m.subj
+				m.returnToPool()
+				// Signal all appropriate consumers.
+				mset.signalConsumers(subj, seq)
+			}
+			msgs.recycle(&cms)
+		}
+	}
+}
+
+// signalConsumers updates and signals all consumers that match subj.
+// It holds clsMu for reading and takes each consumer's lock in turn.
+func (mset *stream) signalConsumers(subj string, seq uint64) {
+	mset.clsMu.RLock()
+	defer mset.clsMu.RUnlock()
+
+	for _, o := range mset.cList {
+		o.mu.Lock()
+		if o.isLeader() && o.isFilteredMatch(subj) {
+			if seq > o.npcm {
+				o.npc++
+			}
+			// Only signal an active push consumer, or a pull consumer
+			// whose waiting queue is not empty.
+			if o.mset != nil && ((o.isPushMode() && o.active) || (o.isPullMode() && !o.waiting.isEmpty())) {
+				o.signalNewMessages()
+			}
+		}
+		o.mu.Unlock()
+	}
+}
+
 // Internal message for use by jetstream subsystem.
 type jsPubMsg struct {
 	dsubj string // Subject to send to, e.g. _INBOX.xxx
@@ -4399,6 +4491,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 		mset.msgs.unregister()
 		mset.ackq.unregister()
 		mset.outq.unregister()
+		mset.sigq.unregister()
 	}
 
 	// Snapshot store.