From 7de3568f3990760fdbc23d92f317d0a22a9cb640 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 9 Jun 2023 17:07:53 +0100 Subject: [PATCH 1/4] Reduce messages in chaos tests It doesn't really appear as though, for what these tests are trying to prove, that an excessively large number of messages is required. Instead let's drop the count a little in the hope that they run a bit faster. Signed-off-by: Neil Twigg --- server/jetstream_chaos_test.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/server/jetstream_chaos_test.go b/server/jetstream_chaos_test.go index 2ef91f6a..b0873382 100644 --- a/server/jetstream_chaos_test.go +++ b/server/jetstream_chaos_test.go @@ -329,7 +329,8 @@ func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMes // Verify ordered delivery despite cluster-wide outages func TestJetStreamChaosConsumerOrdered(t *testing.T) { - const numMessages = 30_000 + const numMessages = 5_000 + const numBatch = 500 const maxRetries = 100 const retryDelay = 500 * time.Millisecond const fetchTimeout = 250 * time.Millisecond @@ -407,7 +408,7 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) { // Simulate application processing (and gives the monkey some time to brew chaos) time.Sleep(10 * time.Millisecond) - if i%1000 == 0 { + if i%numBatch == 0 { t.Logf("Consumed %d/%d", i, numMessages) } } @@ -416,7 +417,8 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) { // Verify ordered delivery despite cluster-wide outages func TestJetStreamChaosConsumerAsync(t *testing.T) { - const numMessages = 30_000 + const numMessages = 5_000 + const numBatch = 500 const timeout = 30 * time.Second // No (new) messages for 30s => terminate const maxRetries = 25 const retryDelay = 500 * time.Millisecond @@ -480,7 +482,7 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) { timeoutTimer.Reset(1 * time.Second) } - if received.count()%1000 == 0 { + if received.count()%numBatch == 0 { t.Logf("Consumed %d/%d", received.count(), numMessages) } @@ -535,7 +537,8 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) { // The consumer connection is also periodically closed, and the consumer 'resumes' on a different one func TestJetStreamChaosConsumerDurable(t *testing.T) { - const numMessages = 30_000 + const numMessages = 5_000 + const numBatch = 500 const timeout = 30 * time.Second // No (new) messages for 60s => terminate const clusterSize = 3 const replicas = 3 @@ -703,7 +706,7 @@ func TestJetStreamChaosConsumerDurable(t *testing.T) { } } - if received.count()%1000 == 0 { + if received.count()%numBatch == 0 { t.Logf("Consumed %d/%d, duplicate deliveries: %d", received.count(), numMessages, deliveryCount-received.count()) // Close connection and resume consuming on a different one resetDurableConsumer() @@ -740,7 +743,8 @@ func TestJetStreamChaosConsumerDurable(t *testing.T) { func TestJetStreamChaosConsumerPull(t *testing.T) { - const numMessages = 10_000 + const numMessages = 5_000 + const numBatch = 500 const maxRetries = 100 const retryDelay = 500 * time.Millisecond const fetchTimeout = 250 * time.Millisecond @@ -845,7 +849,7 @@ func TestJetStreamChaosConsumerPull(t *testing.T) { } } - if !isDupe && received.count()%1000 == 0 { + if !isDupe && received.count()%numBatch == 0 { t.Logf("Consumed %d/%d (duplicates: %d)", received.count(), numMessages, deliveredCount-received.count()) } } From 81154c40f5fa63041a160990c393c56da63a1081 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 9 Jun 2023 09:29:48 -0700 Subject: [PATCH 2/4] Bump to 2.9.18-beta.2 Signed-off-by: Derek Collison --- server/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/const.go b/server/const.go index cfcb78ac..8d56df08 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.9.18-beta.1" + VERSION = "2.9.18-beta.2" // PROTO is the currently supported protocol. // 0 was the original From ce2dcd339429911c43fe75be2e108524e9771bbb Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 9 Jun 2023 14:43:59 -0700 Subject: [PATCH 3/4] Fix for properly distributed queue requests over multiple leafnode connections. When a leafnode server joins two accounts in a supercluster, we want to make sure that each connection properly takes into account the weighted number of subscribers in each account. Signed-off-by: Derek Collison --- server/leafnode_test.go | 220 ++++++++++++++++++++++++++++++++++++++++ server/sublist.go | 2 +- 2 files changed, 221 insertions(+), 1 deletion(-) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 575bfc5f..f49fbc62 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -4846,3 +4846,223 @@ func TestLeafNodeDuplicateMsg(t *testing.T) { t.Run("sub_b2_pub_a1", func(t *testing.T) { check(t, b2, a1) }) t.Run("sub_b2_pub_a2", func(t *testing.T) { check(t, b2, a2) }) } + +func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithSeparateAccounts(t *testing.T) { + sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 3, 2) + defer sc.shutdown() + + // Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO. + var lnTmpl = ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + {{leaf}} + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }} + ` + + var leafFrag = ` + leaf { + listen: 127.0.0.1:-1 + remotes [ + { urls: [ %s ] } + { urls: [ %s ] } + ] + }` + + // We want to have two leaf node connections that join to the same local account on the leafnode servers, + // but connect to different accounts in different clusters. + c1 := sc.clusters[0] // Will connect to account ONE + c2 := sc.clusters[1] // Will connect to account TWO + + genLeafTmpl := func(tmpl string) string { + t.Helper() + + var ln1, ln2 []string + for _, s := range c1.servers { + if s.ClusterName() != c1.name { + continue + } + ln := s.getOpts().LeafNode + ln1 = append(ln1, fmt.Sprintf("nats://one:p@%s:%d", ln.Host, ln.Port)) + } + + for _, s := range c2.servers { + if s.ClusterName() != c2.name { + continue + } + ln := s.getOpts().LeafNode + ln2 = append(ln2, fmt.Sprintf("nats://two:p@%s:%d", ln.Host, ln.Port)) + } + return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, strings.Join(ln1, ", "), strings.Join(ln2, ", ")), 1) + } + + tmpl := strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "SA"), 1) + tmpl = genLeafTmpl(tmpl) + + ln := createJetStreamCluster(t, tmpl, "SA", "SA-", 3, 22280, false) + ln.waitOnClusterReady() + defer ln.shutdown() + + for _, s := range ln.servers { + checkLeafNodeConnectedCount(t, s, 2) + } + + // Now connect DQ subscribers to each cluster and they separate accounts, and make sure we get the right behavior, balanced between + // them when requests originate from the leaf cluster. + + // Create 5 clients for each cluster / account + var c1c, c2c []*nats.Conn + for i := 0; i < 5; i++ { + nc1, _ := jsClientConnect(t, c1.randomServer(), nats.UserInfo("one", "p")) + defer nc1.Close() + c1c = append(c1c, nc1) + nc2, _ := jsClientConnect(t, c2.randomServer(), nats.UserInfo("two", "p")) + defer nc2.Close() + c2c = append(c2c, nc2) + } + + createSubs := func(num int, conns []*nats.Conn) (subs []*nats.Subscription) { + for i := 0; i < num; i++ { + nc := conns[rand.Intn(len(conns))] + sub, err := nc.QueueSubscribeSync("REQUEST", "MC") + require_NoError(t, err) + subs = append(subs, sub) + nc.Flush() + } + // Let subs propagate. + time.Sleep(100 * time.Millisecond) + return subs + } + closeSubs := func(subs []*nats.Subscription) { + for _, sub := range subs { + sub.Unsubscribe() + } + } + + // Simple test first. + subs1 := createSubs(1, c1c) + defer closeSubs(subs1) + subs2 := createSubs(1, c2c) + defer closeSubs(subs2) + + sendRequests := func(num int) { + // Now connect to the leaf cluster and send some requests. + nc, _ := jsClientConnect(t, ln.randomServer()) + defer nc.Close() + + for i := 0; i < num; i++ { + require_NoError(t, nc.Publish("REQUEST", []byte("HELP"))) + } + nc.Flush() + } + + pending := func(subs []*nats.Subscription) (total int) { + for _, sub := range subs { + n, _, err := sub.Pending() + require_NoError(t, err) + total += n + } + return total + } + + num := 1000 + checkAllReceived := func() error { + total := pending(subs1) + pending(subs2) + if total == num { + return nil + } + return fmt.Errorf("Not all received: %d vs %d", total, num) + } + + checkBalanced := func(total, pc1, pc2 int) { + tf := float64(total) + e1 := tf * (float64(pc1) / 100.00) + e2 := tf * (float64(pc2) / 100.00) + delta := tf / 10 + p1 := float64(pending(subs1)) + if p1 < e1-delta || p1 > e1+delta { + t.Fatalf("Value out of range for subs1, expected %v got %v", e1, p1) + } + p2 := float64(pending(subs2)) + if p2 < e2-delta || p2 > e2+delta { + t.Fatalf("Value out of range for subs2, expected %v got %v", e2, p2) + } + } + + // Now connect to the leaf cluster and send some requests. + + // Simple 50/50 + sendRequests(num) + checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived) + checkBalanced(num, 50, 50) + + closeSubs(subs1) + closeSubs(subs2) + + // Now test unbalanced. 10/90 + subs1 = createSubs(1, c1c) + defer closeSubs(subs1) + subs2 = createSubs(9, c2c) + defer closeSubs(subs2) + + sendRequests(num) + checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived) + checkBalanced(num, 10, 90) + + // Now test draining the subs as we are sending from an initial balanced situation simulating a draining of a cluster. + + closeSubs(subs1) + closeSubs(subs2) + subs1, subs2 = nil, nil + + // These subs slightly different. + var r1, r2 atomic.Uint64 + for i := 0; i < 20; i++ { + nc := c1c[rand.Intn(len(c1c))] + sub, err := nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r1.Add(1) }) + require_NoError(t, err) + subs1 = append(subs1, sub) + nc.Flush() + + nc = c2c[rand.Intn(len(c2c))] + sub, err = nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r2.Add(1) }) + require_NoError(t, err) + subs2 = append(subs2, sub) + nc.Flush() + } + defer closeSubs(subs1) + defer closeSubs(subs2) + + nc, _ := jsClientConnect(t, ln.randomServer()) + defer nc.Close() + + for i, dindex := 0, 1; i < num; i++ { + require_NoError(t, nc.Publish("REQUEST", []byte("HELP"))) + // Check if we have more to simulate draining. + // Will drain within first ~100 requests using 20% rand test below. + // Will leave 1 behind. + if dindex < len(subs1)-1 && rand.Intn(6) > 4 { + sub := subs1[dindex] + dindex++ + sub.Drain() + } + } + nc.Flush() + + checkFor(t, time.Second, 200*time.Millisecond, func() error { + total := int(r1.Load() + r2.Load()) + if total == num { + return nil + } + return fmt.Errorf("Not all received: %d vs %d", total, num) + }) + require_True(t, r2.Load() > r1.Load()) +} diff --git a/server/sublist.go b/server/sublist.go index 47d45999..48375b6b 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -615,7 +615,7 @@ func (s *Sublist) reduceCacheCount() { // Helper function for auto-expanding remote qsubs. func isRemoteQSub(sub *subscription) bool { - return sub != nil && sub.queue != nil && sub.client != nil && sub.client.kind == ROUTER + return sub != nil && sub.queue != nil && sub.client != nil && (sub.client.kind == ROUTER || sub.client.kind == LEAF) } // UpdateRemoteQSub should be called when we update the weight of an existing From 2765e534ebc434ee03c38097a001968c04871dc7 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 9 Jun 2023 15:09:15 -0700 Subject: [PATCH 4/4] Fix test and update copyright Signed-off-by: Derek Collison --- server/leafnode_test.go | 2 +- test/system_services_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index f49fbc62..5ba223b9 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -1,4 +1,4 @@ -// Copyright 2019-2021 The NATS Authors +// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/test/system_services_test.go b/test/system_services_test.go index a1bad16b..a925e7ba 100644 --- a/test/system_services_test.go +++ b/test/system_services_test.go @@ -262,7 +262,7 @@ func TestSystemServiceSubscribersLeafNodesWithoutSystem(t *testing.T) { // For now we do not see all the details behind a leafnode if the leafnode is not enabled. checkDbgNumSubs(t, nc, "foo.bar.3", 2) - checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 11) + checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 12) } func runSolicitLeafServerWithSystemToURL(surl string) (*server.Server, *server.Options) {