Improve performance and latency with a large number of sparse consumers.

When a stream had a large number of consumers on a server and those consumers were sparse, the signaling mechanism would do a linear scan over all of them to find and signal the matching ones. Usage patterns have continued to trend toward more consumers that are filtered and sparse, meaning a given message is destined for only one or a small number of consumers, which makes that linear scan increasingly expensive.

This change moves selection to a sublist that tracks only active consumer leaders, which optimizes selecting the consumers to signal when the number of consumers is large.

Signed-off-by: Derek Collison <derek@nats.io>
Derek Collison
2022-12-11 14:38:45 -05:00
parent ef32cba064
commit 855f790e3c
3 changed files with 273 additions and 57 deletions
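The core idea of the change: rather than walking every consumer and testing each filter against a new message's subject, the stream keeps a subject index containing only the consumers that are currently leaders, and matches the message subject against that index. A minimal standalone sketch of the idea, assuming literal (non-wildcard) filter subjects and hypothetical names (signalIndex, addLeader); the actual change uses the server's wildcard-aware Sublist, shown in the diffs below:

package main

import "fmt"

type consumer struct{ name, filter string }

// signalIndex maps a literal filter subject to its active consumer leaders.
// A plain map only handles exact matches; the server's Sublist also matches wildcards.
type signalIndex map[string][]*consumer

func (idx signalIndex) addLeader(o *consumer) {
	idx[o.filter] = append(idx[o.filter], o)
}

func main() {
	idx := make(signalIndex)
	for i := 0; i < 10_000; i++ {
		idx.addLeader(&consumer{name: fmt.Sprintf("d-%d", i), filter: fmt.Sprintf("ID.%d", i)})
	}
	// One index lookup instead of a 10k-consumer scan per message.
	for _, o := range idx["ID.42"] {
		fmt.Println("signal", o.name)
	}
}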


@@ -306,6 +306,9 @@ type consumer struct {
// Ack queue
ackMsgs *ipQueue
// For stream signaling.
sigSub *subscription
}
type proposal struct {
@@ -1062,6 +1065,9 @@ func (o *consumer) setLeader(isLeader bool) {
// Snapshot initial info.
o.infoWithSnap(true)
// Register as a leader with our parent stream.
mset.setConsumerAsLeader(o)
// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs(qch)
@@ -1107,6 +1113,11 @@ func (o *consumer) setLeader(isLeader bool) {
stopAndClearTimer(&o.gwdtmr)
}
o.mu.Unlock()
// Unregister as a leader with our parent stream.
if mset != nil {
mset.removeConsumerAsLeader(o)
}
}
}
@@ -4311,3 +4322,42 @@ func (o *consumer) account() *Account {
o.mu.RUnlock()
return a
}
func (o *consumer) signalSub() *subscription {
o.mu.Lock()
defer o.mu.Unlock()
if o.sigSub != nil {
return o.sigSub
}
subject := o.cfg.FilterSubject
if subject == _EMPTY_ {
subject = fwcs
}
o.sigSub = &subscription{subject: []byte(subject), icb: o.processStreamSignal}
return o.sigSub
}
// This is what will be called when our parent stream wants to kick us regarding a new message.
// We know that we are the leader and that this subject matches us by how the parent handles registering
// us with the signaling sublist.
// We do need the sequence of the message, however, so we use the msg payload as the encoded sequence.
func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, subject, _ string, seqb []byte) {
var le = binary.LittleEndian
seq := le.Uint64(seqb)
o.mu.Lock()
defer o.mu.Unlock()
if o.mset == nil {
return
}
if seq > o.npcm {
o.npc++
}
if seq < o.sseq {
return
}
if o.isPushMode() && o.active || o.isPullMode() && !o.waiting.isEmpty() {
o.signalNewMessages()
}
}
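For reference, the sequence travels as the message payload: eight little-endian bytes. A tiny standalone round trip of that framing (the encode side appears in the stream diff further down):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode side (stream): pack the sequence into 8 little-endian bytes.
	var eseq [8]byte
	binary.LittleEndian.PutUint64(eseq[:], 12345)
	// Decode side (consumer): recover the sequence from the payload.
	seq := binary.LittleEndian.Uint64(eseq[:])
	fmt.Println(seq) // 12345
}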


@@ -5785,6 +5785,26 @@ func TestNoRaceJetStreamDeleteConsumerWithInterestStreamAndHighSeqs(t *testing.T
}
}
// Bug when we encode a timestamp that upon decode causes an error, which causes the server to panic.
// This can happen on consumer redelivery since the adjusted timestamps can be in the future and result
// in a negative encoding. If that encoding was exactly -1 seconds, it would cause decodeConsumerState to fail
// and the server to panic.
func TestNoRaceEncodeConsumerStateBug(t *testing.T) {
for i := 0; i < 200_000; i++ {
// Pretend we redelivered and updated the timestamp to reflect the new start time for expiration.
// The bug will trip when time.Now() rounded to seconds in encode is 1 second below the truncated version
// of pending.
pending := Pending{Sequence: 1, Timestamp: time.Now().Add(time.Second).UnixNano()}
state := ConsumerState{
Delivered: SequencePair{Consumer: 1, Stream: 1},
Pending: map[uint64]*Pending{1: &pending},
}
buf := encodeConsumerState(&state)
_, err := decodeConsumerState(buf)
require_NoError(t, err)
}
}
// Performance impact on stream ingress with a large number of consumers.
func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
skip(t)
@@ -5880,22 +5900,147 @@ func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
}
// Performance impact with a large number of consumers but sparse delivery.
func TestJetStreamLargeNumConsumersSparseDelivery(t *testing.T) {
skip(t)
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"ID.*"},
})
require_NoError(t, err)
// Now add in ~10k consumers on different subjects.
for i := 3; i <= 10_000; i++ {
_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: fmt.Sprintf("d-%d", i),
FilterSubject: fmt.Sprintf("ID.%d", i),
AckPolicy: nats.AckNonePolicy,
})
require_NoError(t, err)
}
toSend := 100_000
// Bind a consumer to ID.2.
var received int
done := make(chan bool)
nc, js = jsClientConnect(t, s)
defer nc.Close()
mh := func(m *nats.Msg) {
received++
if received >= toSend {
close(done)
}
}
_, err = js.Subscribe("ID.2", mh)
require_NoError(t, err)
last := make(chan bool)
_, err = js.Subscribe("ID.1", func(_ *nats.Msg) { close(last) })
require_NoError(t, err)
nc, _ = jsClientConnect(t, s)
defer nc.Close()
js, err = nc.JetStream(nats.PublishAsyncMaxPending(8 * 1024))
require_NoError(t, err)
start := time.Now()
for i := 0; i < toSend; i++ {
js.PublishAsync("ID.2", []byte("ok"))
}
// Check latency for this one message.
// This will show the issue better than throughput which can bypass signal processing.
js.PublishAsync("ID.1", []byte("ok"))
select {
case <-done:
break
case <-time.After(10 * time.Second):
t.Fatalf("Failed to receive all messages: %d of %d\n", received, toSend)
}
tt := time.Since(start)
fmt.Printf("Took %v to receive %d msgs\n", tt, toSend)
fmt.Printf("%.0f msgs/s\n", float64(toSend)/tt.Seconds())
select {
case <-last:
break
case <-time.After(30 * time.Second):
t.Fatalf("Failed to receive last message\n")
}
lt := time.Since(start)
fmt.Printf("Took %v to receive last msg\n", lt)
}
func TestNoRaceJetStreamEndToEndLatency(t *testing.T) {
// skip(t)
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)
nc, js = jsClientConnect(t, s)
defer nc.Close()
var sent time.Time
next := make(chan struct{})
var (
total time.Duration
min time.Duration
max time.Duration
)
mh := func(m *nats.Msg) {
received := time.Now()
tt := received.Sub(sent)
if min == 0 || tt < min {
min = tt
}
if max == 0 || tt > max {
max = tt
}
total += tt
next <- struct{}{}
}
_, err = js.Subscribe("foo", mh)
require_NoError(t, err)
nc, js = jsClientConnect(t, s)
defer nc.Close()
toSend := 100_000
for i := 0; i < toSend; i++ {
sent = time.Now()
js.PublishAsync("foo", []byte("ok"))
<-next
}
fmt.Printf("AVG: %v\nMIN: %v\nMAX: %v\n", total/time.Duration(toSend), min, max)
}


@@ -16,6 +16,7 @@ package server
import (
"archive/tar"
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
@@ -222,11 +223,12 @@ type stream struct {
// For republishing.
tr *transform
// For processing consumers without main stream lock.
clsMu sync.RWMutex
cList []*consumer
sch chan struct{}
sigq *ipQueue // of *cMsg
csl *Sublist
// TODO(dlc) - Hide everything below behind two pointers.
// Clustered mode.
@@ -4054,23 +4056,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Signal consumers for new messages.
if numConsumers > 0 {
mset.sigq.push(newCMsg(subject, seq))
select {
case mset.sch <- struct{}{}:
default:
}
}
return nil
}
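The push onto sigq followed by the non-blocking send on sch above is a coalescing-wakeup pattern: the queue carries the messages, and the channel only records that work exists, so any number of signals collapse into a single wakeup. A standalone sketch of the pattern with hypothetical names (not the server's ipQueue type):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	queue := []uint64{}            // stands in for the ipQueue of *cMsg
	wake := make(chan struct{}, 1) // capacity 1: extra signals are dropped

	notify := func(seq uint64) {
		mu.Lock()
		queue = append(queue, seq)
		mu.Unlock()
		select {
		case wake <- struct{}{}: // wake the processing loop if it is idle
		default: // already signaled; the loop drains everything anyway
		}
	}

	notify(1)
	notify(2) // coalesced into the already-pending wakeup

	<-wake
	mu.Lock()
	fmt.Println("drain", queue) // one wakeup, whole queue drained
	mu.Unlock()
}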
// Used to signal inbound message to registered consumers.
type cMsg struct {
seq uint64
@@ -4134,19 +4129,21 @@ func (mset *stream) signalConsumers(subj string, seq uint64) {
mset.clsMu.RLock()
defer mset.clsMu.RUnlock()
if mset.csl == nil {
return
}
r := mset.csl.Match(subj)
if len(r.psubs) == 0 {
return
}
// Encode the sequence here.
var eseq [8]byte
var le = binary.LittleEndian
le.PutUint64(eseq[:], seq)
msg := eseq[:]
for _, sub := range r.psubs {
sub.icb(sub, nil, nil, subj, _EMPTY_, msg)
}
}
@@ -4408,7 +4405,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
obs = append(obs, o)
}
mset.clsMu.Lock()
mset.consumers, mset.cList, mset.csl = nil, nil, nil
mset.clsMu.Unlock()
// Check if we are a mirror.
@@ -4589,17 +4586,6 @@ func (mset *stream) numConsumers() int {
return len(mset.consumers)
}
// Lock should be held.
func (mset *stream) setConsumer(o *consumer) {
mset.consumers[o.name] = o
@@ -4610,7 +4596,9 @@ func (mset *stream) setConsumer(o *consumer) {
mset.directs++
}
// Now update consumers list as well
mset.clsMu.Lock()
mset.cList = append(mset.cList, o)
mset.clsMu.Unlock()
}
// Lock should be held.
@@ -4621,9 +4609,42 @@ func (mset *stream) removeConsumer(o *consumer) {
if o.cfg.Direct && mset.directs > 0 {
mset.directs--
}
if mset.consumers != nil {
delete(mset.consumers, o.name)
// Now update consumers list as well
mset.clsMu.Lock()
for i, ol := range mset.cList {
if ol == o {
mset.cList = append(mset.cList[:i], mset.cList[i+1:]...)
break
}
}
// Always remove from the leader sublist.
if mset.csl != nil {
mset.csl.Remove(o.signalSub())
}
mset.clsMu.Unlock()
}
}
// Set the consumer as a leader. This will update the signaling sublist.
func (mset *stream) setConsumerAsLeader(o *consumer) {
mset.clsMu.Lock()
defer mset.clsMu.Unlock()
if mset.csl == nil {
mset.csl = NewSublistWithCache()
}
mset.csl.Insert(o.signalSub())
}
// Remove the consumer as a leader. This will update the signaling sublist.
func (mset *stream) removeConsumerAsLeader(o *consumer) {
mset.clsMu.Lock()
defer mset.clsMu.Unlock()
if mset.csl != nil {
mset.csl.Remove(o.signalSub())
}
}
// lookupConsumer will retrieve a consumer by name.