diff --git a/server/client.go b/server/client.go index abac4b34..75353dbe 100644 --- a/server/client.go +++ b/server/client.go @@ -2997,24 +2997,47 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, selectQSub: // We will hold onto remote or lead qsubs when we are coming from // a route or a leaf node just in case we can no longer do local delivery. - var rsub *subscription + var rsub, sub *subscription + var _ql [32]*subscription - // Find a subscription that is able to deliver this message - // starting at a random index. - for startIndex, i := c.in.prand.Intn(len(qsubs)), 0; i < len(qsubs); i++ { - index := (startIndex + i) % len(qsubs) - sub := qsubs[index] + src := c.kind + // If we just came from a route we want to prefer local subs. + // So only select from local subs but remember the first rsub + // in case all else fails. + if src == ROUTER { + ql := _ql[:0] + for i := 0; i < len(qsubs); i++ { + sub = qsubs[i] + if sub.client.kind == CLIENT { + ql = append(ql, sub) + } else if rsub == nil { + rsub = sub + } + } + qsubs = ql + } + + sindex := 0 + lqs := len(qsubs) + if lqs > 1 { + sindex = c.in.prand.Int() % lqs + } + + // Find a subscription that is able to deliver this message starting at a random index. + for i := 0; i < lqs; i++ { + if sindex+i < lqs { + sub = qsubs[sindex+i] + } else { + sub = qsubs[(sindex+i)%lqs] + } if sub == nil { continue } - // Potentially sending to a remote sub across a route or leaf node. - // We may want to skip this and prefer locals depending on where we - // were sourced from. - if src, dst := c.kind, sub.client.kind; dst == ROUTER || dst == LEAF { - if src == ROUTER || ((src == LEAF || src == CLIENT) && dst == LEAF) { - // We just came from a route, so skip and prefer local subs. - // Keep our first rsub in case all else fails. + // We have taken care of preferring local subs for a message from a route above. + // Here we just care about a client or leaf and skipping a leaf and preferring locals. + if dst := sub.client.kind; dst == ROUTER || dst == LEAF { + if (src == LEAF || src == CLIENT) && dst == LEAF { if rsub == nil { rsub = sub } diff --git a/test/bench_test.go b/test/bench_test.go index 02bbea9f..efbe0773 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -321,45 +321,6 @@ func Benchmark___PubSubAccsImport(b *testing.B) { b.StopTimer() } -func Benchmark_PubSub512kTwoConns(b *testing.B) { - b.StopTimer() - s := runBenchServer() - c := createClientConn(b, "127.0.0.1", PERF_PORT) - doDefaultConnect(b, c) - bw := bufio.NewWriterSize(c, defaultSendBufSize) - - c2 := createClientConn(b, "127.0.0.1", PERF_PORT) - doDefaultConnect(b, c2) - sendProto(b, c2, "SUB foo 1\r\n") - flushConnection(b, c2) - - sz := 1024 * 512 - payload := sizedString(sz) - - sendOp := []byte(fmt.Sprintf("PUB foo %d\r\n%s\r\n", sz, payload)) - ch := make(chan bool) - - expected := len(fmt.Sprintf("MSG foo 1 %d\r\n%s\r\n", sz, payload)) * b.N - go drainConnection(b, c2, ch, expected) - - b.StartTimer() - for i := 0; i < b.N; i++ { - bw.Write(sendOp) - } - err := bw.Flush() - if err != nil { - b.Errorf("Received error on FLUSH write: %v\n", err) - } - - // Wait for connection to be drained - <-ch - - b.StopTimer() - c.Close() - c2.Close() - s.Shutdown() -} - func Benchmark_____PubTwoQueueSub(b *testing.B) { b.StopTimer() s := runBenchServer() @@ -467,6 +428,45 @@ func Benchmark___PubEightQueueSub(b *testing.B) { s.Shutdown() } +func Benchmark_PubSub512kTwoConns(b *testing.B) { + b.StopTimer() + s := runBenchServer() + c := createClientConn(b, "127.0.0.1", PERF_PORT) + doDefaultConnect(b, c) + bw := bufio.NewWriterSize(c, defaultSendBufSize) + + c2 := createClientConn(b, "127.0.0.1", PERF_PORT) + doDefaultConnect(b, c2) + sendProto(b, c2, "SUB foo 1\r\n") + flushConnection(b, c2) + + sz := 1024 * 512 + payload := sizedString(sz) + + sendOp := []byte(fmt.Sprintf("PUB foo %d\r\n%s\r\n", sz, payload)) + ch := make(chan bool) + + expected := len(fmt.Sprintf("MSG foo 1 %d\r\n%s\r\n", sz, payload)) * b.N + go drainConnection(b, c2, ch, expected) + + b.StartTimer() + for i := 0; i < b.N; i++ { + bw.Write(sendOp) + } + err := bw.Flush() + if err != nil { + b.Errorf("Received error on FLUSH write: %v\n", err) + } + + // Wait for connection to be drained + <-ch + + b.StopTimer() + c.Close() + c2.Close() + s.Shutdown() +} + func Benchmark__DenyMsgNoWCPubSub(b *testing.B) { s, opts := RunServerWithConfig("./configs/authorization.conf") opts.DisableShortFirstPing = true diff --git a/test/client_cluster_test.go b/test/client_cluster_test.go index 5dc06636..1a5a1048 100644 --- a/test/client_cluster_test.go +++ b/test/client_cluster_test.go @@ -384,3 +384,89 @@ func TestRequestsAcrossRoutesToQueues(t *testing.T) { } } } + +// This is in response to Issue #1144 +// https://github.com/nats-io/nats-server/issues/1144 +func TestQueueDistributionAcrossRoutes(t *testing.T) { + srvA, srvB, _, _ := runServers(t) + defer srvA.Shutdown() + defer srvB.Shutdown() + + checkClusterFormed(t, srvA, srvB) + + urlA := srvA.ClientURL() + urlB := srvB.ClientURL() + + nc1, err := nats.Connect(urlA) + if err != nil { + t.Fatalf("Failed to create connection for nc1: %v\n", err) + } + defer nc1.Close() + + nc2, err := nats.Connect(urlB) + if err != nil { + t.Fatalf("Failed to create connection for nc2: %v\n", err) + } + defer nc2.Close() + + var qsubs []*nats.Subscription + + // Connect queue subscriptions as mentioned in the issue. 2(A) - 6(B) - 4(A) + for i := 0; i < 2; i++ { + sub, _ := nc1.QueueSubscribeSync("foo", "bar") + qsubs = append(qsubs, sub) + } + nc1.Flush() + for i := 0; i < 6; i++ { + sub, _ := nc2.QueueSubscribeSync("foo", "bar") + qsubs = append(qsubs, sub) + } + nc2.Flush() + for i := 0; i < 4; i++ { + sub, _ := nc1.QueueSubscribeSync("foo", "bar") + qsubs = append(qsubs, sub) + } + nc1.Flush() + + if err := checkExpectedSubs(7, srvA, srvB); err != nil { + t.Fatalf("%v", err) + } + + send := 600 + for i := 0; i < send; i++ { + nc2.Publish("foo", nil) + } + nc2.Flush() + + tp := func() int { + var total int + for i := 0; i < len(qsubs); i++ { + pending, _, _ := qsubs[i].Pending() + total += pending + } + return total + } + + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if total := tp(); total != send { + return fmt.Errorf("Number of total received %d", total) + } + return nil + }) + + // The bug is essentially that when we deliver across a route, we + // prefer locals, but if we randomize to a block of bounce backs, then + // we walk to the end and find the same local for all the remote options. + // So what you will see in this case is a large value at #9 (2+6, next one local). + + avg := send / len(qsubs) + for i := 0; i < len(qsubs); i++ { + total, _, _ := qsubs[i].Pending() + if total > avg+(avg*3/10) { + if i == 8 { + t.Fatalf("Qsub in 8th position gets majority of the messages (prior 6 spots) in this test") + } + t.Fatalf("Received too high, %d vs %d", total, avg) + } + } +}