Merge pull request #3633 from nats-io/consumers-scale

Offload signaling to consumers when the number of consumers is large.
Derek Collison committed 2022-11-15 12:04:08 -08:00 (committed via GitHub)
2 changed files with 192 additions and 10 deletions
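
For readers skimming the diff: on the hot publish path, once a stream has more than a small threshold of consumers, the server no longer walks every consumer inline. It pushes a small (subject, sequence) notification onto a queue, does a non-blocking send on a capacity-1 wake channel, and lets a dedicated goroutine fan the signal out. Below is a minimal, self-contained sketch of that general pattern using only the Go standard library. The names here (offloader, sig, signal) are illustrative only and are not the server's types; the actual change uses the internal ipQueue and a sync.Pool of cMsg values, as shown in the diff itself.

package main

import (
	"fmt"
	"sync"
	"time"
)

// sig carries the minimal information needed to tell consumers about a
// new message, similar in spirit to cMsg in the diff.
type sig struct {
	subj string
	seq  uint64
}

// offloader shows the offload pattern: publishers append to a queue and
// do a non-blocking send on a wake channel; one worker goroutine drains
// the queue and fans signals out, keeping the publish path short.
type offloader struct {
	mu    sync.Mutex
	queue []sig
	wake  chan struct{} // capacity 1, like mset.sch in the diff
	done  chan struct{}
}

func newOffloader(fanout func(sig)) *offloader {
	o := &offloader{
		wake: make(chan struct{}, 1),
		done: make(chan struct{}),
	}
	go func() {
		for {
			select {
			case <-o.done:
				return
			case <-o.wake:
				// Drain everything queued so far, then fan out.
				o.mu.Lock()
				pending := o.queue
				o.queue = nil
				o.mu.Unlock()
				for _, s := range pending {
					fanout(s)
				}
			}
		}
	}()
	return o
}

// signal is what the publish path would call; it never blocks.
func (o *offloader) signal(subj string, seq uint64) {
	o.mu.Lock()
	o.queue = append(o.queue, sig{subj, seq})
	o.mu.Unlock()
	select {
	case o.wake <- struct{}{}:
	default: // worker already has a pending wakeup
	}
}

func main() {
	o := newOffloader(func(s sig) {
		fmt.Printf("signal consumers: subj=%s seq=%d\n", s.subj, s.seq)
	})
	for seq := uint64(1); seq <= 3; seq++ {
		o.signal("foo", seq)
	}
	// Crude settle for this toy example; the real server coordinates
	// shutdown through its quit channels instead.
	time.Sleep(50 * time.Millisecond)
	close(o.done)
}

The server's implementation differs in the details visible below: it recycles cMsg values through a sync.Pool, queues them on an ipQueue, and only takes the offload path when numConsumers exceeds consumerSignalThreshold.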


@@ -5784,3 +5784,98 @@ func TestNoRaceJetStreamDeleteConsumerWithInterestStreamAndHighSeqs(t *testing.T
		t.Fatalf("Consumer delete took too long: %v vs baseline %v", e, elapsed)
	}
}

// Performance impact on stream ingress with large number of consumers.
func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
	skip(t)

	s := RunBasicJetStreamServer()
	if config := s.JetStreamConfig(); config != nil {
		defer removeDir(t, config.StoreDir)
	}
	defer s.Shutdown()

	// Client for API requests.
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
	})
	require_NoError(t, err)

	// Baseline with no consumers.
	toSend := 1_000_000
	start := time.Now()
	for i := 0; i < toSend; i++ {
		js.PublishAsync("foo", []byte("OK"))
	}
	<-js.PublishAsyncComplete()
	tt := time.Since(start)
	fmt.Printf("Base time is %v\n", tt)
	fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())

	err = js.PurgeStream("TEST")
	require_NoError(t, err)

	// Now add in 10 idle consumers.
	for i := 1; i <= 10; i++ {
		_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
			Durable:   fmt.Sprintf("d-%d", i),
			AckPolicy: nats.AckExplicitPolicy,
		})
		require_NoError(t, err)
	}

	start = time.Now()
	for i := 0; i < toSend; i++ {
		js.PublishAsync("foo", []byte("OK"))
	}
	<-js.PublishAsyncComplete()
	tt = time.Since(start)
	fmt.Printf("\n10 consumers time is %v\n", tt)
	fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())

	err = js.PurgeStream("TEST")
	require_NoError(t, err)

	// Now add in 90 more idle consumers.
	for i := 11; i <= 100; i++ {
		_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
			Durable:   fmt.Sprintf("d-%d", i),
			AckPolicy: nats.AckExplicitPolicy,
		})
		require_NoError(t, err)
	}

	start = time.Now()
	for i := 0; i < toSend; i++ {
		js.PublishAsync("foo", []byte("OK"))
	}
	<-js.PublishAsyncComplete()
	tt = time.Since(start)
	fmt.Printf("\n100 consumers time is %v\n", tt)
	fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())

	err = js.PurgeStream("TEST")
	require_NoError(t, err)

	// Now add in 900 more.
	for i := 101; i <= 1000; i++ {
		_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
			Durable:   fmt.Sprintf("d-%d", i),
			AckPolicy: nats.AckExplicitPolicy,
		})
		require_NoError(t, err)
	}

	start = time.Now()
	for i := 0; i < toSend; i++ {
		js.PublishAsync("foo", []byte("OK"))
	}
	<-js.PublishAsyncComplete()
	tt = time.Since(start)
	fmt.Printf("\n1000 consumers time is %v\n", tt)
	fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
}


@@ -225,6 +225,8 @@ type stream struct {
	// For processing consumers as a list without main stream lock.
	clsMu sync.RWMutex
	cList []*consumer
	sch   chan struct{}
	sigq  *ipQueue // of *cMsg

	// TODO(dlc) - Hide everything below behind two pointers.
	// Clustered mode.
@@ -438,8 +440,13 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
		msgs:      s.newIPQueue(qpfx + "messages"), // of *inMsg
		qch:       make(chan struct{}),
		uch:       make(chan struct{}, 4),
		sch:       make(chan struct{}, 1),
	}

	// Start our signaling routine to process consumers.
	mset.sigq = s.newIPQueue(qpfx + "obs") // of *cMsg
	go mset.signalConsumersLoop()

	// For no-ack consumers when we are interest retention.
	if cfg.Retention != LimitsPolicy {
		mset.ackq = s.newIPQueue(qpfx + "acks") // of uint64
@@ -4039,23 +4046,102 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
	// Signal consumers for new messages.
	if numConsumers > 0 {
-		mset.clsMu.RLock()
-		for _, o := range mset.cList {
-			o.mu.Lock()
-			if o.isLeader() && o.isFilteredMatch(subject) {
-				if seq > o.npcm {
-					o.npc++
-				}
-				o.signalNewMessages()
-			}
-			o.mu.Unlock()
-		}
-		mset.clsMu.RUnlock()
+		if numConsumers > consumerSignalThreshold {
+			mset.sigq.push(newCMsg(subject, seq))
+			select {
+			case mset.sch <- struct{}{}:
+			default:
+			}
+		} else {
+			mset.signalConsumers(subject, seq)
+		}
	}

	return nil
}
// Number of consumers to consider offloading signal processing.
const consumerSignalThreshold = 10

// Used to signal inbound message to registered consumers.
type cMsg struct {
	seq  uint64
	subj string
}

// Pool to recycle consumer bound msgs.
var cMsgPool sync.Pool

// Used to queue up consumer bound msgs for signaling.
func newCMsg(subj string, seq uint64) *cMsg {
	var m *cMsg
	cm := cMsgPool.Get()
	if cm != nil {
		m = cm.(*cMsg)
	} else {
		m = new(cMsg)
	}
	m.subj, m.seq = subj, seq
	return m
}

func (m *cMsg) returnToPool() {
	if m == nil {
		return
	}
	m.subj, m.seq = _EMPTY_, 0
	cMsgPool.Put(m)
}

// Go routine to signal consumers.
// Offloaded from stream msg processing.
func (mset *stream) signalConsumersLoop() {
	mset.mu.RLock()
	s, qch, sch, msgs := mset.srv, mset.qch, mset.sch, mset.sigq
	mset.mu.RUnlock()

	for {
		select {
		case <-s.quitCh:
			return
		case <-qch:
			return
		case <-sch:
			cms := msgs.pop()
			for _, cm := range cms {
				m := cm.(*cMsg)
				seq, subj := m.seq, m.subj
				m.returnToPool()
				// Signal all appropriate consumers.
				mset.signalConsumers(subj, seq)
			}
			msgs.recycle(&cms)
		}
	}
}

// This will update and signal all consumers that match.
func (mset *stream) signalConsumers(subj string, seq uint64) {
	mset.clsMu.RLock()
	defer mset.clsMu.RUnlock()

	for _, o := range mset.cList {
		o.mu.Lock()
		if o.isLeader() && o.isFilteredMatch(subj) {
			if seq > o.npcm {
				o.npc++
			}
			if o.mset != nil {
				if o.isPushMode() && o.active || o.isPullMode() && !o.waiting.isEmpty() {
					o.signalNewMessages()
				}
			}
		}
		o.mu.Unlock()
	}
}

// Internal message for use by jetstream subsystem.
type jsPubMsg struct {
	dsubj string // Subject to send to, e.g. _INBOX.xxx
@@ -4399,6 +4485,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
		mset.msgs.unregister()
		mset.ackq.unregister()
		mset.outq.unregister()
		mset.sigq.unregister()
	}

	// Snapshot store.