mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge branch 'main' into dev
This commit is contained in:
@@ -329,7 +329,8 @@ func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMes
|
||||
// Verify ordered delivery despite cluster-wide outages
|
||||
func TestJetStreamChaosConsumerOrdered(t *testing.T) {
|
||||
|
||||
const numMessages = 30_000
|
||||
const numMessages = 5_000
|
||||
const numBatch = 500
|
||||
const maxRetries = 100
|
||||
const retryDelay = 500 * time.Millisecond
|
||||
const fetchTimeout = 250 * time.Millisecond
|
||||
@@ -407,7 +408,7 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) {
|
||||
// Simulate application processing (and gives the monkey some time to brew chaos)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
if i%1000 == 0 {
|
||||
if i%numBatch == 0 {
|
||||
t.Logf("Consumed %d/%d", i, numMessages)
|
||||
}
|
||||
}
|
||||
@@ -416,7 +417,8 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) {
|
||||
// Verify ordered delivery despite cluster-wide outages
|
||||
func TestJetStreamChaosConsumerAsync(t *testing.T) {
|
||||
|
||||
const numMessages = 30_000
|
||||
const numMessages = 5_000
|
||||
const numBatch = 500
|
||||
const timeout = 30 * time.Second // No (new) messages for 30s => terminate
|
||||
const maxRetries = 25
|
||||
const retryDelay = 500 * time.Millisecond
|
||||
@@ -480,7 +482,7 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) {
|
||||
timeoutTimer.Reset(1 * time.Second)
|
||||
}
|
||||
|
||||
if received.count()%1000 == 0 {
|
||||
if received.count()%numBatch == 0 {
|
||||
t.Logf("Consumed %d/%d", received.count(), numMessages)
|
||||
}
|
||||
|
||||
@@ -535,7 +537,8 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) {
|
||||
// The consumer connection is also periodically closed, and the consumer 'resumes' on a different one
|
||||
func TestJetStreamChaosConsumerDurable(t *testing.T) {
|
||||
|
||||
const numMessages = 30_000
|
||||
const numMessages = 5_000
|
||||
const numBatch = 500
|
||||
const timeout = 30 * time.Second // No (new) messages for 60s => terminate
|
||||
const clusterSize = 3
|
||||
const replicas = 3
|
||||
@@ -703,7 +706,7 @@ func TestJetStreamChaosConsumerDurable(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if received.count()%1000 == 0 {
|
||||
if received.count()%numBatch == 0 {
|
||||
t.Logf("Consumed %d/%d, duplicate deliveries: %d", received.count(), numMessages, deliveryCount-received.count())
|
||||
// Close connection and resume consuming on a different one
|
||||
resetDurableConsumer()
|
||||
@@ -740,7 +743,8 @@ func TestJetStreamChaosConsumerDurable(t *testing.T) {
|
||||
|
||||
func TestJetStreamChaosConsumerPull(t *testing.T) {
|
||||
|
||||
const numMessages = 10_000
|
||||
const numMessages = 5_000
|
||||
const numBatch = 500
|
||||
const maxRetries = 100
|
||||
const retryDelay = 500 * time.Millisecond
|
||||
const fetchTimeout = 250 * time.Millisecond
|
||||
@@ -845,7 +849,7 @@ func TestJetStreamChaosConsumerPull(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if !isDupe && received.count()%1000 == 0 {
|
||||
if !isDupe && received.count()%numBatch == 0 {
|
||||
t.Logf("Consumed %d/%d (duplicates: %d)", received.count(), numMessages, deliveredCount-received.count())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019-2021 The NATS Authors
|
||||
// Copyright 2019-2023 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -5949,3 +5949,223 @@ func TestLeafNodeCompressionAuthTimeout(t *testing.T) {
|
||||
t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf2, l2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithSeparateAccounts(t *testing.T) {
|
||||
sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 3, 2)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO.
|
||||
var lnTmpl = `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
||||
|
||||
{{leaf}}
|
||||
|
||||
cluster {
|
||||
name: %s
|
||||
listen: 127.0.0.1:%d
|
||||
routes = [%s]
|
||||
}
|
||||
|
||||
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }}
|
||||
`
|
||||
|
||||
var leafFrag = `
|
||||
leaf {
|
||||
listen: 127.0.0.1:-1
|
||||
remotes [
|
||||
{ urls: [ %s ] }
|
||||
{ urls: [ %s ] }
|
||||
]
|
||||
}`
|
||||
|
||||
// We want to have two leaf node connections that join to the same local account on the leafnode servers,
|
||||
// but connect to different accounts in different clusters.
|
||||
c1 := sc.clusters[0] // Will connect to account ONE
|
||||
c2 := sc.clusters[1] // Will connect to account TWO
|
||||
|
||||
genLeafTmpl := func(tmpl string) string {
|
||||
t.Helper()
|
||||
|
||||
var ln1, ln2 []string
|
||||
for _, s := range c1.servers {
|
||||
if s.ClusterName() != c1.name {
|
||||
continue
|
||||
}
|
||||
ln := s.getOpts().LeafNode
|
||||
ln1 = append(ln1, fmt.Sprintf("nats://one:p@%s:%d", ln.Host, ln.Port))
|
||||
}
|
||||
|
||||
for _, s := range c2.servers {
|
||||
if s.ClusterName() != c2.name {
|
||||
continue
|
||||
}
|
||||
ln := s.getOpts().LeafNode
|
||||
ln2 = append(ln2, fmt.Sprintf("nats://two:p@%s:%d", ln.Host, ln.Port))
|
||||
}
|
||||
return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, strings.Join(ln1, ", "), strings.Join(ln2, ", ")), 1)
|
||||
}
|
||||
|
||||
tmpl := strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "SA"), 1)
|
||||
tmpl = genLeafTmpl(tmpl)
|
||||
|
||||
ln := createJetStreamCluster(t, tmpl, "SA", "SA-", 3, 22280, false)
|
||||
ln.waitOnClusterReady()
|
||||
defer ln.shutdown()
|
||||
|
||||
for _, s := range ln.servers {
|
||||
checkLeafNodeConnectedCount(t, s, 2)
|
||||
}
|
||||
|
||||
// Now connect DQ subscribers to each cluster and they separate accounts, and make sure we get the right behavior, balanced between
|
||||
// them when requests originate from the leaf cluster.
|
||||
|
||||
// Create 5 clients for each cluster / account
|
||||
var c1c, c2c []*nats.Conn
|
||||
for i := 0; i < 5; i++ {
|
||||
nc1, _ := jsClientConnect(t, c1.randomServer(), nats.UserInfo("one", "p"))
|
||||
defer nc1.Close()
|
||||
c1c = append(c1c, nc1)
|
||||
nc2, _ := jsClientConnect(t, c2.randomServer(), nats.UserInfo("two", "p"))
|
||||
defer nc2.Close()
|
||||
c2c = append(c2c, nc2)
|
||||
}
|
||||
|
||||
createSubs := func(num int, conns []*nats.Conn) (subs []*nats.Subscription) {
|
||||
for i := 0; i < num; i++ {
|
||||
nc := conns[rand.Intn(len(conns))]
|
||||
sub, err := nc.QueueSubscribeSync("REQUEST", "MC")
|
||||
require_NoError(t, err)
|
||||
subs = append(subs, sub)
|
||||
nc.Flush()
|
||||
}
|
||||
// Let subs propagate.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
return subs
|
||||
}
|
||||
closeSubs := func(subs []*nats.Subscription) {
|
||||
for _, sub := range subs {
|
||||
sub.Unsubscribe()
|
||||
}
|
||||
}
|
||||
|
||||
// Simple test first.
|
||||
subs1 := createSubs(1, c1c)
|
||||
defer closeSubs(subs1)
|
||||
subs2 := createSubs(1, c2c)
|
||||
defer closeSubs(subs2)
|
||||
|
||||
sendRequests := func(num int) {
|
||||
// Now connect to the leaf cluster and send some requests.
|
||||
nc, _ := jsClientConnect(t, ln.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
for i := 0; i < num; i++ {
|
||||
require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
|
||||
}
|
||||
nc.Flush()
|
||||
}
|
||||
|
||||
pending := func(subs []*nats.Subscription) (total int) {
|
||||
for _, sub := range subs {
|
||||
n, _, err := sub.Pending()
|
||||
require_NoError(t, err)
|
||||
total += n
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
num := 1000
|
||||
checkAllReceived := func() error {
|
||||
total := pending(subs1) + pending(subs2)
|
||||
if total == num {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Not all received: %d vs %d", total, num)
|
||||
}
|
||||
|
||||
checkBalanced := func(total, pc1, pc2 int) {
|
||||
tf := float64(total)
|
||||
e1 := tf * (float64(pc1) / 100.00)
|
||||
e2 := tf * (float64(pc2) / 100.00)
|
||||
delta := tf / 10
|
||||
p1 := float64(pending(subs1))
|
||||
if p1 < e1-delta || p1 > e1+delta {
|
||||
t.Fatalf("Value out of range for subs1, expected %v got %v", e1, p1)
|
||||
}
|
||||
p2 := float64(pending(subs2))
|
||||
if p2 < e2-delta || p2 > e2+delta {
|
||||
t.Fatalf("Value out of range for subs2, expected %v got %v", e2, p2)
|
||||
}
|
||||
}
|
||||
|
||||
// Now connect to the leaf cluster and send some requests.
|
||||
|
||||
// Simple 50/50
|
||||
sendRequests(num)
|
||||
checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
|
||||
checkBalanced(num, 50, 50)
|
||||
|
||||
closeSubs(subs1)
|
||||
closeSubs(subs2)
|
||||
|
||||
// Now test unbalanced. 10/90
|
||||
subs1 = createSubs(1, c1c)
|
||||
defer closeSubs(subs1)
|
||||
subs2 = createSubs(9, c2c)
|
||||
defer closeSubs(subs2)
|
||||
|
||||
sendRequests(num)
|
||||
checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
|
||||
checkBalanced(num, 10, 90)
|
||||
|
||||
// Now test draining the subs as we are sending from an initial balanced situation simulating a draining of a cluster.
|
||||
|
||||
closeSubs(subs1)
|
||||
closeSubs(subs2)
|
||||
subs1, subs2 = nil, nil
|
||||
|
||||
// These subs slightly different.
|
||||
var r1, r2 atomic.Uint64
|
||||
for i := 0; i < 20; i++ {
|
||||
nc := c1c[rand.Intn(len(c1c))]
|
||||
sub, err := nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r1.Add(1) })
|
||||
require_NoError(t, err)
|
||||
subs1 = append(subs1, sub)
|
||||
nc.Flush()
|
||||
|
||||
nc = c2c[rand.Intn(len(c2c))]
|
||||
sub, err = nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r2.Add(1) })
|
||||
require_NoError(t, err)
|
||||
subs2 = append(subs2, sub)
|
||||
nc.Flush()
|
||||
}
|
||||
defer closeSubs(subs1)
|
||||
defer closeSubs(subs2)
|
||||
|
||||
nc, _ := jsClientConnect(t, ln.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
for i, dindex := 0, 1; i < num; i++ {
|
||||
require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
|
||||
// Check if we have more to simulate draining.
|
||||
// Will drain within first ~100 requests using 20% rand test below.
|
||||
// Will leave 1 behind.
|
||||
if dindex < len(subs1)-1 && rand.Intn(6) > 4 {
|
||||
sub := subs1[dindex]
|
||||
dindex++
|
||||
sub.Drain()
|
||||
}
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
checkFor(t, time.Second, 200*time.Millisecond, func() error {
|
||||
total := int(r1.Load() + r2.Load())
|
||||
if total == num {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Not all received: %d vs %d", total, num)
|
||||
})
|
||||
require_True(t, r2.Load() > r1.Load())
|
||||
}
|
||||
|
||||
@@ -615,7 +615,7 @@ func (s *Sublist) reduceCacheCount() {
|
||||
|
||||
// Helper function for auto-expanding remote qsubs.
|
||||
func isRemoteQSub(sub *subscription) bool {
|
||||
return sub != nil && sub.queue != nil && sub.client != nil && sub.client.kind == ROUTER
|
||||
return sub != nil && sub.queue != nil && sub.client != nil && (sub.client.kind == ROUTER || sub.client.kind == LEAF)
|
||||
}
|
||||
|
||||
// UpdateRemoteQSub should be called when we update the weight of an existing
|
||||
|
||||
@@ -262,7 +262,7 @@ func TestSystemServiceSubscribersLeafNodesWithoutSystem(t *testing.T) {
|
||||
// For now we do not see all the details behind a leafnode if the leafnode is not enabled.
|
||||
checkDbgNumSubs(t, nc, "foo.bar.3", 2)
|
||||
|
||||
checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 11)
|
||||
checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 12)
|
||||
}
|
||||
|
||||
func runSolicitLeafServerWithSystemToURL(surl string) (*server.Server, *server.Options) {
|
||||
|
||||
Reference in New Issue
Block a user