diff --git a/server/jetstream_chaos_test.go b/server/jetstream_chaos_test.go
index 2ef91f6a..b0873382 100644
--- a/server/jetstream_chaos_test.go
+++ b/server/jetstream_chaos_test.go
@@ -329,7 +329,8 @@ func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMes
 // Verify ordered delivery despite cluster-wide outages
 func TestJetStreamChaosConsumerOrdered(t *testing.T) {
 
-	const numMessages = 30_000
+	const numMessages = 5_000
+	const numBatch = 500
 	const maxRetries = 100
 	const retryDelay = 500 * time.Millisecond
 	const fetchTimeout = 250 * time.Millisecond
@@ -407,7 +408,7 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) {
 		// Simulate application processing (and gives the monkey some time to brew chaos)
 		time.Sleep(10 * time.Millisecond)
 
-		if i%1000 == 0 {
+		if i%numBatch == 0 {
 			t.Logf("Consumed %d/%d", i, numMessages)
 		}
 	}
@@ -416,7 +417,8 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) {
 // Verify ordered delivery despite cluster-wide outages
 func TestJetStreamChaosConsumerAsync(t *testing.T) {
 
-	const numMessages = 30_000
+	const numMessages = 5_000
+	const numBatch = 500
 	const timeout = 30 * time.Second // No (new) messages for 30s => terminate
 	const maxRetries = 25
 	const retryDelay = 500 * time.Millisecond
@@ -480,7 +482,7 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) {
 			timeoutTimer.Reset(1 * time.Second)
 		}
 
-		if received.count()%1000 == 0 {
+		if received.count()%numBatch == 0 {
 			t.Logf("Consumed %d/%d", received.count(), numMessages)
 		}
 
@@ -535,7 +537,8 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) {
 // The consumer connection is also periodically closed, and the consumer 'resumes' on a different one
 func TestJetStreamChaosConsumerDurable(t *testing.T) {
 
-	const numMessages = 30_000
+	const numMessages = 5_000
+	const numBatch = 500
 	const timeout = 30 * time.Second // No (new) messages for 60s => terminate
 	const clusterSize = 3
 	const replicas = 3
@@ -703,7 +706,7 @@ func TestJetStreamChaosConsumerDurable(t *testing.T) {
 			}
 		}
 
-		if received.count()%1000 == 0 {
+		if received.count()%numBatch == 0 {
 			t.Logf("Consumed %d/%d, duplicate deliveries: %d", received.count(), numMessages, deliveryCount-received.count())
 			// Close connection and resume consuming on a different one
 			resetDurableConsumer()
@@ -740,7 +743,8 @@ func TestJetStreamChaosConsumerDurable(t *testing.T) {
 
 func TestJetStreamChaosConsumerPull(t *testing.T) {
 
-	const numMessages = 10_000
+	const numMessages = 5_000
+	const numBatch = 500
 	const maxRetries = 100
 	const retryDelay = 500 * time.Millisecond
 	const fetchTimeout = 250 * time.Millisecond
@@ -845,7 +849,7 @@ func TestJetStreamChaosConsumerPull(t *testing.T) {
 			}
 		}
 
-		if !isDupe && received.count()%1000 == 0 {
+		if !isDupe && received.count()%numBatch == 0 {
 			t.Logf("Consumed %d/%d (duplicates: %d)", received.count(), numMessages, deliveredCount-received.count())
 		}
 	}
diff --git a/server/leafnode_test.go b/server/leafnode_test.go
index 24977604..76f7ebd7 100644
--- a/server/leafnode_test.go
+++ b/server/leafnode_test.go
@@ -1,4 +1,4 @@
-// Copyright 2019-2021 The NATS Authors
+// Copyright 2019-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -5949,3 +5949,223 @@ func TestLeafNodeCompressionAuthTimeout(t *testing.T) {
 		t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf2, l2)
 	}
 }
+
+func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithSeparateAccounts(t *testing.T) {
+	sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 3, 2)
+	defer sc.shutdown()
+
+	// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO.
+	var lnTmpl = `
+	listen: 127.0.0.1:-1
+	server_name: %s
+	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
+
+	{{leaf}}
+
+	cluster {
+		name: %s
+		listen: 127.0.0.1:%d
+		routes = [%s]
+	}
+
+	accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }}
+	`
+
+	var leafFrag = `
+	leaf {
+		listen: 127.0.0.1:-1
+		remotes [
+			{ urls: [ %s ] }
+			{ urls: [ %s ] }
+		]
+	}`
+
+	// We want to have two leafnode connections that join the same local account on the leafnode servers,
+	// but connect to different accounts in different clusters.
+	c1 := sc.clusters[0] // Will connect to account ONE
+	c2 := sc.clusters[1] // Will connect to account TWO
+
+	genLeafTmpl := func(tmpl string) string {
+		t.Helper()
+
+		var ln1, ln2 []string
+		for _, s := range c1.servers {
+			if s.ClusterName() != c1.name {
+				continue
+			}
+			ln := s.getOpts().LeafNode
+			ln1 = append(ln1, fmt.Sprintf("nats://one:p@%s:%d", ln.Host, ln.Port))
+		}
+
+		for _, s := range c2.servers {
+			if s.ClusterName() != c2.name {
+				continue
+			}
+			ln := s.getOpts().LeafNode
+			ln2 = append(ln2, fmt.Sprintf("nats://two:p@%s:%d", ln.Host, ln.Port))
+		}
+
+		return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, strings.Join(ln1, ", "), strings.Join(ln2, ", ")), 1)
+	}
+
+	tmpl := strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "SA"), 1)
+	tmpl = genLeafTmpl(tmpl)
+
+	ln := createJetStreamCluster(t, tmpl, "SA", "SA-", 3, 22280, false)
+	ln.waitOnClusterReady()
+	defer ln.shutdown()
+
+	for _, s := range ln.servers {
+		checkLeafNodeConnectedCount(t, s, 2)
+	}
+
+	// Now connect DQ subscribers to each cluster on their separate accounts, and make sure we get the right behavior,
+	// balanced between them when requests originate from the leaf cluster.
+
+	// Create 5 clients for each cluster / account.
+	var c1c, c2c []*nats.Conn
+	for i := 0; i < 5; i++ {
+		nc1, _ := jsClientConnect(t, c1.randomServer(), nats.UserInfo("one", "p"))
+		defer nc1.Close()
+		c1c = append(c1c, nc1)
+		nc2, _ := jsClientConnect(t, c2.randomServer(), nats.UserInfo("two", "p"))
+		defer nc2.Close()
+		c2c = append(c2c, nc2)
+	}
+
+	createSubs := func(num int, conns []*nats.Conn) (subs []*nats.Subscription) {
+		for i := 0; i < num; i++ {
+			nc := conns[rand.Intn(len(conns))]
+			sub, err := nc.QueueSubscribeSync("REQUEST", "MC")
+			require_NoError(t, err)
+			subs = append(subs, sub)
+			nc.Flush()
+		}
+		// Let subs propagate.
+		time.Sleep(100 * time.Millisecond)
+		return subs
+	}
+	closeSubs := func(subs []*nats.Subscription) {
+		for _, sub := range subs {
+			sub.Unsubscribe()
+		}
+	}
+
+	// Simple test first.
+	subs1 := createSubs(1, c1c)
+	defer closeSubs(subs1)
+	subs2 := createSubs(1, c2c)
+	defer closeSubs(subs2)
+
+	sendRequests := func(num int) {
+		// Connect to the leaf cluster and send some requests.
+		nc, _ := jsClientConnect(t, ln.randomServer())
+		defer nc.Close()
+
+		for i := 0; i < num; i++ {
+			require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
+		}
+		nc.Flush()
+	}
+
+	pending := func(subs []*nats.Subscription) (total int) {
+		for _, sub := range subs {
+			n, _, err := sub.Pending()
+			require_NoError(t, err)
+			total += n
+		}
+		return total
+	}
+
+	num := 1000
+	checkAllReceived := func() error {
+		total := pending(subs1) + pending(subs2)
+		if total == num {
+			return nil
+		}
+		return fmt.Errorf("Not all received: %d vs %d", total, num)
+	}
+
+	checkBalanced := func(total, pc1, pc2 int) {
+		tf := float64(total)
+		e1 := tf * (float64(pc1) / 100.00)
+		e2 := tf * (float64(pc2) / 100.00)
+		delta := tf / 10
+		p1 := float64(pending(subs1))
+		if p1 < e1-delta || p1 > e1+delta {
+			t.Fatalf("Value out of range for subs1, expected %v got %v", e1, p1)
+		}
+		p2 := float64(pending(subs2))
+		if p2 < e2-delta || p2 > e2+delta {
+			t.Fatalf("Value out of range for subs2, expected %v got %v", e2, p2)
+		}
+	}
+
+	// Now send requests from the leaf cluster.
+
+	// Simple 50/50.
+	sendRequests(num)
+	checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
+	checkBalanced(num, 50, 50)
+
+	closeSubs(subs1)
+	closeSubs(subs2)
+
+	// Now test unbalanced, 10/90.
+	subs1 = createSubs(1, c1c)
+	defer closeSubs(subs1)
+	subs2 = createSubs(9, c2c)
+	defer closeSubs(subs2)
+
+	sendRequests(num)
+	checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
+	checkBalanced(num, 10, 90)
+
+	// Now test draining the subs while we are sending from an initially balanced situation, simulating the draining of a cluster.
+
+	closeSubs(subs1)
+	closeSubs(subs2)
+	subs1, subs2 = nil, nil
+
+	// These subs are slightly different: async with counters instead of sync.
+	var r1, r2 atomic.Uint64
+	for i := 0; i < 20; i++ {
+		nc := c1c[rand.Intn(len(c1c))]
+		sub, err := nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r1.Add(1) })
+		require_NoError(t, err)
+		subs1 = append(subs1, sub)
+		nc.Flush()
+
+		nc = c2c[rand.Intn(len(c2c))]
+		sub, err = nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r2.Add(1) })
+		require_NoError(t, err)
+		subs2 = append(subs2, sub)
+		nc.Flush()
+	}
+	defer closeSubs(subs1)
+	defer closeSubs(subs2)
+
+	nc, _ := jsClientConnect(t, ln.randomServer())
+	defer nc.Close()
+
+	for i, dindex := 0, 1; i < num; i++ {
+		require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
+		// Check if we have more subs to drain, to simulate draining a cluster.
+		// Will drain within the first ~100 requests using the 1-in-6 rand test below.
+		// Will leave 1 sub behind.
+		if dindex < len(subs1)-1 && rand.Intn(6) > 4 {
+			sub := subs1[dindex]
+			dindex++
+			sub.Drain()
+		}
+	}
+	nc.Flush()
+
+	checkFor(t, time.Second, 200*time.Millisecond, func() error {
+		total := int(r1.Load() + r2.Load())
+		if total == num {
+			return nil
+		}
+		return fmt.Errorf("Not all received: %d vs %d", total, num)
+	})
+	require_True(t, r2.Load() > r1.Load())
+}
diff --git a/server/sublist.go b/server/sublist.go
index 47d45999..48375b6b 100644
--- a/server/sublist.go
+++ b/server/sublist.go
@@ -615,7 +615,7 @@ func (s *Sublist) reduceCacheCount() {
 
 // Helper function for auto-expanding remote qsubs.
 func isRemoteQSub(sub *subscription) bool {
-	return sub != nil && sub.queue != nil && sub.client != nil && sub.client.kind == ROUTER
+	return sub != nil && sub.queue != nil && sub.client != nil && (sub.client.kind == ROUTER || sub.client.kind == LEAF)
 }
 
 // UpdateRemoteQSub should be called when we update the weight of an existing
diff --git a/test/system_services_test.go b/test/system_services_test.go
index a1bad16b..a925e7ba 100644
--- a/test/system_services_test.go
+++ b/test/system_services_test.go
@@ -262,7 +262,7 @@ func TestSystemServiceSubscribersLeafNodesWithoutSystem(t *testing.T) {
 
 	// For now we do not see all the details behind a leafnode if the leafnode is not enabled.
 	checkDbgNumSubs(t, nc, "foo.bar.3", 2)
-	checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 11)
+	checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 12)
 }
 
 func runSolicitLeafServerWithSystemToURL(surl string) (*server.Server, *server.Options) {
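Background for the sublist.go change above: isRemoteQSub gates the auto-expansion of remote queue-subscription weights, so matching LEAF alongside ROUTER lets queue-group members behind leafnode connections count toward the same weighted distribution that routed queue subscribers already get; the new leafnode test verifies that balancing end to end. Below is a minimal client-side sketch of the distributed-queue (DQ) pattern involved, using plain nats.go. It assumes a locally running nats-server; the subject, queue-group name, and message count are illustrative only and not taken from this patch.

// dq_sketch.go - hypothetical illustration, not part of the patch.
package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	// Two members of the same queue group: each published message is
	// delivered to exactly one member of the group.
	var r1, r2 atomic.Uint64
	if _, err := nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r1.Add(1) }); err != nil {
		panic(err)
	}
	if _, err := nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r2.Add(1) }); err != nil {
		panic(err)
	}
	nc.Flush()

	const num = 1000
	for i := 0; i < num; i++ {
		if err := nc.Publish("REQUEST", []byte("HELP")); err != nil {
			panic(err)
		}
	}
	nc.Flush()
	time.Sleep(200 * time.Millisecond) // let deliveries settle

	// On a single server the split is roughly even; across routes and (with
	// this patch) leafnodes, the server weights delivery by how many group
	// members sit behind each remote connection.
	fmt.Printf("r1=%d r2=%d total=%d\n", r1.Load(), r2.Load(), r1.Load()+r2.Load())
}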