mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fixed #1144, qsub performance improvements
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2997,24 +2997,47 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
selectQSub:
|
||||
// We will hold onto remote or leaf qsubs when we are coming from
|
||||
// a route or a leaf node just in case we can no longer do local delivery.
|
||||
var rsub *subscription
|
||||
var rsub, sub *subscription
|
||||
var _ql [32]*subscription
|
||||
|
||||
// Find a subscription that is able to deliver this message
|
||||
// starting at a random index.
|
||||
for startIndex, i := c.in.prand.Intn(len(qsubs)), 0; i < len(qsubs); i++ {
|
||||
index := (startIndex + i) % len(qsubs)
|
||||
sub := qsubs[index]
|
||||
src := c.kind
|
||||
// If we just came from a route we want to prefer local subs.
|
||||
// So only select from local subs but remember the first rsub
|
||||
// in case all else fails.
|
||||
if src == ROUTER {
|
||||
ql := _ql[:0]
|
||||
for i := 0; i < len(qsubs); i++ {
|
||||
sub = qsubs[i]
|
||||
if sub.client.kind == CLIENT {
|
||||
ql = append(ql, sub)
|
||||
} else if rsub == nil {
|
||||
rsub = sub
|
||||
}
|
||||
}
|
||||
qsubs = ql
|
||||
}
|
||||
|
||||
sindex := 0
|
||||
lqs := len(qsubs)
|
||||
if lqs > 1 {
|
||||
sindex = c.in.prand.Int() % lqs
|
||||
}
|
||||
|
||||
// Find a subscription that is able to deliver this message starting at a random index.
|
||||
for i := 0; i < lqs; i++ {
|
||||
if sindex+i < lqs {
|
||||
sub = qsubs[sindex+i]
|
||||
} else {
|
||||
sub = qsubs[(sindex+i)%lqs]
|
||||
}
|
||||
if sub == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Potentially sending to a remote sub across a route or leaf node.
|
||||
// We may want to skip this and prefer locals depending on where we
|
||||
// were sourced from.
|
||||
if src, dst := c.kind, sub.client.kind; dst == ROUTER || dst == LEAF {
|
||||
if src == ROUTER || ((src == LEAF || src == CLIENT) && dst == LEAF) {
|
||||
// We just came from a route, so skip and prefer local subs.
|
||||
// Keep our first rsub in case all else fails.
|
||||
// We have taken care of preferring local subs for a message from a route above.
|
||||
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
|
||||
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
|
||||
if (src == LEAF || src == CLIENT) && dst == LEAF {
|
||||
if rsub == nil {
|
||||
rsub = sub
|
||||
}
|
||||
|
||||
@@ -321,45 +321,6 @@ func Benchmark___PubSubAccsImport(b *testing.B) {
|
||||
b.StopTimer()
|
||||
}
|
||||
|
||||
func Benchmark_PubSub512kTwoConns(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := runBenchServer()
|
||||
c := createClientConn(b, "127.0.0.1", PERF_PORT)
|
||||
doDefaultConnect(b, c)
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
|
||||
c2 := createClientConn(b, "127.0.0.1", PERF_PORT)
|
||||
doDefaultConnect(b, c2)
|
||||
sendProto(b, c2, "SUB foo 1\r\n")
|
||||
flushConnection(b, c2)
|
||||
|
||||
sz := 1024 * 512
|
||||
payload := sizedString(sz)
|
||||
|
||||
sendOp := []byte(fmt.Sprintf("PUB foo %d\r\n%s\r\n", sz, payload))
|
||||
ch := make(chan bool)
|
||||
|
||||
expected := len(fmt.Sprintf("MSG foo 1 %d\r\n%s\r\n", sz, payload)) * b.N
|
||||
go drainConnection(b, c2, ch, expected)
|
||||
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bw.Write(sendOp)
|
||||
}
|
||||
err := bw.Flush()
|
||||
if err != nil {
|
||||
b.Errorf("Received error on FLUSH write: %v\n", err)
|
||||
}
|
||||
|
||||
// Wait for connection to be drained
|
||||
<-ch
|
||||
|
||||
b.StopTimer()
|
||||
c.Close()
|
||||
c2.Close()
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
func Benchmark_____PubTwoQueueSub(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := runBenchServer()
|
||||
@@ -467,6 +428,45 @@ func Benchmark___PubEightQueueSub(b *testing.B) {
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
func Benchmark_PubSub512kTwoConns(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := runBenchServer()
|
||||
c := createClientConn(b, "127.0.0.1", PERF_PORT)
|
||||
doDefaultConnect(b, c)
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
|
||||
c2 := createClientConn(b, "127.0.0.1", PERF_PORT)
|
||||
doDefaultConnect(b, c2)
|
||||
sendProto(b, c2, "SUB foo 1\r\n")
|
||||
flushConnection(b, c2)
|
||||
|
||||
sz := 1024 * 512
|
||||
payload := sizedString(sz)
|
||||
|
||||
sendOp := []byte(fmt.Sprintf("PUB foo %d\r\n%s\r\n", sz, payload))
|
||||
ch := make(chan bool)
|
||||
|
||||
expected := len(fmt.Sprintf("MSG foo 1 %d\r\n%s\r\n", sz, payload)) * b.N
|
||||
go drainConnection(b, c2, ch, expected)
|
||||
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bw.Write(sendOp)
|
||||
}
|
||||
err := bw.Flush()
|
||||
if err != nil {
|
||||
b.Errorf("Received error on FLUSH write: %v\n", err)
|
||||
}
|
||||
|
||||
// Wait for connection to be drained
|
||||
<-ch
|
||||
|
||||
b.StopTimer()
|
||||
c.Close()
|
||||
c2.Close()
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
func Benchmark__DenyMsgNoWCPubSub(b *testing.B) {
|
||||
s, opts := RunServerWithConfig("./configs/authorization.conf")
|
||||
opts.DisableShortFirstPing = true
|
||||
|
||||
@@ -384,3 +384,89 @@ func TestRequestsAcrossRoutesToQueues(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This is in response to Issue #1144
|
||||
// https://github.com/nats-io/nats-server/issues/1144
|
||||
func TestQueueDistributionAcrossRoutes(t *testing.T) {
|
||||
srvA, srvB, _, _ := runServers(t)
|
||||
defer srvA.Shutdown()
|
||||
defer srvB.Shutdown()
|
||||
|
||||
checkClusterFormed(t, srvA, srvB)
|
||||
|
||||
urlA := srvA.ClientURL()
|
||||
urlB := srvB.ClientURL()
|
||||
|
||||
nc1, err := nats.Connect(urlA)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create connection for nc1: %v\n", err)
|
||||
}
|
||||
defer nc1.Close()
|
||||
|
||||
nc2, err := nats.Connect(urlB)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create connection for nc2: %v\n", err)
|
||||
}
|
||||
defer nc2.Close()
|
||||
|
||||
var qsubs []*nats.Subscription
|
||||
|
||||
// Connect queue subscriptions as mentioned in the issue. 2(A) - 6(B) - 4(A)
|
||||
for i := 0; i < 2; i++ {
|
||||
sub, _ := nc1.QueueSubscribeSync("foo", "bar")
|
||||
qsubs = append(qsubs, sub)
|
||||
}
|
||||
nc1.Flush()
|
||||
for i := 0; i < 6; i++ {
|
||||
sub, _ := nc2.QueueSubscribeSync("foo", "bar")
|
||||
qsubs = append(qsubs, sub)
|
||||
}
|
||||
nc2.Flush()
|
||||
for i := 0; i < 4; i++ {
|
||||
sub, _ := nc1.QueueSubscribeSync("foo", "bar")
|
||||
qsubs = append(qsubs, sub)
|
||||
}
|
||||
nc1.Flush()
|
||||
|
||||
if err := checkExpectedSubs(7, srvA, srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
send := 600
|
||||
for i := 0; i < send; i++ {
|
||||
nc2.Publish("foo", nil)
|
||||
}
|
||||
nc2.Flush()
|
||||
|
||||
tp := func() int {
|
||||
var total int
|
||||
for i := 0; i < len(qsubs); i++ {
|
||||
pending, _, _ := qsubs[i].Pending()
|
||||
total += pending
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if total := tp(); total != send {
|
||||
return fmt.Errorf("Number of total received %d", total)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// The bug is essentially that when we deliver across a route, we
|
||||
// prefer locals, but if we randomize to a block of bounce backs, then
|
||||
// we walk to the end and find the same local for all the remote options.
|
||||
// So what you will see in this case is a large value at #9 (2+6, next one local).
|
||||
|
||||
avg := send / len(qsubs)
|
||||
for i := 0; i < len(qsubs); i++ {
|
||||
total, _, _ := qsubs[i].Pending()
|
||||
if total > avg+(avg*3/10) {
|
||||
if i == 8 {
|
||||
t.Fatalf("Qsub in 8th position gets majority of the messages (prior 6 spots) in this test")
|
||||
}
|
||||
t.Fatalf("Received too high, %d vs %d", total, avg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user