mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
The test consists of a single client trying to consume a fixed number of messages from a stream while the cluster is being bounced by a chaos monkey.
1618 lines
42 KiB
Go
1618 lines
42 KiB
Go
// Copyright 2020-2022 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// Do not exclude this file with the !skip_js_tests since those helpers
|
|
// are also used by MQTT.
|
|
|
|
package server
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"net/url"
|
|
"os"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
// Support functions
|
|
|
|
func init() {
|
|
// Speed up raft for tests.
|
|
hbInterval = 50 * time.Millisecond
|
|
minElectionTimeout = 1 * time.Second
|
|
maxElectionTimeout = 3 * time.Second
|
|
lostQuorumInterval = time.Second
|
|
lostQuorumCheck = 4 * hbInterval
|
|
}
|
|
|
|
// supercluster is the handle used to set up superclusters for tests. It holds
// the owning test (t), the member clusters, and any network proxies that were
// created for the gateway URLs (stopped again by shutdown).
|
|
type supercluster struct {
|
|
t *testing.T
|
|
clusters []*cluster
|
|
nproxies []*netProxy
|
|
}
|
|
|
|
func (sc *supercluster) shutdown() {
|
|
if sc == nil {
|
|
return
|
|
}
|
|
for _, c := range sc.clusters {
|
|
shutdownCluster(c)
|
|
}
|
|
for _, np := range sc.nproxies {
|
|
np.stop()
|
|
}
|
|
}
|
|
|
|
func (sc *supercluster) randomServer() *Server {
|
|
return sc.randomCluster().randomServer()
|
|
}
|
|
|
|
func (sc *supercluster) serverByName(sname string) *Server {
|
|
for _, c := range sc.clusters {
|
|
if s := c.serverByName(sname); s != nil {
|
|
return s
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (sc *supercluster) waitOnStreamLeader(account, stream string) {
|
|
sc.t.Helper()
|
|
expires := time.Now().Add(30 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
for _, c := range sc.clusters {
|
|
if leader := c.streamLeader(account, stream); leader != nil {
|
|
time.Sleep(200 * time.Millisecond)
|
|
return
|
|
}
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
sc.t.Fatalf("Expected a stream leader for %q %q, got none", account, stream)
|
|
}
|
|
|
|
// jsClusterAccountsTempl is a clustered JetStream server config with a
// leafnode listener and four accounts: ONE and TWO (JetStream enabled), NOJS
// (no JetStream) and $SYS. no_auth_user is "one".
// Format verbs, in order: server name, store dir, cluster name, cluster
// listen port, routes.
var jsClusterAccountsTempl = `
|
|
listen: 127.0.0.1:-1
|
|
|
|
server_name: %s
|
|
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
|
|
|
leaf {
|
|
listen: 127.0.0.1:-1
|
|
}
|
|
|
|
cluster {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
routes = [%s]
|
|
}
|
|
|
|
no_auth_user: one
|
|
|
|
accounts {
|
|
ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled }
|
|
TWO { users = [ { user: "two", pass: "p" } ]; jetstream: enabled }
|
|
NOJS { users = [ { user: "nojs", pass: "p" } ] }
|
|
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
|
}
|
|
`
|
|
|
|
// jsClusterTempl is the default clustered JetStream server config: a leafnode
// listener, a cluster block, and only the $SYS account defined.
// Format verbs, in order: server name, store dir, cluster name, cluster
// listen port, routes.
var jsClusterTempl = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
|
|
|
leaf {
|
|
listen: 127.0.0.1:-1
|
|
}
|
|
|
|
cluster {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
routes = [%s]
|
|
}
|
|
|
|
# For access to system account.
|
|
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
|
`
|
|
|
|
// jsClusterEncryptedTempl matches jsClusterTempl except that the jetstream
// block additionally sets a `key` (presumably enabling encrypted storage —
// confirm against the server's config docs).
// Format verbs, in order: server name, store dir, cluster name, cluster
// listen port, routes.
var jsClusterEncryptedTempl = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s', key: "s3cr3t!"}
|
|
|
|
leaf {
|
|
listen: 127.0.0.1:-1
|
|
}
|
|
|
|
cluster {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
routes = [%s]
|
|
}
|
|
|
|
# For access to system account.
|
|
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
|
`
|
|
|
|
// jsClusterMaxBytesTempl is a clustered JetStream server config whose $U
// account carries JetStream limits (max_mem 128MB, max_file 18GB) and sets
// max_bytes: true. no_auth_user is "u".
// Format verbs, in order: server name, store dir, cluster name, cluster
// listen port, routes.
var jsClusterMaxBytesTempl = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
|
|
|
leaf {
|
|
listen: 127.0.0.1:-1
|
|
}
|
|
|
|
cluster {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
routes = [%s]
|
|
}
|
|
|
|
no_auth_user: u
|
|
|
|
accounts {
|
|
$U {
|
|
users = [ { user: "u", pass: "p" } ]
|
|
jetstream: {
|
|
max_mem: 128MB
|
|
max_file: 18GB
|
|
max_bytes: true // Forces streams to indicate max_bytes.
|
|
}
|
|
}
|
|
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
|
}
|
|
`
|
|
|
|
// jsClusterMaxBytesAccountLimitTempl is the max_bytes variant with a 4GB
// server file store and a 3GB max_file limit on the $U account.
// no_auth_user is "u".
// Format verbs, in order: server name, store dir, cluster name, cluster
// listen port, routes.
var jsClusterMaxBytesAccountLimitTempl = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
jetstream: {max_mem_store: 256MB, max_file_store: 4GB, store_dir: '%s'}
|
|
|
|
leaf {
|
|
listen: 127.0.0.1:-1
|
|
}
|
|
|
|
cluster {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
routes = [%s]
|
|
}
|
|
|
|
no_auth_user: u
|
|
|
|
accounts {
|
|
$U {
|
|
users = [ { user: "u", pass: "p" } ]
|
|
jetstream: {
|
|
max_mem: 128MB
|
|
max_file: 3GB
|
|
max_bytes: true // Forces streams to indicate max_bytes.
|
|
}
|
|
}
|
|
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
|
}
|
|
`
|
|
|
|
// jsSuperClusterTempl wraps an already-rendered server config and appends a
// gateway block plus system_account "$SYS".
// Format verbs, in order: base server config, gateway (cluster) name, gateway
// listen port, rendered list of remote gateways.
var jsSuperClusterTempl = `
|
|
%s
|
|
gateway {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
gateways = [%s
|
|
]
|
|
}
|
|
|
|
system_account: "$SYS"
|
|
`
|
|
|
|
// jsClusterLimitsTempl is a clustered JetStream server config with small
// store limits (2MB memory, 8MB file), no leafnode listener, and accounts ONE
// (JetStream enabled, user "u") and $SYS. no_auth_user is "u".
// Format verbs, in order: server name, store dir, cluster name, cluster
// listen port, routes.
var jsClusterLimitsTempl = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
jetstream: {max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s'}
|
|
|
|
cluster {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
routes = [%s]
|
|
}
|
|
|
|
no_auth_user: u
|
|
|
|
accounts {
|
|
ONE {
|
|
users = [ { user: "u", pass: "s3cr3t!" } ]
|
|
jetstream: enabled
|
|
}
|
|
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
|
}
|
|
`
|
|
|
|
// jsMixedModeGlobalAccountTempl is a clustered JetStream server config with
// small store limits (2MB memory, 8MB file) that defines only the $SYS
// account.
// Format verbs, in order: server name, store dir, cluster name, cluster
// listen port, routes.
var jsMixedModeGlobalAccountTempl = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
jetstream: {max_mem_store: 2MB, max_file_store: 8MB, store_dir: '%s'}
|
|
|
|
cluster {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
routes = [%s]
|
|
}
|
|
|
|
accounts {$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
|
`
|
|
|
|
// jsGWTempl renders one remote-gateway entry. Format verbs, in order: leading
// whitespace/prefix, gateway (cluster) name, comma-separated gateway URLs.
var jsGWTempl = `%s{name: %s, urls: [%s]}`
|
|
|
|
func createJetStreamTaggedSuperCluster(t *testing.T) *supercluster {
|
|
return createJetStreamTaggedSuperClusterWithGWProxy(t, nil)
|
|
}
|
|
|
|
func createJetStreamTaggedSuperClusterWithGWProxy(t *testing.T, gwm gwProxyMap) *supercluster {
|
|
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 3, 3, nil, gwm)
|
|
sc.waitOnPeerCount(9)
|
|
|
|
reset := func(s *Server) {
|
|
s.mu.Lock()
|
|
rch := s.sys.resetCh
|
|
s.mu.Unlock()
|
|
if rch != nil {
|
|
rch <- struct{}{}
|
|
}
|
|
s.sendStatszUpdate()
|
|
}
|
|
|
|
// Make first cluster AWS, US country code.
|
|
for _, s := range sc.clusterForName("C1").servers {
|
|
s.optsMu.Lock()
|
|
s.opts.Tags.Add("cloud:aws")
|
|
s.opts.Tags.Add("country:us")
|
|
s.optsMu.Unlock()
|
|
reset(s)
|
|
}
|
|
// Make second cluster GCP, UK country code.
|
|
for _, s := range sc.clusterForName("C2").servers {
|
|
s.optsMu.Lock()
|
|
s.opts.Tags.Add("cloud:gcp")
|
|
s.opts.Tags.Add("country:uk")
|
|
s.optsMu.Unlock()
|
|
reset(s)
|
|
}
|
|
// Make third cluster AZ, JP country code.
|
|
for _, s := range sc.clusterForName("C3").servers {
|
|
s.optsMu.Lock()
|
|
s.opts.Tags.Add("cloud:az")
|
|
s.opts.Tags.Add("country:jp")
|
|
s.optsMu.Unlock()
|
|
reset(s)
|
|
}
|
|
|
|
ml := sc.leader()
|
|
js := ml.getJetStream()
|
|
require_True(t, js != nil)
|
|
js.mu.RLock()
|
|
defer js.mu.RUnlock()
|
|
cc := js.cluster
|
|
require_True(t, cc != nil)
|
|
|
|
// Walk and make sure all tags are registered.
|
|
expires := time.Now().Add(10 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
allOK := true
|
|
for _, p := range cc.meta.Peers() {
|
|
si, ok := ml.nodeToInfo.Load(p.ID)
|
|
require_True(t, ok)
|
|
ni := si.(nodeInfo)
|
|
if len(ni.tags) == 0 {
|
|
allOK = false
|
|
reset(sc.serverByName(ni.name))
|
|
}
|
|
}
|
|
if allOK {
|
|
break
|
|
}
|
|
}
|
|
|
|
return sc
|
|
}
|
|
|
|
func createJetStreamSuperCluster(t *testing.T, numServersPer, numClusters int) *supercluster {
|
|
return createJetStreamSuperClusterWithTemplate(t, jsClusterTempl, numServersPer, numClusters)
|
|
}
|
|
|
|
func createJetStreamSuperClusterWithTemplate(t *testing.T, tmpl string, numServersPer, numClusters int) *supercluster {
|
|
return createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, numServersPer, numClusters, nil, nil)
|
|
}
|
|
|
|
// gwProxy holds the settings for proxying a cluster's gateway connections.
// The three fields are passed, in order, to createNetProxy (rtt, up, down);
// up/down are presumably rate limits for each direction — confirm against
// createNetProxy.
|
|
type gwProxy struct {
|
|
rtt time.Duration
|
|
up int
|
|
down int
|
|
}
|
|
|
|
// gwProxyMap maps cluster names (e.g. "C1") to the proxy settings to use for
// that cluster's gateway URLs. Clusters without an entry are not proxied.
|
|
type gwProxyMap map[string]*gwProxy
|
|
|
|
func createJetStreamSuperClusterWithTemplateAndModHook(t *testing.T, tmpl string, numServersPer, numClusters int, modify modifyCb, gwm gwProxyMap) *supercluster {
|
|
t.Helper()
|
|
if numServersPer < 1 {
|
|
t.Fatalf("Number of servers must be >= 1")
|
|
}
|
|
if numClusters <= 1 {
|
|
t.Fatalf("Number of clusters must be > 1")
|
|
}
|
|
|
|
startClusterPorts := []int{20_022, 22_022, 24_022}
|
|
startGatewayPorts := []int{20_122, 22_122, 24_122}
|
|
startClusterPort := startClusterPorts[rand.Intn(len(startClusterPorts))]
|
|
startGWPort := startGatewayPorts[rand.Intn(len(startGatewayPorts))]
|
|
|
|
// Make the GWs form faster for the tests.
|
|
SetGatewaysSolicitDelay(10 * time.Millisecond)
|
|
defer ResetGatewaysSolicitDelay()
|
|
|
|
cp, gp := startClusterPort, startGWPort
|
|
var clusters []*cluster
|
|
var nproxies []*netProxy
|
|
var gws []string
|
|
|
|
// Build GWs first, will be same for all servers.
|
|
for i, port := 1, gp; i <= numClusters; i++ {
|
|
cn := fmt.Sprintf("C%d", i)
|
|
var gwp *gwProxy
|
|
if len(gwm) > 0 {
|
|
gwp = gwm[cn]
|
|
}
|
|
var urls []string
|
|
for n := 0; n < numServersPer; n++ {
|
|
routeURL := fmt.Sprintf("nats-route://127.0.0.1:%d", port)
|
|
if gwp != nil {
|
|
np := createNetProxy(gwp.rtt, gwp.up, gwp.down, routeURL, false)
|
|
nproxies = append(nproxies, np)
|
|
routeURL = np.routeURL()
|
|
}
|
|
urls = append(urls, routeURL)
|
|
port++
|
|
}
|
|
gws = append(gws, fmt.Sprintf(jsGWTempl, "\n\t\t\t", cn, strings.Join(urls, ",")))
|
|
}
|
|
gwconf := strings.Join(gws, _EMPTY_)
|
|
|
|
for i := 1; i <= numClusters; i++ {
|
|
cn := fmt.Sprintf("C%d", i)
|
|
// Go ahead and build configurations.
|
|
c := &cluster{servers: make([]*Server, 0, numServersPer), opts: make([]*Options, 0, numServersPer), name: cn}
|
|
|
|
// Build out the routes that will be shared with all configs.
|
|
var routes []string
|
|
for port := cp; port < cp+numServersPer; port++ {
|
|
routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", port))
|
|
}
|
|
routeConfig := strings.Join(routes, ",")
|
|
|
|
for si := 0; si < numServersPer; si++ {
|
|
storeDir := createDir(t, JetStreamStoreDir)
|
|
sn := fmt.Sprintf("%s-S%d", cn, si+1)
|
|
bconf := fmt.Sprintf(tmpl, sn, storeDir, cn, cp+si, routeConfig)
|
|
conf := fmt.Sprintf(jsSuperClusterTempl, bconf, cn, gp, gwconf)
|
|
gp++
|
|
if modify != nil {
|
|
conf = modify(sn, cn, storeDir, conf)
|
|
}
|
|
s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
|
|
c.servers = append(c.servers, s)
|
|
c.opts = append(c.opts, o)
|
|
}
|
|
checkClusterFormed(t, c.servers...)
|
|
clusters = append(clusters, c)
|
|
cp += numServersPer
|
|
c.t = t
|
|
}
|
|
|
|
// Start any proxies.
|
|
for _, np := range nproxies {
|
|
np.start()
|
|
}
|
|
|
|
// Wait for the supercluster to be formed.
|
|
egws := numClusters - 1
|
|
for _, c := range clusters {
|
|
for _, s := range c.servers {
|
|
waitForOutboundGateways(t, s, egws, 10*time.Second)
|
|
}
|
|
}
|
|
|
|
sc := &supercluster{t, clusters, nproxies}
|
|
sc.waitOnLeader()
|
|
sc.waitOnAllCurrent()
|
|
|
|
// Wait for all the peer nodes to be registered.
|
|
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
|
var peers []string
|
|
if ml := sc.leader(); ml != nil {
|
|
peers = ml.ActivePeers()
|
|
if len(peers) == numClusters*numServersPer {
|
|
return nil
|
|
}
|
|
}
|
|
return fmt.Errorf("Not correct number of peers, expected %d, got %d", numClusters*numServersPer, len(peers))
|
|
})
|
|
|
|
if sc.leader() == nil {
|
|
sc.t.Fatalf("Expected a cluster leader, got none")
|
|
}
|
|
|
|
return sc
|
|
}
|
|
|
|
func (sc *supercluster) createLeafNodes(clusterName string, numServers int) *cluster {
|
|
sc.t.Helper()
|
|
|
|
// Create our leafnode cluster template first.
|
|
return sc.createLeafNodesWithDomain(clusterName, numServers, "")
|
|
}
|
|
|
|
func (sc *supercluster) createLeafNodesWithDomain(clusterName string, numServers int, domain string) *cluster {
|
|
sc.t.Helper()
|
|
|
|
// Create our leafnode cluster template first.
|
|
return sc.randomCluster().createLeafNodes(clusterName, numServers, domain)
|
|
}
|
|
|
|
func (sc *supercluster) createSingleLeafNode(extend bool) *Server {
|
|
sc.t.Helper()
|
|
|
|
return sc.randomCluster().createLeafNode(extend)
|
|
}
|
|
|
|
func (sc *supercluster) leader() *Server {
|
|
for _, c := range sc.clusters {
|
|
if leader := c.leader(); leader != nil {
|
|
return leader
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (sc *supercluster) waitOnLeader() {
|
|
sc.t.Helper()
|
|
expires := time.Now().Add(10 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
for _, c := range sc.clusters {
|
|
if leader := c.leader(); leader != nil {
|
|
time.Sleep(250 * time.Millisecond)
|
|
return
|
|
}
|
|
}
|
|
time.Sleep(25 * time.Millisecond)
|
|
}
|
|
sc.t.Fatalf("Expected a cluster leader, got none")
|
|
}
|
|
|
|
func (sc *supercluster) waitOnAllCurrent() {
|
|
for _, c := range sc.clusters {
|
|
c.waitOnAllCurrent()
|
|
}
|
|
}
|
|
|
|
func (sc *supercluster) clusterForName(name string) *cluster {
|
|
for _, c := range sc.clusters {
|
|
if c.name == name {
|
|
return c
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (sc *supercluster) randomCluster() *cluster {
|
|
clusters := append(sc.clusters[:0:0], sc.clusters...)
|
|
rand.Shuffle(len(clusters), func(i, j int) { clusters[i], clusters[j] = clusters[j], clusters[i] })
|
|
return clusters[0]
|
|
}
|
|
|
|
func (sc *supercluster) waitOnPeerCount(n int) {
|
|
sc.t.Helper()
|
|
sc.waitOnLeader()
|
|
leader := sc.leader()
|
|
expires := time.Now().Add(20 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
peers := leader.JetStreamClusterPeers()
|
|
if len(peers) == n {
|
|
return
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
sc.t.Fatalf("Expected a super cluster peer count of %d, got %d", n, len(leader.JetStreamClusterPeers()))
|
|
}
|
|
|
|
// jsClusterMirrorSourceImportsTempl is a clustered JetStream server config
// with two JetStream-enabled accounts: JS exports the consumer API, a
// delivery-subject stream and flow-control subjects, and IA imports them
// (the consumer API remapped to "RI.JS.API.CONSUMER.>"). no_auth_user is
// "dlc" (account IA).
// Format verbs, in order: server name, store dir, cluster name, cluster
// listen port, routes.
var jsClusterMirrorSourceImportsTempl = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
|
|
|
cluster {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
routes = [%s]
|
|
}
|
|
|
|
no_auth_user: dlc
|
|
|
|
accounts {
|
|
JS {
|
|
jetstream: enabled
|
|
users = [ { user: "rip", pass: "pass" } ]
|
|
exports [
|
|
{ service: "$JS.API.CONSUMER.>" } # To create internal consumers to mirror/source.
|
|
{ stream: "RI.DELIVER.SYNC.>" } # For the mirror/source consumers sending to IA via delivery subject.
|
|
{ service: "$JS.FC.>" }
|
|
]
|
|
}
|
|
IA {
|
|
jetstream: enabled
|
|
users = [ { user: "dlc", pass: "pass" } ]
|
|
imports [
|
|
{ service: { account: JS, subject: "$JS.API.CONSUMER.>"}, to: "RI.JS.API.CONSUMER.>" }
|
|
{ stream: { account: JS, subject: "RI.DELIVER.SYNC.>"} }
|
|
{ service: {account: JS, subject: "$JS.FC.>" }}
|
|
]
|
|
}
|
|
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
|
}
|
|
`
|
|
|
|
// jsClusterImportsTempl is a clustered JetStream server config in which only
// account JS has JetStream enabled; it exports the JetStream API, the "TEST"
// subject and the TEST ack subjects as services, and account IA (no
// JetStream of its own) imports them. no_auth_user is "dlc" (account IA).
// Format verbs, in order: server name, store dir, cluster name, cluster
// listen port, routes.
var jsClusterImportsTempl = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
|
|
|
cluster {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
routes = [%s]
|
|
}
|
|
|
|
no_auth_user: dlc
|
|
|
|
accounts {
|
|
JS {
|
|
jetstream: enabled
|
|
users = [ { user: "rip", pass: "pass" } ]
|
|
exports [
|
|
{ service: "$JS.API.>", response: stream }
|
|
{ service: "TEST" } # For publishing to the stream.
|
|
{ service: "$JS.ACK.TEST.*.>" }
|
|
]
|
|
}
|
|
IA {
|
|
users = [ { user: "dlc", pass: "pass" } ]
|
|
imports [
|
|
{ service: { subject: "$JS.API.>", account: JS }}
|
|
{ service: { subject: "TEST", account: JS }}
|
|
{ service: { subject: "$JS.ACK.TEST.*.>", account: JS }}
|
|
]
|
|
}
|
|
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
|
}
|
|
`
|
|
|
|
func createMixedModeCluster(t *testing.T, tmpl string, clusterName, snPre string, numJsServers, numNonServers int, doJSConfig bool) *cluster {
|
|
t.Helper()
|
|
|
|
if clusterName == _EMPTY_ || numJsServers < 0 || numNonServers < 1 {
|
|
t.Fatalf("Bad params")
|
|
}
|
|
|
|
numServers := numJsServers + numNonServers
|
|
const startClusterPort = 23232
|
|
|
|
// Build out the routes that will be shared with all configs.
|
|
var routes []string
|
|
for cp := startClusterPort; cp < startClusterPort+numServers; cp++ {
|
|
routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp))
|
|
}
|
|
routeConfig := strings.Join(routes, ",")
|
|
|
|
// Go ahead and build configurations and start servers.
|
|
c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName}
|
|
|
|
for cp := startClusterPort; cp < startClusterPort+numServers; cp++ {
|
|
storeDir := createDir(t, JetStreamStoreDir)
|
|
|
|
sn := fmt.Sprintf("%sS-%d", snPre, cp-startClusterPort+1)
|
|
conf := fmt.Sprintf(tmpl, sn, storeDir, clusterName, cp, routeConfig)
|
|
|
|
// Disable JS here.
|
|
if cp-startClusterPort >= numJsServers {
|
|
// We can disable by commmenting it out, meaning no JS config, or can set the config up and just set disabled.
|
|
// e.g. jetstream: {domain: "SPOKE", enabled: false}
|
|
if doJSConfig {
|
|
conf = strings.Replace(conf, "jetstream: {", "jetstream: { enabled: false, ", 1)
|
|
} else {
|
|
conf = strings.Replace(conf, "jetstream: ", "# jetstream: ", 1)
|
|
}
|
|
}
|
|
|
|
s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
|
|
c.servers = append(c.servers, s)
|
|
c.opts = append(c.opts, o)
|
|
}
|
|
c.t = t
|
|
|
|
// Wait til we are formed and have a leader.
|
|
c.checkClusterFormed()
|
|
if numJsServers > 0 {
|
|
c.waitOnPeerCount(numJsServers)
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
// This will create a cluster that is explicitly configured for the routes, etc.
|
|
// and also has a defined clustername. All configs for routes and cluster name will be the same.
|
|
func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers int) *cluster {
|
|
return createJetStreamClusterWithTemplate(t, jsClusterTempl, clusterName, numServers)
|
|
}
|
|
|
|
func createJetStreamClusterWithTemplate(t *testing.T, tmpl string, clusterName string, numServers int) *cluster {
|
|
return createJetStreamClusterWithTemplateAndModHook(t, tmpl, clusterName, numServers, nil)
|
|
}
|
|
|
|
func createJetStreamClusterWithTemplateAndModHook(t *testing.T, tmpl string, clusterName string, numServers int, modify modifyCb) *cluster {
|
|
startPorts := []int{7_022, 9_022, 11_022, 15_022}
|
|
port := startPorts[rand.Intn(len(startPorts))]
|
|
return createJetStreamClusterAndModHook(t, tmpl, clusterName, _EMPTY_, numServers, port, true, modify)
|
|
}
|
|
|
|
func createJetStreamCluster(t *testing.T, tmpl string, clusterName, snPre string, numServers int, portStart int, waitOnReady bool) *cluster {
|
|
return createJetStreamClusterAndModHook(t, tmpl, clusterName, snPre, numServers, portStart, waitOnReady, nil)
|
|
}
|
|
|
|
// modifyCb is a hook that receives a server's name, cluster name, store
// directory and fully rendered config, and returns the config text that
// should actually be used to start that server.
type modifyCb func(serverName, clusterName, storeDir, conf string) string
|
|
|
|
func createJetStreamClusterAndModHook(t *testing.T, tmpl string, clusterName, snPre string, numServers int, portStart int, waitOnReady bool, modify modifyCb) *cluster {
|
|
t.Helper()
|
|
if clusterName == _EMPTY_ || numServers < 1 {
|
|
t.Fatalf("Bad params")
|
|
}
|
|
|
|
// Flaky test prevention:
|
|
// Binding a socket to IP stack port 0 will bind an ephemeral port from an OS-specific range.
|
|
// If someone passes in to us a port spec which would cover that range, the test would be flaky.
|
|
// Adjust these ports to be the most inclusive across the port runner OSes.
|
|
// Linux: /proc/sys/net/ipv4/ip_local_port_range : 32768:60999
|
|
// <https://dataplane.org/ephemeralports.html> is useful, and shows there's no safe available range without OS-specific tuning.
|
|
// Our tests are usually run on Linux. Folks who care about other OSes: if you can't tune your test-runner OS to match, please
|
|
// propose a viable alternative.
|
|
const prohibitedPortFirst = 32768
|
|
const prohibitedPortLast = 60999
|
|
if (portStart >= prohibitedPortFirst && portStart <= prohibitedPortLast) ||
|
|
(portStart+numServers-1 >= prohibitedPortFirst && portStart+numServers-1 <= prohibitedPortLast) {
|
|
t.Fatalf("test setup failure: may not specify a cluster port range which falls within %d:%d", prohibitedPortFirst, prohibitedPortLast)
|
|
}
|
|
|
|
// Build out the routes that will be shared with all configs.
|
|
var routes []string
|
|
for cp := portStart; cp < portStart+numServers; cp++ {
|
|
routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp))
|
|
}
|
|
routeConfig := strings.Join(routes, ",")
|
|
|
|
// Go ahead and build configurations and start servers.
|
|
c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName}
|
|
|
|
for cp := portStart; cp < portStart+numServers; cp++ {
|
|
storeDir := createDir(t, JetStreamStoreDir)
|
|
sn := fmt.Sprintf("%sS-%d", snPre, cp-portStart+1)
|
|
conf := fmt.Sprintf(tmpl, sn, storeDir, clusterName, cp, routeConfig)
|
|
if modify != nil {
|
|
conf = modify(sn, clusterName, storeDir, conf)
|
|
}
|
|
s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
|
|
c.servers = append(c.servers, s)
|
|
c.opts = append(c.opts, o)
|
|
}
|
|
c.t = t
|
|
|
|
// Wait til we are formed and have a leader.
|
|
c.checkClusterFormed()
|
|
if waitOnReady {
|
|
c.waitOnClusterReady()
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
func (c *cluster) addInNewServer() *Server {
|
|
c.t.Helper()
|
|
sn := fmt.Sprintf("S-%d", len(c.servers)+1)
|
|
storeDir, _ := os.MkdirTemp(tempRoot, JetStreamStoreDir)
|
|
seedRoute := fmt.Sprintf("nats-route://127.0.0.1:%d", c.opts[0].Cluster.Port)
|
|
conf := fmt.Sprintf(jsClusterTempl, sn, storeDir, c.name, -1, seedRoute)
|
|
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
|
|
c.servers = append(c.servers, s)
|
|
c.opts = append(c.opts, o)
|
|
c.checkClusterFormed()
|
|
return s
|
|
}
|
|
|
|
// This is tied to jsClusterAccountsTempl, so changes there to users needs to be reflected here.
|
|
func (c *cluster) createSingleLeafNodeNoSystemAccount() *Server {
|
|
as := c.randomServer()
|
|
lno := as.getOpts().LeafNode
|
|
ln1 := fmt.Sprintf("nats://one:p@%s:%d", lno.Host, lno.Port)
|
|
ln2 := fmt.Sprintf("nats://two:p@%s:%d", lno.Host, lno.Port)
|
|
conf := fmt.Sprintf(jsClusterSingleLeafNodeTempl, createDir(c.t, JetStreamStoreDir), ln1, ln2)
|
|
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
|
|
c.servers = append(c.servers, s)
|
|
c.opts = append(c.opts, o)
|
|
|
|
checkLeafNodeConnectedCount(c.t, as, 2)
|
|
|
|
return s
|
|
}
|
|
|
|
// This is tied to jsClusterAccountsTempl, so changes there to users needs to be reflected here.
|
|
func (c *cluster) createSingleLeafNodeNoSystemAccountAndEnablesJetStream() *Server {
|
|
return c.createSingleLeafNodeNoSystemAccountAndEnablesJetStreamWithDomain(_EMPTY_, "nojs")
|
|
}
|
|
|
|
func (c *cluster) createSingleLeafNodeNoSystemAccountAndEnablesJetStreamWithDomain(domain, user string) *Server {
|
|
tmpl := jsClusterSingleLeafNodeLikeNGSTempl
|
|
if domain != _EMPTY_ {
|
|
nsc := fmt.Sprintf("domain: %s, store_dir:", domain)
|
|
tmpl = strings.Replace(jsClusterSingleLeafNodeLikeNGSTempl, "store_dir:", nsc, 1)
|
|
}
|
|
as := c.randomServer()
|
|
lno := as.getOpts().LeafNode
|
|
ln := fmt.Sprintf("nats://%s:p@%s:%d", user, lno.Host, lno.Port)
|
|
conf := fmt.Sprintf(tmpl, createDir(c.t, JetStreamStoreDir), ln)
|
|
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
|
|
c.servers = append(c.servers, s)
|
|
c.opts = append(c.opts, o)
|
|
|
|
checkLeafNodeConnectedCount(c.t, as, 1)
|
|
|
|
return s
|
|
}
|
|
|
|
// jsClusterSingleLeafNodeLikeNGSTempl is the config for a single
// JetStream-enabled leafnode server named LNJS with one leafnode remote and
// no accounts block.
// Format verbs, in order: store dir, remote URL(s).
var jsClusterSingleLeafNodeLikeNGSTempl = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: LNJS
|
|
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
|
|
|
leaf { remotes [ { urls: [ %s ] } ] }
|
|
`
|
|
|
|
// jsClusterSingleLeafNodeTempl is the config for a single leafnode server
// named LNJS with two leafnode remotes bound to local accounts JSY
// (JetStream enabled) and JSN (no JetStream), plus a $SYS account.
// Format verbs, in order: store dir, remote URL(s) for JSY, remote URL(s)
// for JSN.
var jsClusterSingleLeafNodeTempl = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: LNJS
|
|
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
|
|
|
leaf { remotes [
|
|
{ urls: [ %s ], account: "JSY" }
|
|
{ urls: [ %s ], account: "JSN" } ]
|
|
}
|
|
|
|
accounts {
|
|
JSY { users = [ { user: "y", pass: "p" } ]; jetstream: true }
|
|
JSN { users = [ { user: "n", pass: "p" } ] }
|
|
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
|
}
|
|
`
|
|
|
|
// jsClusterTemplWithLeafNode is a clustered JetStream server config whose
// "{{leaf}}" placeholder is replaced with a leafnode remotes block (see
// createLeafSolicit) before the format verbs are filled in.
// Format verbs, in order: server name, store dir, cluster name, cluster
// listen port, routes.
var jsClusterTemplWithLeafNode = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
|
|
|
{{leaf}}
|
|
|
|
cluster {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
routes = [%s]
|
|
}
|
|
|
|
# For access to system account.
|
|
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
|
`
|
|
|
|
// jsClusterTemplWithLeafNodeNoJS is the leafnode cluster config without
// JetStream. The jetstream line is commented out but still carries the
// store-dir verb, so the template takes the same five arguments as the
// JetStream variants. "{{leaf}}" is replaced with a leafnode remotes block.
// Format verbs, in order: server name, store dir, cluster name, cluster
// listen port, routes.
var jsClusterTemplWithLeafNodeNoJS = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
|
|
# Need to keep below since it fills in the store dir by default so just comment out.
|
|
# jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
|
|
|
{{leaf}}
|
|
|
|
cluster {
|
|
name: %s
|
|
listen: 127.0.0.1:%d
|
|
routes = [%s]
|
|
}
|
|
|
|
# For access to system account.
|
|
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
|
`
|
|
|
|
// jsClusterTemplWithSingleLeafNode is the config for a standalone (no cluster
// block) JetStream leafnode server. "{{leaf}}" is replaced with a leafnode
// remotes block before the format verbs are filled in.
// Format verbs, in order: server name, store dir.
var jsClusterTemplWithSingleLeafNode = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
|
|
|
{{leaf}}
|
|
|
|
# For access to system account.
|
|
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
|
`
|
|
|
|
// jsClusterTemplWithSingleLeafNodeNoJS is the standalone leafnode server
// config without JetStream: the jetstream line is commented out but keeps the
// store-dir verb so the argument list matches the JetStream variant.
// "{{leaf}}" is replaced with a leafnode remotes block.
// Format verbs, in order: server name, store dir.
var jsClusterTemplWithSingleLeafNodeNoJS = `
|
|
listen: 127.0.0.1:-1
|
|
server_name: %s
|
|
|
|
# jetstream: {store_dir: '%s'}
|
|
|
|
{{leaf}}
|
|
|
|
# For access to system account.
|
|
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
|
`
|
|
|
|
// jsLeafFrag is the leafnode block substituted for "{{leaf}}" in the leafnode
// templates. It defines two remotes: one with no account set and one bound
// to the "$SYS" account.
// Format verbs, in order: URLs for the first remote, URLs for the $SYS remote.
var jsLeafFrag = `
|
|
leaf {
|
|
remotes [
|
|
{ urls: [ %s ] }
|
|
{ urls: [ %s ], account: "$SYS" }
|
|
]
|
|
}
|
|
`
|
|
|
|
func (c *cluster) createLeafNodes(clusterName string, numServers int, domain string) *cluster {
|
|
c.t.Helper()
|
|
return c.createLeafNodesWithStartPortAndDomain(clusterName, numServers, 22111, domain)
|
|
}
|
|
|
|
func (c *cluster) createLeafNodesNoJS(clusterName string, numServers int) *cluster {
|
|
c.t.Helper()
|
|
return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNodeNoJS, clusterName, numServers, 21333)
|
|
}
|
|
|
|
func (c *cluster) createLeafNodesWithStartPortAndDomain(clusterName string, numServers int, portStart int, domain string) *cluster {
|
|
c.t.Helper()
|
|
if domain == _EMPTY_ {
|
|
return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNode, clusterName, numServers, portStart)
|
|
}
|
|
tmpl := strings.Replace(jsClusterTemplWithLeafNode, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, domain), 1)
|
|
return c.createLeafNodesWithTemplateAndStartPort(tmpl, clusterName, numServers, portStart)
|
|
}
|
|
|
|
func (c *cluster) createLeafNode(extend bool) *Server {
|
|
c.t.Helper()
|
|
if extend {
|
|
return c.createLeafNodeWithTemplate("LNS",
|
|
strings.ReplaceAll(jsClusterTemplWithSingleLeafNode, "store_dir:", " extension_hint: will_extend, store_dir:"))
|
|
} else {
|
|
return c.createLeafNodeWithTemplate("LNS", jsClusterTemplWithSingleLeafNode)
|
|
}
|
|
}
|
|
|
|
func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server {
|
|
c.t.Helper()
|
|
tmpl := c.createLeafSolicit(template)
|
|
conf := fmt.Sprintf(tmpl, name, createDir(c.t, JetStreamStoreDir))
|
|
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
|
|
c.servers = append(c.servers, s)
|
|
c.opts = append(c.opts, o)
|
|
return s
|
|
}
|
|
|
|
// Helper to generate the leaf solicit configs.
|
|
func (c *cluster) createLeafSolicit(tmpl string) string {
|
|
c.t.Helper()
|
|
|
|
// Create our leafnode cluster template first.
|
|
var lns, lnss []string
|
|
for _, s := range c.servers {
|
|
if s.ClusterName() != c.name {
|
|
continue
|
|
}
|
|
ln := s.getOpts().LeafNode
|
|
lns = append(lns, fmt.Sprintf("nats://%s:%d", ln.Host, ln.Port))
|
|
lnss = append(lnss, fmt.Sprintf("nats://admin:s3cr3t!@%s:%d", ln.Host, ln.Port))
|
|
}
|
|
lnc := strings.Join(lns, ", ")
|
|
lnsc := strings.Join(lnss, ", ")
|
|
lconf := fmt.Sprintf(jsLeafFrag, lnc, lnsc)
|
|
return strings.Replace(tmpl, "{{leaf}}", lconf, 1)
|
|
}
|
|
|
|
func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName string, numJsServers, numNonServers int, doJSConfig bool) *cluster {
|
|
c.t.Helper()
|
|
|
|
// Create our leafnode cluster template first.
|
|
tmpl := c.createLeafSolicit(template)
|
|
pre := clusterName + "-"
|
|
|
|
lc := createMixedModeCluster(c.t, tmpl, clusterName, pre, numJsServers, numNonServers, doJSConfig)
|
|
for _, s := range lc.servers {
|
|
checkLeafNodeConnectedCount(c.t, s, 2)
|
|
}
|
|
lc.waitOnClusterReadyWithNumPeers(numJsServers)
|
|
|
|
return lc
|
|
}
|
|
|
|
func (c *cluster) createLeafNodesWithTemplateAndStartPort(template, clusterName string, numServers int, portStart int) *cluster {
|
|
c.t.Helper()
|
|
|
|
// Create our leafnode cluster template first.
|
|
tmpl := c.createLeafSolicit(template)
|
|
pre := clusterName + "-"
|
|
lc := createJetStreamCluster(c.t, tmpl, clusterName, pre, numServers, portStart, false)
|
|
for _, s := range lc.servers {
|
|
checkLeafNodeConnectedCount(c.t, s, 2)
|
|
}
|
|
return lc
|
|
}
|
|
|
|
// Helper function to close and disable leafnodes.
|
|
func (s *Server) closeAndDisableLeafnodes() {
|
|
var leafs []*client
|
|
s.mu.Lock()
|
|
for _, ln := range s.leafs {
|
|
leafs = append(leafs, ln)
|
|
}
|
|
// Disable leafnodes for now.
|
|
s.leafDisableConnect = true
|
|
s.mu.Unlock()
|
|
|
|
for _, ln := range leafs {
|
|
ln.closeConnection(Revocation)
|
|
}
|
|
}
|
|
|
|
// Helper to set the remote migrate feature.
|
|
func (s *Server) setJetStreamMigrateOnRemoteLeaf() {
|
|
s.mu.Lock()
|
|
for _, cfg := range s.leafRemoteCfgs {
|
|
cfg.JetStreamClusterMigrate = true
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// Will add in the mapping for the account to each server.
|
|
func (c *cluster) addSubjectMapping(account, src, dest string) {
|
|
c.t.Helper()
|
|
|
|
for _, s := range c.servers {
|
|
if s.ClusterName() != c.name {
|
|
continue
|
|
}
|
|
acc, err := s.LookupAccount(account)
|
|
if err != nil {
|
|
c.t.Fatalf("Unexpected error on %v: %v", s, err)
|
|
}
|
|
if err := acc.AddMapping(src, dest); err != nil {
|
|
c.t.Fatalf("Error adding mapping: %v", err)
|
|
}
|
|
}
|
|
// Make sure interest propagates.
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
|
|
// Adjust limits for the given account.
|
|
func (c *cluster) updateLimits(account string, newLimits map[string]JetStreamAccountLimits) {
|
|
c.t.Helper()
|
|
for _, s := range c.servers {
|
|
acc, err := s.LookupAccount(account)
|
|
if err != nil {
|
|
c.t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
if err := acc.UpdateJetStreamLimits(newLimits); err != nil {
|
|
c.t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// skip skips the calling test immediately via t.SkipNow.
// Hack for staticcheck: it is declared as a function-valued variable rather
// than a plain func (presumably so it is not reported as unused — confirm
// before changing its form).
|
|
var skip = func(t *testing.T) {
|
|
t.SkipNow()
|
|
}
|
|
|
|
func jsClientConnect(t *testing.T, s *Server, opts ...nats.Option) (*nats.Conn, nats.JetStreamContext) {
|
|
t.Helper()
|
|
nc, err := nats.Connect(s.ClientURL(), opts...)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create client: %v", err)
|
|
}
|
|
js, err := nc.JetStream(nats.MaxWait(10 * time.Second))
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error getting JetStream context: %v", err)
|
|
}
|
|
return nc, js
|
|
}
|
|
|
|
func jsClientConnectEx(t *testing.T, s *Server, domain string, opts ...nats.Option) (*nats.Conn, nats.JetStreamContext) {
|
|
t.Helper()
|
|
nc, err := nats.Connect(s.ClientURL(), opts...)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create client: %v", err)
|
|
}
|
|
js, err := nc.JetStream(nats.MaxWait(10*time.Second), nats.Domain(domain))
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error getting JetStream context: %v", err)
|
|
}
|
|
return nc, js
|
|
}
|
|
|
|
func jsClientConnectCluster(t *testing.T, c *cluster, opts ...nats.Option) (*nats.Conn, nats.JetStreamContext) {
|
|
t.Helper()
|
|
|
|
var sb strings.Builder
|
|
|
|
for _, s := range c.servers {
|
|
sb.WriteString(s.ClientURL())
|
|
sb.WriteString(",")
|
|
}
|
|
|
|
return jsClientConnectURL(t, sb.String(), opts...)
|
|
}
|
|
|
|
func jsClientConnectURL(t *testing.T, url string, opts ...nats.Option) (*nats.Conn, nats.JetStreamContext) {
|
|
t.Helper()
|
|
|
|
nc, err := nats.Connect(url, opts...)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create client: %v", err)
|
|
}
|
|
js, err := nc.JetStream(nats.MaxWait(10 * time.Second))
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error getting JetStream context: %v", err)
|
|
}
|
|
return nc, js
|
|
}
|
|
|
|
func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) {
|
|
t.Helper()
|
|
checkFor(t, 10*time.Second, 20*time.Millisecond, func() error {
|
|
if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != numExpected {
|
|
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func fetchMsgs(t *testing.T, sub *nats.Subscription, numExpected int, totalWait time.Duration) []*nats.Msg {
|
|
t.Helper()
|
|
result := make([]*nats.Msg, 0, numExpected)
|
|
for start, count, wait := time.Now(), numExpected, totalWait; len(result) != numExpected; {
|
|
msgs, err := sub.Fetch(count, nats.MaxWait(wait))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
result = append(result, msgs...)
|
|
count -= len(msgs)
|
|
if wait = totalWait - time.Since(start); wait < 0 {
|
|
break
|
|
}
|
|
}
|
|
if len(result) != numExpected {
|
|
t.Fatalf("Unexpected msg count, got %d, want %d", len(result), numExpected)
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (c *cluster) restartServer(rs *Server) *Server {
|
|
c.t.Helper()
|
|
index := -1
|
|
var opts *Options
|
|
for i, s := range c.servers {
|
|
if s == rs {
|
|
index = i
|
|
break
|
|
}
|
|
}
|
|
if index < 0 {
|
|
c.t.Fatalf("Could not find server %v to restart", rs)
|
|
}
|
|
opts = c.opts[index]
|
|
s, o := RunServerWithConfig(opts.ConfigFile)
|
|
c.servers[index] = s
|
|
c.opts[index] = o
|
|
return s
|
|
}
|
|
|
|
func (c *cluster) checkClusterFormed() {
|
|
c.t.Helper()
|
|
checkClusterFormed(c.t, c.servers...)
|
|
}
|
|
|
|
func (c *cluster) waitOnPeerCount(n int) {
|
|
c.t.Helper()
|
|
c.waitOnLeader()
|
|
leader := c.leader()
|
|
for leader == nil {
|
|
c.waitOnLeader()
|
|
leader = c.leader()
|
|
}
|
|
expires := time.Now().Add(30 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
if peers := leader.JetStreamClusterPeers(); len(peers) == n {
|
|
return
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
leader = c.leader()
|
|
for leader == nil {
|
|
c.waitOnLeader()
|
|
leader = c.leader()
|
|
}
|
|
}
|
|
c.t.Fatalf("Expected a cluster peer count of %d, got %d", n, len(leader.JetStreamClusterPeers()))
|
|
}
|
|
|
|
func (c *cluster) waitOnConsumerLeader(account, stream, consumer string) {
|
|
c.t.Helper()
|
|
expires := time.Now().Add(20 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
if leader := c.consumerLeader(account, stream, consumer); leader != nil {
|
|
time.Sleep(200 * time.Millisecond)
|
|
return
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
c.t.Fatalf("Expected a consumer leader for %q %q %q, got none", account, stream, consumer)
|
|
}
|
|
|
|
func (c *cluster) consumerLeader(account, stream, consumer string) *Server {
|
|
c.t.Helper()
|
|
for _, s := range c.servers {
|
|
if s.JetStreamIsConsumerLeader(account, stream, consumer) {
|
|
return s
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *cluster) randomNonConsumerLeader(account, stream, consumer string) *Server {
|
|
c.t.Helper()
|
|
for _, s := range c.servers {
|
|
if !s.JetStreamIsConsumerLeader(account, stream, consumer) {
|
|
return s
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *cluster) waitOnStreamLeader(account, stream string) {
|
|
c.t.Helper()
|
|
expires := time.Now().Add(30 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
if leader := c.streamLeader(account, stream); leader != nil {
|
|
time.Sleep(200 * time.Millisecond)
|
|
return
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
c.t.Fatalf("Expected a stream leader for %q %q, got none", account, stream)
|
|
}
|
|
|
|
func (c *cluster) randomNonStreamLeader(account, stream string) *Server {
|
|
c.t.Helper()
|
|
for _, s := range c.servers {
|
|
if s.JetStreamIsStreamAssigned(account, stream) && !s.JetStreamIsStreamLeader(account, stream) {
|
|
return s
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *cluster) streamLeader(account, stream string) *Server {
|
|
c.t.Helper()
|
|
for _, s := range c.servers {
|
|
if s.JetStreamIsStreamLeader(account, stream) {
|
|
return s
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *cluster) waitOnStreamCurrent(s *Server, account, stream string) {
|
|
c.t.Helper()
|
|
expires := time.Now().Add(30 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
if s.JetStreamIsStreamCurrent(account, stream) {
|
|
time.Sleep(100 * time.Millisecond)
|
|
return
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
c.t.Fatalf("Expected server %q to eventually be current for stream %q", s, stream)
|
|
}
|
|
|
|
func (c *cluster) waitOnServerHealthz(s *Server) {
|
|
c.t.Helper()
|
|
expires := time.Now().Add(30 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
hs := s.healthz(nil)
|
|
if hs.Status == "ok" && hs.Error == _EMPTY_ {
|
|
return
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
c.t.Fatalf("Expected server %q to eventually return healthz 'ok', but got %q", s, s.healthz(nil).Error)
|
|
}
|
|
|
|
func (c *cluster) waitOnServerCurrent(s *Server) {
|
|
c.t.Helper()
|
|
expires := time.Now().Add(20 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
time.Sleep(100 * time.Millisecond)
|
|
if !s.JetStreamEnabled() || s.JetStreamIsCurrent() {
|
|
return
|
|
}
|
|
}
|
|
c.t.Fatalf("Expected server %q to eventually be current", s)
|
|
}
|
|
|
|
func (c *cluster) waitOnAllCurrent() {
|
|
for _, cs := range c.servers {
|
|
c.waitOnServerCurrent(cs)
|
|
}
|
|
}
|
|
|
|
func (c *cluster) serverByName(sname string) *Server {
|
|
for _, s := range c.servers {
|
|
if s.Name() == sname {
|
|
return s
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *cluster) randomNonLeader() *Server {
|
|
// range should randomize.. but..
|
|
for _, s := range c.servers {
|
|
if s.Running() && !s.JetStreamIsLeader() {
|
|
return s
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *cluster) leader() *Server {
|
|
for _, s := range c.servers {
|
|
if s.JetStreamIsLeader() {
|
|
return s
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *cluster) expectNoLeader() {
|
|
c.t.Helper()
|
|
expires := time.Now().Add(maxElectionTimeout)
|
|
for time.Now().Before(expires) {
|
|
if c.leader() == nil {
|
|
return
|
|
}
|
|
time.Sleep(20 * time.Millisecond)
|
|
}
|
|
c.t.Fatalf("Expected no leader but have one")
|
|
}
|
|
|
|
func (c *cluster) waitOnLeader() {
|
|
c.t.Helper()
|
|
expires := time.Now().Add(40 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
if leader := c.leader(); leader != nil {
|
|
time.Sleep(100 * time.Millisecond)
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
c.t.Fatalf("Expected a cluster leader, got none")
|
|
}
|
|
|
|
// Helper function to check that a cluster is formed
|
|
func (c *cluster) waitOnClusterReady() {
|
|
c.t.Helper()
|
|
c.waitOnClusterReadyWithNumPeers(len(c.servers))
|
|
}
|
|
|
|
func (c *cluster) waitOnClusterReadyWithNumPeers(numPeersExpected int) {
|
|
c.t.Helper()
|
|
var leader *Server
|
|
expires := time.Now().Add(40 * time.Second)
|
|
for time.Now().Before(expires) {
|
|
if leader = c.leader(); leader != nil {
|
|
break
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
// Now make sure we have all peers.
|
|
for leader != nil && time.Now().Before(expires) {
|
|
if len(leader.JetStreamClusterPeers()) == numPeersExpected {
|
|
time.Sleep(100 * time.Millisecond)
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
if leader == nil {
|
|
c.shutdown()
|
|
c.t.Fatalf("Failed to elect a meta-leader")
|
|
}
|
|
|
|
peersSeen := len(leader.JetStreamClusterPeers())
|
|
c.shutdown()
|
|
|
|
if leader == nil {
|
|
c.t.Fatalf("Expected a cluster leader and fully formed cluster, no leader")
|
|
} else {
|
|
c.t.Fatalf("Expected a fully formed cluster, only %d of %d peers seen", peersSeen, numPeersExpected)
|
|
}
|
|
}
|
|
|
|
// Helper function to remove JetStream from a server.
|
|
func (c *cluster) removeJetStream(s *Server) {
|
|
c.t.Helper()
|
|
index := -1
|
|
for i, cs := range c.servers {
|
|
if cs == s {
|
|
index = i
|
|
break
|
|
}
|
|
}
|
|
cf := c.opts[index].ConfigFile
|
|
cb, _ := os.ReadFile(cf)
|
|
var sb strings.Builder
|
|
for _, l := range strings.Split(string(cb), "\n") {
|
|
if !strings.HasPrefix(strings.TrimSpace(l), "jetstream") {
|
|
sb.WriteString(l + "\n")
|
|
}
|
|
}
|
|
if err := os.WriteFile(cf, []byte(sb.String()), 0644); err != nil {
|
|
c.t.Fatalf("Error writing updated config file: %v", err)
|
|
}
|
|
if err := s.Reload(); err != nil {
|
|
c.t.Fatalf("Error on server reload: %v", err)
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
func (c *cluster) stopAll() {
|
|
c.t.Helper()
|
|
for _, s := range c.servers {
|
|
s.Shutdown()
|
|
}
|
|
}
|
|
|
|
func (c *cluster) restartAll() {
|
|
c.t.Helper()
|
|
for i, s := range c.servers {
|
|
if !s.Running() {
|
|
opts := c.opts[i]
|
|
s, o := RunServerWithConfig(opts.ConfigFile)
|
|
c.servers[i] = s
|
|
c.opts[i] = o
|
|
}
|
|
}
|
|
c.waitOnClusterReady()
|
|
}
|
|
|
|
func (c *cluster) restartAllSamePorts() {
|
|
c.t.Helper()
|
|
for i, s := range c.servers {
|
|
if !s.Running() {
|
|
opts := c.opts[i]
|
|
s := RunServer(opts)
|
|
c.servers[i] = s
|
|
}
|
|
}
|
|
c.waitOnClusterReady()
|
|
}
|
|
|
|
func (c *cluster) totalSubs() (total int) {
|
|
c.t.Helper()
|
|
for _, s := range c.servers {
|
|
total += int(s.NumSubscriptions())
|
|
}
|
|
return total
|
|
}
|
|
|
|
func (c *cluster) stableTotalSubs() (total int) {
|
|
nsubs := -1
|
|
checkFor(c.t, 2*time.Second, 250*time.Millisecond, func() error {
|
|
subs := c.totalSubs()
|
|
if subs == nsubs {
|
|
return nil
|
|
}
|
|
nsubs = subs
|
|
return fmt.Errorf("Still stabilizing")
|
|
})
|
|
return nsubs
|
|
|
|
}
|
|
|
|
func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo {
|
|
t.Helper()
|
|
req, err := json.Marshal(cfg)
|
|
require_NoError(t, err)
|
|
rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
|
|
require_NoError(t, err)
|
|
var resp JSApiStreamCreateResponse
|
|
err = json.Unmarshal(rmsg.Data, &resp)
|
|
require_NoError(t, err)
|
|
if resp.Type != JSApiStreamCreateResponseType {
|
|
t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamCreateResponseType)
|
|
}
|
|
if resp.Error != nil {
|
|
t.Fatalf("Unexpected error: %+v", resp.Error)
|
|
}
|
|
return resp.StreamInfo
|
|
}
|
|
|
|
func updateStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo {
|
|
t.Helper()
|
|
req, err := json.Marshal(cfg)
|
|
require_NoError(t, err)
|
|
rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), req, time.Second)
|
|
require_NoError(t, err)
|
|
var resp JSApiStreamCreateResponse
|
|
err = json.Unmarshal(rmsg.Data, &resp)
|
|
require_NoError(t, err)
|
|
if resp.Type != JSApiStreamUpdateResponseType {
|
|
t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamUpdateResponseType)
|
|
}
|
|
if resp.Error != nil {
|
|
t.Fatalf("Unexpected error: %+v", resp.Error)
|
|
}
|
|
return resp.StreamInfo
|
|
}
|
|
|
|
// setInActiveDeleteThreshold sets the delete threshold for how long to wait
|
|
// before deleting an inactive consumer.
|
|
func (o *consumer) setInActiveDeleteThreshold(dthresh time.Duration) error {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
|
|
deleteWasRunning := o.dtmr != nil
|
|
stopAndClearTimer(&o.dtmr)
|
|
// Do not add jitter if set via here.
|
|
o.dthresh = dthresh
|
|
if deleteWasRunning {
|
|
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// netProxy is a TCP proxy placed in front of a server in order to introduce
// RTT and bandwidth constraints.
type netProxy struct {
	// listener accepts the incoming connections that get proxied.
	listener net.Listener
	// conns collects both ends (client side and server side) of every
	// proxied connection so they can be closed on stop.
	conns []net.Conn
	// rtt is the round trip time to simulate; half of it is applied per
	// direction.
	rtt time.Duration
	// up is the rate limit used for the client -> server direction.
	up int
	// down is the rate limit used for the server -> client direction.
	down int
	// url is the address of the proxy itself.
	url string
	// surl is the URL of the server that connections are forwarded to.
	surl string
}
|
|
|
|
func newNetProxy(rtt time.Duration, upRate, downRate int, serverURL string) *netProxy {
|
|
return createNetProxy(rtt, upRate, downRate, serverURL, true)
|
|
}
|
|
|
|
func createNetProxy(rtt time.Duration, upRate, downRate int, serverURL string, start bool) *netProxy {
|
|
hp := net.JoinHostPort("127.0.0.1", "0")
|
|
l, e := net.Listen("tcp", hp)
|
|
if e != nil {
|
|
panic(fmt.Sprintf("Error listening on port: %s, %q", hp, e))
|
|
}
|
|
port := l.Addr().(*net.TCPAddr).Port
|
|
proxy := &netProxy{
|
|
listener: l,
|
|
rtt: rtt,
|
|
up: upRate,
|
|
down: downRate,
|
|
url: fmt.Sprintf("nats://127.0.0.1:%d", port),
|
|
surl: serverURL,
|
|
}
|
|
if start {
|
|
proxy.start()
|
|
}
|
|
return proxy
|
|
}
|
|
|
|
func (np *netProxy) start() {
|
|
go func() {
|
|
for {
|
|
client, err := np.listener.Accept()
|
|
if err != nil {
|
|
return
|
|
}
|
|
u, err := url.Parse(np.surl)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("Could not parse server URL: %v", err))
|
|
}
|
|
server, err := net.DialTimeout("tcp", u.Host, time.Second)
|
|
if err != nil {
|
|
panic("Can't connect proxy to NATS server")
|
|
}
|
|
np.conns = append(np.conns, client, server)
|
|
go np.loop(np.rtt, np.up, client, server)
|
|
go np.loop(np.rtt, np.down, server, client)
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (np *netProxy) clientURL() string {
|
|
return np.url
|
|
}
|
|
|
|
func (np *netProxy) routeURL() string {
|
|
return strings.Replace(np.url, "nats", "nats-route", 1)
|
|
}
|
|
|
|
func (np *netProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) {
|
|
delay := rtt / 2
|
|
const rbl = 8192
|
|
var buf [rbl]byte
|
|
ctx := context.Background()
|
|
|
|
rl := rate.NewLimiter(rate.Limit(tbw), rbl)
|
|
|
|
for fr := true; ; {
|
|
sr := time.Now()
|
|
n, err := r.Read(buf[:])
|
|
if err != nil {
|
|
return
|
|
}
|
|
// RTT delays
|
|
if fr || time.Since(sr) > 250*time.Millisecond {
|
|
fr = false
|
|
if delay > 0 {
|
|
time.Sleep(delay)
|
|
}
|
|
}
|
|
if err := rl.WaitN(ctx, n); err != nil {
|
|
return
|
|
}
|
|
if _, err = w.Write(buf[:n]); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (np *netProxy) stop() {
|
|
if np.listener != nil {
|
|
np.listener.Close()
|
|
np.listener = nil
|
|
for _, c := range np.conns {
|
|
c.Close()
|
|
}
|
|
}
|
|
}
|