mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Currently, we use ReadyForConnections in server tests to wait for the server to be ready. However, when this fails we don't get a clue about why it failed. This change adds a new unexported method called readyForConnections that returns an error describing which check failed. The exported ReadyForConnections version works exactly as before. The unexported version gets used in internal tests only.
1942 lines · 53 KiB · Go
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/url"
|
|
"os"
|
|
"reflect"
|
|
"runtime"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
)
|
|
|
|
func checkFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) {
|
|
t.Helper()
|
|
timeout := time.Now().Add(totalWait)
|
|
var err error
|
|
for time.Now().Before(timeout) {
|
|
err = f()
|
|
if err == nil {
|
|
return
|
|
}
|
|
time.Sleep(sleepDur)
|
|
}
|
|
if err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
}
|
|
|
|
// DefaultOptions returns the server Options used by the tests in this
// package: loopback host, dynamically assigned client/HTTP/cluster
// ports (-1), logging and signal handling disabled, and debug/trace
// enabled.
func DefaultOptions() *Options {
	return &Options{
		Host:     "127.0.0.1",
		Port:     -1,
		HTTPPort: -1,
		Cluster:  ClusterOpts{Port: -1, Name: "abc"},
		NoLog:    true,
		NoSigs:   true,
		Debug:    true,
		Trace:    true,
	}
}
|
|
|
|
// New Go Routine based server
//
// RunServer creates a server from opts (or DefaultOptions when nil),
// starts it in a goroutine, and waits up to 10 seconds for its accept
// loop(s) to be ready. It panics on creation failure or readiness
// timeout; the unexported readyForConnections is used so the panic
// message describes which readiness check failed.
func RunServer(opts *Options) *Server {
	if opts == nil {
		opts = DefaultOptions()
	}
	s, err := NewServer(opts)
	if err != nil || s == nil {
		panic(fmt.Sprintf("No NATS Server object returned: %v", err))
	}

	if !opts.NoLog {
		s.ConfigureLogger()
	}

	// Run server in Go routine.
	go s.Start()

	// Wait for accept loop(s) to be started
	if err := s.readyForConnections(10 * time.Second); err != nil {
		panic(err)
	}
	return s
}
|
|
|
|
// LoadConfig loads a configuration from a filename
|
|
func LoadConfig(configFile string) (opts *Options) {
|
|
opts, err := ProcessConfigFile(configFile)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("Error processing configuration file: %v", err))
|
|
}
|
|
opts.NoSigs, opts.NoLog = true, true
|
|
return
|
|
}
|
|
|
|
// RunServerWithConfig starts a new Go routine based server with a configuration file.
|
|
func RunServerWithConfig(configFile string) (srv *Server, opts *Options) {
|
|
opts = LoadConfig(configFile)
|
|
srv = RunServer(opts)
|
|
return
|
|
}
|
|
|
|
func TestVersionMatchesTag(t *testing.T) {
|
|
tag := os.Getenv("TRAVIS_TAG")
|
|
// Travis started to return '' when no tag is set. Support both now.
|
|
if tag == "" || tag == "''" {
|
|
t.SkipNow()
|
|
}
|
|
// We expect a tag of the form vX.Y.Z. If that's not the case,
|
|
// we need someone to have a look. So fail if first letter is not
|
|
// a `v`
|
|
if tag[0] != 'v' {
|
|
t.Fatalf("Expect tag to start with `v`, tag is: %s", tag)
|
|
}
|
|
// Strip the `v` from the tag for the version comparison.
|
|
if VERSION != tag[1:] {
|
|
t.Fatalf("Version (%s) does not match tag (%s)", VERSION, tag[1:])
|
|
}
|
|
}
|
|
|
|
// TestStartProfiler verifies that the profiler can be started and its
// listener closed without error on a freshly-created server.
func TestStartProfiler(t *testing.T) {
	s := New(DefaultOptions())
	s.StartProfiler()
	// Take the server lock while touching the profiler field, mirroring
	// how the server itself guards it.
	s.mu.Lock()
	s.profiler.Close()
	s.mu.Unlock()
}
|
|
|
|
func TestStartupAndShutdown(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.NoSystemAccount = true
|
|
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
if !s.isRunning() {
|
|
t.Fatal("Could not run server")
|
|
}
|
|
|
|
// Debug stuff.
|
|
numRoutes := s.NumRoutes()
|
|
if numRoutes != 0 {
|
|
t.Fatalf("Expected numRoutes to be 0 vs %d\n", numRoutes)
|
|
}
|
|
|
|
numRemotes := s.NumRemotes()
|
|
if numRemotes != 0 {
|
|
t.Fatalf("Expected numRemotes to be 0 vs %d\n", numRemotes)
|
|
}
|
|
|
|
numClients := s.NumClients()
|
|
if numClients != 0 && numClients != 1 {
|
|
t.Fatalf("Expected numClients to be 1 or 0 vs %d\n", numClients)
|
|
}
|
|
|
|
numSubscriptions := s.NumSubscriptions()
|
|
if numSubscriptions != 0 {
|
|
t.Fatalf("Expected numSubscriptions to be 0 vs %d\n", numSubscriptions)
|
|
}
|
|
}
|
|
|
|
func TestTLSVersions(t *testing.T) {
|
|
for _, test := range []struct {
|
|
name string
|
|
value uint16
|
|
expected string
|
|
}{
|
|
{"1.0", tls.VersionTLS10, "1.0"},
|
|
{"1.1", tls.VersionTLS11, "1.1"},
|
|
{"1.2", tls.VersionTLS12, "1.2"},
|
|
{"1.3", tls.VersionTLS13, "1.3"},
|
|
{"unknown", 0x999, "Unknown [0x999]"},
|
|
} {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
if v := tlsVersion(test.value); v != test.expected {
|
|
t.Fatalf("Expected value 0x%x to be %q, got %q", test.value, test.expected, v)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestTlsCipher(t *testing.T) {
|
|
if strings.Compare(tlsCipher(0x0005), "TLS_RSA_WITH_RC4_128_SHA") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0x000a), "TLS_RSA_WITH_3DES_EDE_CBC_SHA") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0x002f), "TLS_RSA_WITH_AES_128_CBC_SHA") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0x0035), "TLS_RSA_WITH_AES_256_CBC_SHA") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0xc007), "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0xc009), "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0xc00a), "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0xc011), "TLS_ECDHE_RSA_WITH_RC4_128_SHA") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0xc012), "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0xc013), "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0xc014), "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA") != 0 {
|
|
t.Fatalf("IUnknownnvalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0xc02f), "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0xc02b), "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0xc030), "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0xc02c), "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0x1301), "TLS_AES_128_GCM_SHA256") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0x1302), "TLS_AES_256_GCM_SHA384") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0x1303), "TLS_CHACHA20_POLY1305_SHA256") != 0 {
|
|
t.Fatalf("Invalid tls cipher")
|
|
}
|
|
if strings.Compare(tlsCipher(0x9999), "Unknown [0x9999]") != 0 {
|
|
t.Fatalf("Expected an unknown cipher")
|
|
}
|
|
}
|
|
|
|
// TestGetConnectURLs checks the client connect URLs computed by the
// server. For wildcard listen addresses ("0.0.0.0", "::") every
// returned URL must resolve to a global, specified unicast IP that is
// not the listen address itself. For a specific listen address,
// exactly one URL matching that address must be returned.
func TestGetConnectURLs(t *testing.T) {
	opts := DefaultOptions()
	opts.Port = 4222

	// One global IP discovered in the wildcard phase is remembered so
	// it can be reused as a specific listen address below.
	var globalIP net.IP

	checkGlobalConnectURLs := func() {
		s := New(opts)
		defer s.Shutdown()

		s.mu.Lock()
		urls := s.getClientConnectURLs()
		s.mu.Unlock()
		if len(urls) == 0 {
			t.Fatalf("Expected to get a list of urls, got none for listen addr: %v", opts.Host)
		}
		for _, u := range urls {
			tcpaddr, err := net.ResolveTCPAddr("tcp", u)
			if err != nil {
				t.Fatalf("Error resolving: %v", err)
			}
			ip := tcpaddr.IP
			if !ip.IsGlobalUnicast() {
				t.Fatalf("IP %v is not global", ip.String())
			}
			if ip.IsUnspecified() {
				t.Fatalf("IP %v is unspecified", ip.String())
			}
			addr := strings.TrimSuffix(u, ":4222")
			// The wildcard listen address itself must never be returned.
			if addr == opts.Host {
				t.Fatalf("Returned url is not right: %v", u)
			}
			if globalIP == nil {
				globalIP = ip
			}
		}
	}

	listenAddrs := []string{"0.0.0.0", "::"}
	for _, listenAddr := range listenAddrs {
		opts.Host = listenAddr
		checkGlobalConnectURLs()
	}

	checkConnectURLsHasOnlyOne := func() {
		s := New(opts)
		defer s.Shutdown()

		s.mu.Lock()
		urls := s.getClientConnectURLs()
		s.mu.Unlock()
		if len(urls) != 1 {
			t.Fatalf("Expected one URL, got %v", urls)
		}
		tcpaddr, err := net.ResolveTCPAddr("tcp", urls[0])
		if err != nil {
			t.Fatalf("Error resolving: %v", err)
		}
		ip := tcpaddr.IP
		if ip.String() != opts.Host {
			t.Fatalf("Expected connect URL to be %v, got %v", opts.Host, ip.String())
		}
	}

	// Loopback addresses plus (when found) the global IP from above.
	singleConnectReturned := []string{"127.0.0.1", "::1"}
	if globalIP != nil {
		singleConnectReturned = append(singleConnectReturned, globalIP.String())
	}
	for _, listenAddr := range singleConnectReturned {
		opts.Host = listenAddr
		checkConnectURLsHasOnlyOne()
	}
}
|
|
|
|
func TestInfoServerNameDefaultsToPK(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.Port = 4222
|
|
opts.ClientAdvertise = "nats.example.com"
|
|
s := New(opts)
|
|
defer s.Shutdown()
|
|
|
|
if s.info.Name != s.info.ID {
|
|
t.Fatalf("server info hostname is incorrect, got: '%v' expected: '%v'", s.info.Name, s.info.ID)
|
|
}
|
|
}
|
|
|
|
func TestInfoServerNameIsSettable(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.Port = 4222
|
|
opts.ClientAdvertise = "nats.example.com"
|
|
opts.ServerName = "test_server_name"
|
|
s := New(opts)
|
|
defer s.Shutdown()
|
|
|
|
if s.info.Name != "test_server_name" {
|
|
t.Fatalf("server info hostname is incorrect, got: '%v' expected: 'test_server_name'", s.info.Name)
|
|
}
|
|
}
|
|
|
|
func TestClientAdvertiseConnectURL(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.Port = 4222
|
|
opts.ClientAdvertise = "nats.example.com"
|
|
s := New(opts)
|
|
defer s.Shutdown()
|
|
|
|
s.mu.Lock()
|
|
urls := s.getClientConnectURLs()
|
|
s.mu.Unlock()
|
|
if len(urls) != 1 {
|
|
t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v",
|
|
opts.Host, opts.ClientAdvertise)
|
|
}
|
|
if urls[0] != "nats.example.com:4222" {
|
|
t.Fatalf("Expected to get '%s', got: '%v'", "nats.example.com:4222", urls[0])
|
|
}
|
|
s.Shutdown()
|
|
|
|
opts.ClientAdvertise = "nats.example.com:7777"
|
|
s = New(opts)
|
|
s.mu.Lock()
|
|
urls = s.getClientConnectURLs()
|
|
s.mu.Unlock()
|
|
if len(urls) != 1 {
|
|
t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v",
|
|
opts.Host, opts.ClientAdvertise)
|
|
}
|
|
if urls[0] != "nats.example.com:7777" {
|
|
t.Fatalf("Expected 'nats.example.com:7777', got: '%v'", urls[0])
|
|
}
|
|
if s.info.Host != "nats.example.com" {
|
|
t.Fatalf("Expected host to be set to nats.example.com")
|
|
}
|
|
if s.info.Port != 7777 {
|
|
t.Fatalf("Expected port to be set to 7777")
|
|
}
|
|
s.Shutdown()
|
|
|
|
opts = DefaultOptions()
|
|
opts.Port = 0
|
|
opts.ClientAdvertise = "nats.example.com:7777"
|
|
s = New(opts)
|
|
if s.info.Host != "nats.example.com" && s.info.Port != 7777 {
|
|
t.Fatalf("Expected Client Advertise Host:Port to be nats.example.com:7777, got: %s:%d",
|
|
s.info.Host, s.info.Port)
|
|
}
|
|
s.Shutdown()
|
|
}
|
|
|
|
// TestClientAdvertiseInCluster verifies that a client connected to one
// cluster member learns the advertised (not actual) URLs of the other
// members, that duplicate advertised URLs are deduplicated, and that a
// duplicated URL survives the shutdown of one of the servers that
// advertised it.
func TestClientAdvertiseInCluster(t *testing.T) {
	optsA := DefaultOptions()
	optsA.ClientAdvertise = "srvA:4222"
	srvA := RunServer(optsA)
	defer srvA.Shutdown()

	nc := natsConnect(t, srvA.ClientURL())
	defer nc.Close()

	// B and C both advertise the same URL on purpose.
	optsB := DefaultOptions()
	optsB.ClientAdvertise = "srvBC:4222"
	optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", optsA.Cluster.Port))
	srvB := RunServer(optsB)
	defer srvB.Shutdown()

	checkClusterFormed(t, srvA, srvB)

	// checkURLs polls until the expected URL shows up in the client's
	// discovered-servers list.
	checkURLs := func(expected string) {
		t.Helper()
		checkFor(t, time.Second, 15*time.Millisecond, func() error {
			srvs := nc.DiscoveredServers()
			for _, u := range srvs {
				if u == expected {
					return nil
				}
			}
			return fmt.Errorf("Url %q not found in %q", expected, srvs)
		})
	}
	checkURLs("nats://srvBC:4222")

	optsC := DefaultOptions()
	optsC.ClientAdvertise = "srvBC:4222"
	optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", optsA.Cluster.Port))
	srvC := RunServer(optsC)
	defer srvC.Shutdown()

	checkClusterFormed(t, srvA, srvB, srvC)
	checkURLs("nats://srvBC:4222")

	// With B gone, C still advertises the shared URL, so it must
	// remain visible to the client.
	srvB.Shutdown()
	checkNumRoutes(t, srvA, 1)
	checkURLs("nats://srvBC:4222")
}
|
|
|
|
func TestClientAdvertiseErrorOnStartup(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
// Set invalid address
|
|
opts.ClientAdvertise = "addr:::123"
|
|
testFatalErrorOnStart(t, opts, "ClientAdvertise")
|
|
}
|
|
|
|
// TestNoDeadlockOnStartFailure verifies that Start() returns promptly
// when the client listener cannot be bound, and that Shutdown() does
// not deadlock afterwards.
func TestNoDeadlockOnStartFailure(t *testing.T) {
	opts := DefaultOptions()
	opts.Host = "x.x.x.x" // bad host
	opts.Port = 4222
	opts.HTTPHost = opts.Host
	opts.Cluster.Host = "127.0.0.1"
	opts.Cluster.Port = -1
	opts.ProfPort = -1
	s := New(opts)

	// This should return since it should fail to start a listener
	// on x.x.x.x:4222
	ch := make(chan struct{})
	go func() {
		s.Start()
		close(ch)
	}()
	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatalf("Start() should have returned due to failure to start listener")
	}

	// We should be able to shutdown
	s.Shutdown()
}
|
|
|
|
func TestMaxConnections(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.MaxConn = 1
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
nc, err := nats.Connect(addr)
|
|
if err != nil {
|
|
t.Fatalf("Error creating client: %v\n", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
nc2, err := nats.Connect(addr)
|
|
if err == nil {
|
|
nc2.Close()
|
|
t.Fatal("Expected connection to fail")
|
|
}
|
|
}
|
|
|
|
func TestMaxSubscriptions(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.MaxSubs = 10
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
nc, err := nats.Connect(addr)
|
|
if err != nil {
|
|
t.Fatalf("Error creating client: %v\n", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
for i := 0; i < 10; i++ {
|
|
_, err := nc.Subscribe(fmt.Sprintf("foo.%d", i), func(*nats.Msg) {})
|
|
if err != nil {
|
|
t.Fatalf("Error subscribing: %v\n", err)
|
|
}
|
|
}
|
|
// This should cause the error.
|
|
nc.Subscribe("foo.22", func(*nats.Msg) {})
|
|
nc.Flush()
|
|
if err := nc.LastError(); err == nil {
|
|
t.Fatal("Expected an error but got none\n")
|
|
}
|
|
}
|
|
|
|
// TestProcessCommandLineArgs checks ProcessCommandLineArgs handling of
// plain flags (no subcommand), the "version" and "help" subcommands,
// and an unknown subcommand (error).
func TestProcessCommandLineArgs(t *testing.T) {
	var host string
	var port int
	cmd := flag.NewFlagSet("nats-server", flag.ExitOnError)
	cmd.StringVar(&host, "a", "0.0.0.0", "Host.")
	cmd.IntVar(&port, "p", 4222, "Port.")

	// Plain flags: neither version nor help should be requested.
	cmd.Parse([]string{"-a", "127.0.0.1", "-p", "9090"})
	showVersion, showHelp, err := ProcessCommandLineArgs(cmd)
	if err != nil {
		t.Errorf("Expected no errors, got: %s", err)
	}
	if showVersion || showHelp {
		t.Errorf("Expected not having to handle subcommands")
	}

	// "version" subcommand sets only showVersion.
	cmd.Parse([]string{"version"})
	showVersion, showHelp, err = ProcessCommandLineArgs(cmd)
	if err != nil {
		t.Errorf("Expected no errors, got: %s", err)
	}
	if !showVersion {
		t.Errorf("Expected having to handle version command")
	}
	if showHelp {
		t.Errorf("Expected not having to handle help command")
	}

	// "help" subcommand sets only showHelp.
	cmd.Parse([]string{"help"})
	showVersion, showHelp, err = ProcessCommandLineArgs(cmd)
	if err != nil {
		t.Errorf("Expected no errors, got: %s", err)
	}
	if showVersion {
		t.Errorf("Expected not having to handle version command")
	}
	if !showHelp {
		t.Errorf("Expected having to handle help command")
	}

	// Unknown subcommand must be reported as an error.
	cmd.Parse([]string{"foo", "-p", "9090"})
	_, _, err = ProcessCommandLineArgs(cmd)
	if err == nil {
		t.Errorf("Expected an error handling the command arguments")
	}
}
|
|
|
|
func TestRandomPorts(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.HTTPPort = -1
|
|
opts.Port = -1
|
|
s := RunServer(opts)
|
|
|
|
defer s.Shutdown()
|
|
|
|
if s.Addr() == nil || s.Addr().(*net.TCPAddr).Port <= 0 {
|
|
t.Fatal("Should have dynamically assigned server port.")
|
|
}
|
|
|
|
if s.Addr() == nil || s.Addr().(*net.TCPAddr).Port == 4222 {
|
|
t.Fatal("Should not have dynamically assigned default port: 4222.")
|
|
}
|
|
|
|
if s.MonitorAddr() == nil || s.MonitorAddr().Port <= 0 {
|
|
t.Fatal("Should have dynamically assigned monitoring port.")
|
|
}
|
|
|
|
}
|
|
|
|
func TestNilMonitoringPort(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.HTTPPort = 0
|
|
opts.HTTPSPort = 0
|
|
s := RunServer(opts)
|
|
|
|
defer s.Shutdown()
|
|
|
|
if s.MonitorAddr() != nil {
|
|
t.Fatal("HttpAddr should be nil.")
|
|
}
|
|
}
|
|
|
|
// DummyAuth is a test ClientAuthentication implementation that accepts
// only connections whose username is exactly "valid".
type DummyAuth struct{}

// Check authorizes solely on the username; passwords are ignored.
func (d *DummyAuth) Check(c ClientAuthentication) bool {
	return c.GetOpts().Username == "valid"
}
|
|
|
|
func TestCustomClientAuthentication(t *testing.T) {
|
|
var clientAuth DummyAuth
|
|
|
|
opts := DefaultOptions()
|
|
opts.CustomClientAuthentication = &clientAuth
|
|
|
|
s := RunServer(opts)
|
|
|
|
defer s.Shutdown()
|
|
|
|
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
|
|
nc, err := nats.Connect(addr, nats.UserInfo("valid", ""))
|
|
if err != nil {
|
|
t.Fatalf("Expected client to connect, got: %s", err)
|
|
}
|
|
nc.Close()
|
|
if _, err := nats.Connect(addr, nats.UserInfo("invalid", "")); err == nil {
|
|
t.Fatal("Expected client to fail to connect")
|
|
}
|
|
}
|
|
|
|
// TestCustomRouterAuthentication checks that a custom route
// authenticator (DummyAuth) is consulted for route connections: a
// route with username "invalid" never forms, one with "valid" does.
func TestCustomRouterAuthentication(t *testing.T) {
	opts := DefaultOptions()
	opts.CustomRouterAuthentication = &DummyAuth{}
	opts.Cluster.Host = "127.0.0.1"
	s := RunServer(opts)
	defer s.Shutdown()
	clusterPort := s.ClusterAddr().Port

	opts2 := DefaultOptions()
	opts2.Cluster.Host = "127.0.0.1"
	opts2.Routes = RoutesFromStr(fmt.Sprintf("nats://invalid@127.0.0.1:%d", clusterPort))
	s2 := RunServer(opts2)
	defer s2.Shutdown()

	// s2 will attempt to connect to s, which should reject.
	// Keep in mind that s2 will try again...
	time.Sleep(50 * time.Millisecond)
	checkNumRoutes(t, s2, 0)

	opts3 := DefaultOptions()
	opts3.Cluster.Host = "127.0.0.1"
	opts3.Routes = RoutesFromStr(fmt.Sprintf("nats://valid@127.0.0.1:%d", clusterPort))
	s3 := RunServer(opts3)
	defer s3.Shutdown()
	checkClusterFormed(t, s, s3)
	checkNumRoutes(t, s3, 1)
}
|
|
|
|
func TestMonitoringNoTimeout(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
s.mu.Lock()
|
|
srv := s.monitoringServer
|
|
s.mu.Unlock()
|
|
|
|
if srv == nil {
|
|
t.Fatalf("Monitoring server not set")
|
|
}
|
|
if srv.ReadTimeout != 0 {
|
|
t.Fatalf("ReadTimeout should not be set, was set to %v", srv.ReadTimeout)
|
|
}
|
|
if srv.WriteTimeout != 0 {
|
|
t.Fatalf("WriteTimeout should not be set, was set to %v", srv.WriteTimeout)
|
|
}
|
|
}
|
|
|
|
func TestProfilingNoTimeout(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.ProfPort = -1
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
paddr := s.ProfilerAddr()
|
|
if paddr == nil {
|
|
t.Fatalf("Profiler not started")
|
|
}
|
|
pport := paddr.Port
|
|
if pport <= 0 {
|
|
t.Fatalf("Expected profiler port to be set, got %v", pport)
|
|
}
|
|
s.mu.Lock()
|
|
srv := s.profilingServer
|
|
s.mu.Unlock()
|
|
|
|
if srv == nil {
|
|
t.Fatalf("Profiling server not set")
|
|
}
|
|
if srv.ReadTimeout != 0 {
|
|
t.Fatalf("ReadTimeout should not be set, was set to %v", srv.ReadTimeout)
|
|
}
|
|
if srv.WriteTimeout != 0 {
|
|
t.Fatalf("WriteTimeout should not be set, was set to %v", srv.WriteTimeout)
|
|
}
|
|
}
|
|
|
|
func TestLameDuckOptionsValidation(t *testing.T) {
|
|
o := DefaultOptions()
|
|
o.LameDuckDuration = 5 * time.Second
|
|
o.LameDuckGracePeriod = 10 * time.Second
|
|
s, err := NewServer(o)
|
|
if s != nil {
|
|
s.Shutdown()
|
|
}
|
|
if err == nil || !strings.Contains(err.Error(), "should be strictly lower") {
|
|
t.Fatalf("Expected error saying that ldm grace period should be lower than ldm duration, got %v", err)
|
|
}
|
|
}
|
|
|
|
func testSetLDMGracePeriod(o *Options, val time.Duration) {
|
|
// For tests, we set the grace period as a negative value
|
|
// so we can have a grace period bigger than the total duration.
|
|
// When validating options, we would not be able to run the
|
|
// server without this trick.
|
|
o.LameDuckGracePeriod = val * -1
|
|
}
|
|
|
|
// TestLameDuckMode exercises lame duck mode (LDM) end to end:
//   - LDM on a server with no clients shuts it down immediately;
//   - with clients, LDM staggers client closes over LameDuckDuration
//     and clients fail over to the other cluster member;
//   - calling lameDuckMode twice is harmless;
//   - new connections are refused while in LDM;
//   - Shutdown() during a long LDM completes promptly;
//   - the grace period delays client closes so that multiple servers
//     entering LDM do not bounce clients between each other (each
//     client reconnects exactly once, to the surviving server).
func TestLameDuckMode(t *testing.T) {
	optsA := DefaultOptions()
	testSetLDMGracePeriod(optsA, time.Nanosecond)
	optsA.Cluster.Host = "127.0.0.1"
	srvA := RunServer(optsA)
	defer srvA.Shutdown()

	// Check that if there is no client, server is shutdown
	srvA.lameDuckMode()
	srvA.mu.Lock()
	shutdown := srvA.shutdown
	srvA.mu.Unlock()
	if !shutdown {
		t.Fatalf("Server should have shutdown")
	}

	// Restart A with a tiny LD duration and form a two-node cluster.
	optsA.LameDuckDuration = 10 * time.Nanosecond
	srvA = RunServer(optsA)
	defer srvA.Shutdown()

	optsB := DefaultOptions()
	optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
	srvB := RunServer(optsB)
	defer srvB.Shutdown()

	checkClusterFormed(t, srvA, srvB)

	total := 50
	// connectClients opens `total` connections to server A, with a
	// short reconnect wait so fail-over to B is quick.
	connectClients := func() []*nats.Conn {
		ncs := make([]*nats.Conn, 0, total)
		for i := 0; i < total; i++ {
			nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port),
				nats.ReconnectWait(50*time.Millisecond))
			if err != nil {
				t.Fatalf("Error on connect: %v", err)
			}
			ncs = append(ncs, nc)
		}
		return ncs
	}
	// stopClientsAndSrvB tears down a scenario so the next one can
	// reuse the same variables.
	stopClientsAndSrvB := func(ncs []*nats.Conn) {
		for _, nc := range ncs {
			nc.Close()
		}
		srvB.Shutdown()
	}

	ncs := connectClients()

	checkClientsCount(t, srvA, total)
	checkClientsCount(t, srvB, 0)

	start := time.Now()
	srvA.lameDuckMode()
	// Make sure that nothing bad happens if called twice
	srvA.lameDuckMode()
	// Wait that shutdown completes
	elapsed := time.Since(start)
	// It should have taken more than the allotted time of 10ms since we had 50 clients.
	if elapsed <= optsA.LameDuckDuration {
		t.Fatalf("Expected to take more than %v, got %v", optsA.LameDuckDuration, elapsed)
	}

	checkClientsCount(t, srvA, 0)
	checkClientsCount(t, srvB, total)

	// Check closed status on server A
	// Connections are saved in go routines, so although we have evaluated the number
	// of connections in the server A to be 0, the polling of connection closed may
	// need a bit more time.
	checkFor(t, time.Second, 15*time.Millisecond, func() error {
		cz := pollConz(t, srvA, 1, "", &ConnzOptions{State: ConnClosed})
		if n := len(cz.Conns); n != total {
			return fmt.Errorf("expected %v closed connections, got %v", total, n)
		}
		return nil
	})
	cz := pollConz(t, srvA, 1, "", &ConnzOptions{State: ConnClosed})
	if n := len(cz.Conns); n != total {
		t.Fatalf("Expected %v closed connections, got %v", total, n)
	}
	for _, c := range cz.Conns {
		checkReason(t, c.Reason, ServerShutdown)
	}

	stopClientsAndSrvB(ncs)

	// Second scenario: 1s LD duration; verify LDM rejects new
	// connections and completes within ~LameDuckDuration.
	optsA.LameDuckDuration = time.Second
	srvA = RunServer(optsA)
	defer srvA.Shutdown()

	optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
	srvB = RunServer(optsB)
	defer srvB.Shutdown()

	checkClusterFormed(t, srvA, srvB)

	ncs = connectClients()

	checkClientsCount(t, srvA, total)
	checkClientsCount(t, srvB, 0)

	start = time.Now()
	go srvA.lameDuckMode()
	// Check that while in lameDuckMode, it is not possible to connect
	// to the server. Wait to be in LD mode first
	checkFor(t, 500*time.Millisecond, 15*time.Millisecond, func() error {
		srvA.mu.Lock()
		ldm := srvA.ldm
		srvA.mu.Unlock()
		if !ldm {
			return fmt.Errorf("Did not reach lame duck mode")
		}
		return nil
	})
	if _, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)); err != nats.ErrNoServers {
		t.Fatalf("Expected %v, got %v", nats.ErrNoServers, err)
	}
	srvA.grWG.Wait()
	elapsed = time.Since(start)

	checkClientsCount(t, srvA, 0)
	checkClientsCount(t, srvB, total)

	// Allow 10% slack over the configured duration.
	if elapsed > time.Duration(float64(optsA.LameDuckDuration)*1.1) {
		t.Fatalf("Expected to not take more than %v, got %v", optsA.LameDuckDuration, elapsed)
	}

	stopClientsAndSrvB(ncs)

	// Now check that we can shutdown server while in LD mode.
	optsA.LameDuckDuration = 60 * time.Second
	srvA = RunServer(optsA)
	defer srvA.Shutdown()

	optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
	srvB = RunServer(optsB)
	defer srvB.Shutdown()

	checkClusterFormed(t, srvA, srvB)

	ncs = connectClients()

	checkClientsCount(t, srvA, total)
	checkClientsCount(t, srvB, 0)

	start = time.Now()
	go srvA.lameDuckMode()
	time.Sleep(100 * time.Millisecond)
	srvA.Shutdown()
	elapsed = time.Since(start)
	// Make sure that it did not take that long
	if elapsed > time.Second {
		t.Fatalf("Took too long: %v", elapsed)
	}
	checkClientsCount(t, srvA, 0)
	checkClientsCount(t, srvB, total)

	stopClientsAndSrvB(ncs)

	// Now test that we introduce delay before starting closing client connections.
	// This allow to "signal" multiple servers and avoid their clients to reconnect
	// to a server that is going to be going in LD mode.
	testSetLDMGracePeriod(optsA, 100*time.Millisecond)
	optsA.LameDuckDuration = 10 * time.Millisecond
	srvA = RunServer(optsA)
	defer srvA.Shutdown()

	optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
	testSetLDMGracePeriod(optsB, 100*time.Millisecond)
	optsB.LameDuckDuration = 10 * time.Millisecond
	srvB = RunServer(optsB)
	defer srvB.Shutdown()

	optsC := DefaultOptions()
	optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
	testSetLDMGracePeriod(optsC, 100*time.Millisecond)
	optsC.LameDuckGracePeriod = -100 * time.Millisecond
	optsC.LameDuckDuration = 10 * time.Millisecond
	srvC := RunServer(optsC)
	defer srvC.Shutdown()

	checkClusterFormed(t, srvA, srvB, srvC)

	// rt counts client reconnects; with the grace period in effect the
	// client should land on C directly, i.e. reconnect exactly once.
	rt := int32(0)
	nc, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", optsA.Port),
		nats.ReconnectWait(15*time.Millisecond),
		nats.ReconnectHandler(func(*nats.Conn) {
			atomic.AddInt32(&rt, 1)
		}))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	go srvA.lameDuckMode()
	// Wait a bit, but less than lameDuckModeInitialDelay that we set in this
	// test to 100ms.
	time.Sleep(30 * time.Millisecond)
	go srvB.lameDuckMode()

	srvA.grWG.Wait()
	srvB.grWG.Wait()
	checkClientsCount(t, srvC, 1)
	checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
		if n := atomic.LoadInt32(&rt); n != 1 {
			return fmt.Errorf("Expected client to reconnect only once, got %v", n)
		}
		return nil
	})
}
|
|
|
|
// TestLameDuckModeInfo checks the INFO protocol updates sent to plain
// TCP and websocket clients as cluster members enter lame duck mode or
// shut down: the connect-URL list shrinks accordingly, the update from
// the client's own server carries LameDuckMode=true, updates about
// OTHER servers do not, and a full Shutdown() of the client's server
// sends no further INFO.
func TestLameDuckModeInfo(t *testing.T) {
	optsA := testWSOptions()
	optsA.Cluster.Name = "abc"
	optsA.Cluster.Host = "127.0.0.1"
	optsA.Cluster.Port = -1
	// Ensure that initial delay is set very high so that we can
	// check that some events occur as expected before the client
	// is disconnected.
	testSetLDMGracePeriod(optsA, 5*time.Second)
	optsA.LameDuckDuration = 50 * time.Millisecond
	optsA.DisableShortFirstPing = true
	srvA := RunServer(optsA)
	defer srvA.Shutdown()

	curla := fmt.Sprintf("127.0.0.1:%d", optsA.Port)
	wscurla := fmt.Sprintf("127.0.0.1:%d", optsA.Websocket.Port)
	c, err := net.Dial("tcp", curla)
	if err != nil {
		t.Fatalf("Error connecting: %v", err)
	}
	defer c.Close()
	client := bufio.NewReaderSize(c, maxBufSize)

	wsconn, wsclient := testWSCreateClient(t, false, false, optsA.Websocket.Host, optsA.Websocket.Port)
	defer wsconn.Close()

	// getInfo reads the next INFO protocol line from either the raw
	// TCP client or the websocket client and unmarshals its JSON
	// payload (l[5:] skips the "INFO " prefix).
	getInfo := func(ws bool) *serverInfo {
		t.Helper()
		var l string
		var err error
		if ws {
			l = string(testWSReadFrame(t, wsclient))
		} else {
			l, err = client.ReadString('\n')
			if err != nil {
				t.Fatalf("Error receiving info from server: %v\n", err)
			}
		}
		var info serverInfo
		if err = json.Unmarshal([]byte(l[5:]), &info); err != nil {
			t.Fatalf("Could not parse INFO json: %v\n", err)
		}
		return &info
	}

	// Consume the initial INFO and complete the CONNECT handshake on
	// the raw TCP client.
	getInfo(false)
	c.Write([]byte("CONNECT {\"protocol\":1,\"verbose\":false}\r\nPING\r\n"))
	client.ReadString('\n')

	optsB := testWSOptions()
	optsB.Cluster.Name = "abc"
	optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
	srvB := RunServer(optsB)
	defer srvB.Shutdown()

	checkClusterFormed(t, srvA, srvB)

	// checkConnectURLs reads one INFO on each transport and compares
	// the (sorted) connect URLs against expected[0] (TCP) and
	// expected[1] (websocket), returning the last parsed INFO.
	checkConnectURLs := func(expected [][]string) *serverInfo {
		t.Helper()
		var si *serverInfo
		for i, ws := range []bool{false, true} {
			sort.Strings(expected[i])
			si = getInfo(ws)
			sort.Strings(si.ConnectURLs)
			if !reflect.DeepEqual(expected[i], si.ConnectURLs) {
				t.Fatalf("Expected %q, got %q", expected, si.ConnectURLs)
			}
		}
		return si
	}

	curlb := fmt.Sprintf("127.0.0.1:%d", optsB.Port)
	wscurlb := fmt.Sprintf("127.0.0.1:%d", optsB.Websocket.Port)
	expected := [][]string{{curla, curlb}, {wscurla, wscurlb}}
	checkConnectURLs(expected)

	optsC := testWSOptions()
	// NOTE(review): this re-applies the grace period to optsA; given
	// the surrounding pattern it looks like it was meant for optsC —
	// confirm against upstream.
	testSetLDMGracePeriod(optsA, 5*time.Second)
	optsC.Cluster.Name = "abc"
	optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
	srvC := RunServer(optsC)
	defer srvC.Shutdown()

	checkClusterFormed(t, srvA, srvB, srvC)

	curlc := fmt.Sprintf("127.0.0.1:%d", optsC.Port)
	wscurlc := fmt.Sprintf("127.0.0.1:%d", optsC.Websocket.Port)
	expected = [][]string{{curla, curlb, curlc}, {wscurla, wscurlb, wscurlc}}
	checkConnectURLs(expected)

	optsD := testWSOptions()
	optsD.Cluster.Name = "abc"
	optsD.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
	srvD := RunServer(optsD)
	defer srvD.Shutdown()

	checkClusterFormed(t, srvA, srvB, srvC, srvD)

	curld := fmt.Sprintf("127.0.0.1:%d", optsD.Port)
	wscurld := fmt.Sprintf("127.0.0.1:%d", optsD.Websocket.Port)
	expected = [][]string{{curla, curlb, curlc, curld}, {wscurla, wscurlb, wscurlc, wscurld}}
	checkConnectURLs(expected)

	// Now lame duck server A and C. We should have client connected to A
	// receive info that A is in LDM without A's URL, but also receive
	// an update with C's URL gone.
	// But first we need to create a client to C because otherwise the
	// LDM signal will just shut it down because it would have no client.
	nc, err := nats.Connect(srvC.ClientURL())
	if err != nil {
		t.Fatalf("Failed to connect: %v", err)
	}
	defer nc.Close()
	nc.Flush()

	start := time.Now()
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		srvA.lameDuckMode()
	}()

	expected = [][]string{{curlb, curlc, curld}, {wscurlb, wscurlc, wscurld}}
	si := checkConnectURLs(expected)
	if !si.LameDuckMode {
		t.Fatal("Expected LameDuckMode to be true, it was not")
	}

	// Start LDM for server C. This should send an update to A
	// which in turn should remove C from the list of URLs and
	// update its client.
	go func() {
		defer wg.Done()
		srvC.lameDuckMode()
	}()

	expected = [][]string{{curlb, curld}, {wscurlb, wscurld}}
	si = checkConnectURLs(expected)
	// This update should not say that it is LDM.
	if si.LameDuckMode {
		t.Fatal("Expected LameDuckMode to be false, it was true")
	}

	// Now shutdown D, and we also should get an update.
	srvD.Shutdown()

	expected = [][]string{{curlb}, {wscurlb}}
	si = checkConnectURLs(expected)
	// This update should not say that it is LDM.
	if si.LameDuckMode {
		t.Fatal("Expected LameDuckMode to be false, it was true")
	}
	if time.Since(start) > 2*time.Second {
		t.Fatalf("Did not get the expected events prior of server A and C shutting down")
	}

	// Now explicitly shutdown srvA. When a server shutdown, it closes all its
	// connections. For routes, it means that it is going to remove the remote's
	// URL from its map. We want to make sure that in that case, server does not
	// actually send an updated INFO to its clients.
	srvA.Shutdown()

	// Expect nothing to be received on the client connection.
	if l, err := client.ReadString('\n'); err == nil {
		t.Fatalf("Expected connection to fail, instead got %q", l)
	}

	c.Close()
	nc.Close()
	// Don't need to wait for actual disconnect of clients.
	srvC.Shutdown()
	wg.Wait()
}
|
|
|
|
func TestServerValidateGatewaysOptions(t *testing.T) {
|
|
baseOpt := testDefaultOptionsForGateway("A")
|
|
u, _ := url.Parse("host:5222")
|
|
g := &RemoteGatewayOpts{
|
|
URLs: []*url.URL{u},
|
|
}
|
|
baseOpt.Gateway.Gateways = append(baseOpt.Gateway.Gateways, g)
|
|
|
|
for _, test := range []struct {
|
|
name string
|
|
opts func() *Options
|
|
expectedErr string
|
|
}{
|
|
{
|
|
name: "gateway_has_no_name",
|
|
opts: func() *Options {
|
|
o := baseOpt.Clone()
|
|
o.Gateway.Name = ""
|
|
return o
|
|
},
|
|
expectedErr: "has no name",
|
|
},
|
|
{
|
|
name: "gateway_has_no_port",
|
|
opts: func() *Options {
|
|
o := baseOpt.Clone()
|
|
o.Gateway.Port = 0
|
|
return o
|
|
},
|
|
expectedErr: "no port specified",
|
|
},
|
|
{
|
|
name: "gateway_dst_has_no_name",
|
|
opts: func() *Options {
|
|
o := baseOpt.Clone()
|
|
return o
|
|
},
|
|
expectedErr: "has no name",
|
|
},
|
|
{
|
|
name: "gateway_dst_urls_is_nil",
|
|
opts: func() *Options {
|
|
o := baseOpt.Clone()
|
|
o.Gateway.Gateways[0].Name = "B"
|
|
o.Gateway.Gateways[0].URLs = nil
|
|
return o
|
|
},
|
|
expectedErr: "has no URL",
|
|
},
|
|
{
|
|
name: "gateway_dst_urls_is_empty",
|
|
opts: func() *Options {
|
|
o := baseOpt.Clone()
|
|
o.Gateway.Gateways[0].Name = "B"
|
|
o.Gateway.Gateways[0].URLs = []*url.URL{}
|
|
return o
|
|
},
|
|
expectedErr: "has no URL",
|
|
},
|
|
} {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
if err := validateOptions(test.opts()); err == nil || !strings.Contains(err.Error(), test.expectedErr) {
|
|
t.Fatalf("Expected error about %q, got %v", test.expectedErr, err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestAcceptError(t *testing.T) {
|
|
o := DefaultOptions()
|
|
s := New(o)
|
|
s.mu.Lock()
|
|
s.running = true
|
|
s.mu.Unlock()
|
|
defer s.Shutdown()
|
|
orgDelay := time.Hour
|
|
delay := s.acceptError("Test", fmt.Errorf("any error"), orgDelay)
|
|
if delay != orgDelay {
|
|
t.Fatalf("With this type of error, delay should have stayed same, got %v", delay)
|
|
}
|
|
|
|
// Create any net.Error and make it a temporary
|
|
ne := &net.DNSError{IsTemporary: true}
|
|
orgDelay = 10 * time.Millisecond
|
|
delay = s.acceptError("Test", ne, orgDelay)
|
|
if delay != 2*orgDelay {
|
|
t.Fatalf("Expected delay to double, got %v", delay)
|
|
}
|
|
// Now check the max
|
|
orgDelay = 60 * ACCEPT_MAX_SLEEP / 100
|
|
delay = s.acceptError("Test", ne, orgDelay)
|
|
if delay != ACCEPT_MAX_SLEEP {
|
|
t.Fatalf("Expected delay to double, got %v", delay)
|
|
}
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
start := time.Now()
|
|
go func() {
|
|
s.acceptError("Test", ne, orgDelay)
|
|
wg.Done()
|
|
}()
|
|
time.Sleep(100 * time.Millisecond)
|
|
// This should kick out the sleep in acceptError
|
|
s.Shutdown()
|
|
if dur := time.Since(start); dur >= ACCEPT_MAX_SLEEP {
|
|
t.Fatalf("Shutdown took too long: %v", dur)
|
|
}
|
|
wg.Wait()
|
|
if d := s.acceptError("Test", ne, orgDelay); d >= 0 {
|
|
t.Fatalf("Expected delay to be negative, got %v", d)
|
|
}
|
|
}
|
|
|
|
func TestServerShutdownDuringStart(t *testing.T) {
|
|
o := DefaultOptions()
|
|
o.DisableShortFirstPing = true
|
|
o.Accounts = []*Account{NewAccount("$SYS")}
|
|
o.SystemAccount = "$SYS"
|
|
o.Cluster.Name = "abc"
|
|
o.Cluster.Host = "127.0.0.1"
|
|
o.Cluster.Port = -1
|
|
o.Gateway.Name = "abc"
|
|
o.Gateway.Host = "127.0.0.1"
|
|
o.Gateway.Port = -1
|
|
o.LeafNode.Host = "127.0.0.1"
|
|
o.LeafNode.Port = -1
|
|
o.Websocket.Host = "127.0.0.1"
|
|
o.Websocket.Port = -1
|
|
o.Websocket.HandshakeTimeout = 1
|
|
o.Websocket.NoTLS = true
|
|
o.MQTT.Host = "127.0.0.1"
|
|
o.MQTT.Port = -1
|
|
|
|
// We are going to test that if the server is shutdown
|
|
// while Start() runs (in this case, before), we don't
|
|
// start the listeners and therefore leave accept loops
|
|
// hanging.
|
|
s, err := NewServer(o)
|
|
if err != nil {
|
|
t.Fatalf("Error creating server: %v", err)
|
|
}
|
|
s.Shutdown()
|
|
|
|
// Start() should not block, but just in case, start in
|
|
// different go routine.
|
|
ch := make(chan struct{}, 1)
|
|
go func() {
|
|
s.Start()
|
|
close(ch)
|
|
}()
|
|
select {
|
|
case <-ch:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("Start appear to have blocked after server was shutdown")
|
|
}
|
|
// Now make sure that none of the listeners have been created
|
|
listeners := []string{}
|
|
s.mu.Lock()
|
|
if s.listener != nil {
|
|
listeners = append(listeners, "client")
|
|
}
|
|
if s.routeListener != nil {
|
|
listeners = append(listeners, "route")
|
|
}
|
|
if s.gatewayListener != nil {
|
|
listeners = append(listeners, "gateway")
|
|
}
|
|
if s.leafNodeListener != nil {
|
|
listeners = append(listeners, "leafnode")
|
|
}
|
|
if s.websocket.listener != nil {
|
|
listeners = append(listeners, "websocket")
|
|
}
|
|
if s.mqtt.listener != nil {
|
|
listeners = append(listeners, "mqtt")
|
|
}
|
|
s.mu.Unlock()
|
|
if len(listeners) > 0 {
|
|
lst := ""
|
|
for i, l := range listeners {
|
|
if i > 0 {
|
|
lst += ", "
|
|
}
|
|
lst += l
|
|
}
|
|
t.Fatalf("Following listeners have been created: %s", lst)
|
|
}
|
|
}
|
|
|
|
// myDummyDNSResolver is a test double for the server's DNS resolver:
// its LookupHost returns the configured err if set, otherwise ips.
type myDummyDNSResolver struct {
	// ips is the canned list of addresses returned by LookupHost.
	ips []string
	// err, when non-nil, is returned by LookupHost instead of ips.
	err error
}
|
|
|
|
func (r *myDummyDNSResolver) LookupHost(ctx context.Context, host string) ([]string, error) {
|
|
if r.err != nil {
|
|
return nil, r.err
|
|
}
|
|
return r.ips, nil
|
|
}
|
|
|
|
func TestGetRandomIP(t *testing.T) {
|
|
s := &Server{}
|
|
resolver := &myDummyDNSResolver{}
|
|
// no port...
|
|
if _, err := s.getRandomIP(resolver, "noport", nil); err == nil || !strings.Contains(err.Error(), "port") {
|
|
t.Fatalf("Expected error about port missing, got %v", err)
|
|
}
|
|
resolver.err = fmt.Errorf("on purpose")
|
|
if _, err := s.getRandomIP(resolver, "localhost:4222", nil); err == nil || !strings.Contains(err.Error(), "on purpose") {
|
|
t.Fatalf("Expected error about no port, got %v", err)
|
|
}
|
|
resolver.err = nil
|
|
a, err := s.getRandomIP(resolver, "localhost:4222", nil)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
if a != "localhost:4222" {
|
|
t.Fatalf("Expected address to be %q, got %q", "localhost:4222", a)
|
|
}
|
|
resolver.ips = []string{"1.2.3.4"}
|
|
a, err = s.getRandomIP(resolver, "localhost:4222", nil)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
if a != "1.2.3.4:4222" {
|
|
t.Fatalf("Expected address to be %q, got %q", "1.2.3.4:4222", a)
|
|
}
|
|
// Check for randomness
|
|
resolver.ips = []string{"1.2.3.4", "2.2.3.4", "3.2.3.4"}
|
|
dist := [3]int{}
|
|
for i := 0; i < 100; i++ {
|
|
ip, err := s.getRandomIP(resolver, "localhost:4222", nil)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
v := int(ip[0]-'0') - 1
|
|
dist[v]++
|
|
}
|
|
low := 20
|
|
high := 47
|
|
for i, d := range dist {
|
|
if d == 0 || d == 100 {
|
|
t.Fatalf("Unexpected distribution for ip %v, got %v", i, d)
|
|
} else if d < low || d > high {
|
|
t.Logf("Warning: out of expected range [%v,%v] for ip %v, got %v", low, high, i, d)
|
|
}
|
|
}
|
|
|
|
// Check IP exclusions
|
|
excludedIPs := map[string]struct{}{"1.2.3.4:4222": {}}
|
|
for i := 0; i < 100; i++ {
|
|
ip, err := s.getRandomIP(resolver, "localhost:4222", excludedIPs)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
if ip[0] == '1' {
|
|
t.Fatalf("Should not have returned this ip: %q", ip)
|
|
}
|
|
}
|
|
excludedIPs["2.2.3.4:4222"] = struct{}{}
|
|
for i := 0; i < 100; i++ {
|
|
ip, err := s.getRandomIP(resolver, "localhost:4222", excludedIPs)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
if ip[0] != '3' {
|
|
t.Fatalf("Should only have returned '3.2.3.4', got returned %q", ip)
|
|
}
|
|
}
|
|
excludedIPs["3.2.3.4:4222"] = struct{}{}
|
|
for i := 0; i < 100; i++ {
|
|
if _, err := s.getRandomIP(resolver, "localhost:4222", excludedIPs); err != errNoIPAvail {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
}
|
|
|
|
// Now check that exclusion takes into account the port number.
|
|
resolver.ips = []string{"127.0.0.1"}
|
|
excludedIPs = map[string]struct{}{"127.0.0.1:4222": {}}
|
|
for i := 0; i < 100; i++ {
|
|
if _, err := s.getRandomIP(resolver, "localhost:4223", excludedIPs); err == errNoIPAvail {
|
|
t.Fatal("Should not have failed")
|
|
}
|
|
}
|
|
}
|
|
|
|
// slowWriteConn wraps a net.Conn and limits each Write to at most
// 10 bytes, forcing the server into repeated partial writes so tests
// can exercise the client writeLoop under that condition.
type slowWriteConn struct {
	net.Conn
}
|
|
|
|
func (swc *slowWriteConn) Write(b []byte) (int, error) {
|
|
// Limit the write to 10 bytes at a time.
|
|
max := len(b)
|
|
if max > 10 {
|
|
max = 10
|
|
}
|
|
return swc.Conn.Write(b[:max])
|
|
}
|
|
|
|
// TestClientWriteLoopStall checks that the client writeLoop keeps making
// progress when the underlying connection only accepts partial writes.
// It swaps the subscriber's net.Conn for a slowWriteConn (max 10 bytes
// per Write), publishes a 100-byte message, and verifies both that the
// message is delivered and that no async error is reported.
func TestClientWriteLoopStall(t *testing.T) {
	opts := DefaultOptions()
	s := RunServer(opts)
	defer s.Shutdown()

	// Buffered so the error handler never blocks the client library.
	errCh := make(chan error, 1)

	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	nc, err := nats.Connect(url,
		nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, e error) {
			// Record only the first async error; drop the rest.
			select {
			case errCh <- e:
			default:
			}
		}))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()
	sub, err := nc.SubscribeSync("foo")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	nc.Flush()
	cid, _ := nc.GetClientID()

	// Separate connection used to publish, so only the subscriber's
	// connection is slowed down.
	sender, err := nats.Connect(url)
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer sender.Close()

	// Replace the server-side connection of the subscriber with the
	// write-limited wrapper.
	c := s.getClient(cid)
	c.mu.Lock()
	c.nc = &slowWriteConn{Conn: c.nc}
	c.mu.Unlock()

	// 100-byte payload requires multiple 10-byte partial writes.
	sender.Publish("foo", make([]byte, 100))

	if _, err := sub.NextMsg(3 * time.Second); err != nil {
		t.Fatalf("WriteLoop has stalled!")
	}

	// Make sure that we did not get any async error
	select {
	case e := <-errCh:
		t.Fatalf("Got error: %v", e)
	case <-time.After(250 * time.Millisecond):
	}
}
|
|
|
|
// TestInsecureSkipVerifyWarning verifies that a warning is logged at
// server start when InsecureSkipVerify is enabled in the TLS config of
// the cluster, leafnode (main block and/or remotes), or gateway
// (main block and/or remotes) sections.
func TestInsecureSkipVerifyWarning(t *testing.T) {
	// checkWarnReported starts a server with the given options, waits for
	// it to become ready, and expects the given warning to be captured.
	checkWarnReported := func(t *testing.T, o *Options, expectedWarn string) {
		t.Helper()
		s, err := NewServer(o)
		if err != nil {
			t.Fatalf("Error on new server: %v", err)
		}
		l := &captureWarnLogger{warn: make(chan string, 1)}
		s.SetLogger(l, false, false)
		wg := sync.WaitGroup{}
		wg.Add(1)
		go func() {
			s.Start()
			wg.Done()
		}()
		// Unexported readyForConnections returns an error describing
		// which readiness check failed, for a better test failure message.
		if err := s.readyForConnections(time.Second); err != nil {
			t.Fatal(err)
		}
		select {
		case w := <-l.warn:
			if !strings.Contains(w, expectedWarn) {
				t.Fatalf("Expected warning %q, got %q", expectedWarn, w)
			}
		case <-time.After(2 * time.Second):
			t.Fatalf("Did not get warning %q", expectedWarn)
		}
		s.Shutdown()
		wg.Wait()
	}

	// TLS config with InsecureSkipVerify enabled.
	tc := &TLSConfigOpts{}
	tc.CertFile = "../test/configs/certs/server-cert.pem"
	tc.KeyFile = "../test/configs/certs/server-key.pem"
	tc.CaFile = "../test/configs/certs/ca.pem"
	tc.Insecure = true
	config, err := GenTLSConfig(tc)
	if err != nil {
		t.Fatalf("Error generating tls config: %v", err)
	}

	// Cluster (routes) TLS.
	o := DefaultOptions()
	o.Cluster.Name = "A"
	o.Cluster.Port = -1
	o.Cluster.TLSConfig = config.Clone()
	checkWarnReported(t, o, clusterTLSInsecureWarning)

	// Remove the route setting
	o.Cluster.Port = 0
	o.Cluster.TLSConfig = nil

	// Configure LeafNode with no TLS in the main block first, but only with remotes.
	o.LeafNode.Port = -1
	rurl, _ := url.Parse("nats://127.0.0.1:1234")
	o.LeafNode.Remotes = []*RemoteLeafOpts{
		{
			URLs:      []*url.URL{rurl},
			TLSConfig: config.Clone(),
		},
	}
	checkWarnReported(t, o, leafnodeTLSInsecureWarning)

	// Now add to main block.
	o.LeafNode.TLSConfig = config.Clone()
	checkWarnReported(t, o, leafnodeTLSInsecureWarning)

	// Now remove remote and check warning still reported
	o.LeafNode.Remotes = nil
	checkWarnReported(t, o, leafnodeTLSInsecureWarning)

	// Remove the LN setting
	o.LeafNode.Port = 0
	o.LeafNode.TLSConfig = nil

	// Configure GW with no TLS in main block first, but only with remotes
	o.Gateway.Name = "A"
	o.Gateway.Host = "127.0.0.1"
	o.Gateway.Port = -1
	o.Gateway.Gateways = []*RemoteGatewayOpts{
		{
			Name:      "B",
			URLs:      []*url.URL{rurl},
			TLSConfig: config.Clone(),
		},
	}
	checkWarnReported(t, o, gatewayTLSInsecureWarning)

	// Now add to main block.
	o.Gateway.TLSConfig = config.Clone()
	checkWarnReported(t, o, gatewayTLSInsecureWarning)

	// Now remove remote and check warning still reported
	o.Gateway.Gateways = nil
	checkWarnReported(t, o, gatewayTLSInsecureWarning)
}
|
|
|
|
// TestConnectErrorReports verifies the ConnectErrorReports option for
// routes, leaf nodes and gateways: connect failures to an unreachable
// remote (127.0.0.1:1234) are logged at ERR level only on the attempts
// selected by ConnectErrorReports=3 (per the tables below: attempts
// 1, 3 and 6), and at DBG level on the others.
func TestConnectErrorReports(t *testing.T) {
	// On Windows, an attempt to connect to a port that has no listener will
	// take whatever timeout specified in DialTimeout() before failing.
	// So skip for now.
	if runtime.GOOS == "windows" {
		t.Skip()
	}
	// Check that default report attempts is as expected
	opts := DefaultOptions()
	s := RunServer(opts)
	defer s.Shutdown()

	if ra := s.getOpts().ConnectErrorReports; ra != DEFAULT_CONNECT_ERROR_REPORTS {
		t.Fatalf("Expected default value to be %v, got %v", DEFAULT_CONNECT_ERROR_REPORTS, ra)
	}

	// Log to a temp file so we can inspect what was reported.
	tmpFile := createFile(t, "")
	log := tmpFile.Name()
	tmpFile.Close()
	defer removeFile(t, log)

	// Nothing listens on this port, so every connect attempt fails.
	remoteURLs := RoutesFromStr("nats://127.0.0.1:1234")

	opts = DefaultOptions()
	opts.ConnectErrorReports = 3
	opts.Cluster.Port = -1
	opts.Routes = remoteURLs
	opts.NoLog = false
	opts.LogFile = log
	opts.Logtime = true
	opts.Debug = true
	s = RunServer(opts)
	defer s.Shutdown()

	// checkContent polls the log file for the presence (or absence) of a
	// "<txt> (attempt <n>)" entry.
	checkContent := func(t *testing.T, txt string, attempt int, shouldBeThere bool) {
		t.Helper()
		checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
			content, err := ioutil.ReadFile(log)
			if err != nil {
				return fmt.Errorf("Error reading log file: %v", err)
			}
			present := bytes.Contains(content, []byte(fmt.Sprintf("%s (attempt %d)", txt, attempt)))
			if shouldBeThere && !present {
				return fmt.Errorf("Did not find expected log statement (%s) for attempt %d: %s", txt, attempt, content)
			} else if !shouldBeThere && present {
				return fmt.Errorf("Log statement (%s) for attempt %d should not be present: %s", txt, attempt, content)
			}
			return nil
		})
	}

	// attempt: the connect attempt number; errExpected: whether the
	// attempt should have been logged at ERR (true) or DBG (false) level.
	type testConnect struct {
		name        string
		attempt     int
		errExpected bool
	}
	for _, test := range []testConnect{
		{"route_attempt_1", 1, true},
		{"route_attempt_2", 2, false},
		{"route_attempt_3", 3, true},
		{"route_attempt_4", 4, false},
		{"route_attempt_6", 6, true},
		{"route_attempt_7", 7, false},
	} {
		t.Run(test.name, func(t *testing.T) {
			debugExpected := !test.errExpected
			checkContent(t, "[DBG] Error trying to connect to route", test.attempt, debugExpected)
			checkContent(t, "[ERR] Error trying to connect to route", test.attempt, test.errExpected)
		})
	}

	s.Shutdown()
	removeFile(t, log)

	// Now try with leaf nodes
	opts.Cluster.Port = 0
	opts.Routes = nil
	opts.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{remoteURLs[0]}}}
	opts.LeafNode.ReconnectInterval = 15 * time.Millisecond
	s = RunServer(opts)
	defer s.Shutdown()

	// Same as checkContent but leafnode entries include the remote host.
	checkLeafContent := func(t *testing.T, txt, host string, attempt int, shouldBeThere bool) {
		t.Helper()
		checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
			content, err := ioutil.ReadFile(log)
			if err != nil {
				return fmt.Errorf("Error reading log file: %v", err)
			}
			present := bytes.Contains(content, []byte(fmt.Sprintf("%s %q (attempt %d)", txt, host, attempt)))
			if shouldBeThere && !present {
				return fmt.Errorf("Did not find expected log statement (%s %q) for attempt %d: %s", txt, host, attempt, content)
			} else if !shouldBeThere && present {
				return fmt.Errorf("Log statement (%s %q) for attempt %d should not be present: %s", txt, host, attempt, content)
			}
			return nil
		})
	}

	for _, test := range []testConnect{
		{"leafnode_attempt_1", 1, true},
		{"leafnode_attempt_2", 2, false},
		{"leafnode_attempt_3", 3, true},
		{"leafnode_attempt_4", 4, false},
		{"leafnode_attempt_6", 6, true},
		{"leafnode_attempt_7", 7, false},
	} {
		t.Run(test.name, func(t *testing.T) {
			debugExpected := !test.errExpected
			checkLeafContent(t, "[DBG] Error trying to connect as leafnode to remote server", remoteURLs[0].Host, test.attempt, debugExpected)
			checkLeafContent(t, "[ERR] Error trying to connect as leafnode to remote server", remoteURLs[0].Host, test.attempt, test.errExpected)
		})
	}

	s.Shutdown()
	removeFile(t, log)

	// Now try with gateways
	opts.LeafNode.Remotes = nil
	opts.Cluster.Name = "A"
	opts.Gateway.Name = "A"
	opts.Gateway.Port = -1
	opts.Gateway.Gateways = []*RemoteGatewayOpts{
		{
			Name: "B",
			URLs: remoteURLs,
		},
	}
	opts.gatewaysSolicitDelay = 15 * time.Millisecond
	s = RunServer(opts)
	defer s.Shutdown()

	for _, test := range []testConnect{
		{"gateway_attempt_1", 1, true},
		{"gateway_attempt_2", 2, false},
		{"gateway_attempt_3", 3, true},
		{"gateway_attempt_4", 4, false},
		{"gateway_attempt_6", 6, true},
		{"gateway_attempt_7", 7, false},
	} {
		t.Run(test.name, func(t *testing.T) {
			debugExpected := !test.errExpected
			infoExpected := test.errExpected
			// For gateways, we also check our notice that we attempt to connect
			checkContent(t, "[DBG] Connecting to explicit gateway \"B\" (127.0.0.1:1234) at 127.0.0.1:1234", test.attempt, debugExpected)
			checkContent(t, "[INF] Connecting to explicit gateway \"B\" (127.0.0.1:1234) at 127.0.0.1:1234", test.attempt, infoExpected)
			checkContent(t, "[DBG] Error connecting to explicit gateway \"B\" (127.0.0.1:1234) at 127.0.0.1:1234", test.attempt, debugExpected)
			checkContent(t, "[ERR] Error connecting to explicit gateway \"B\" (127.0.0.1:1234) at 127.0.0.1:1234", test.attempt, test.errExpected)
		})
	}
}
|
|
|
|
// TestReconnectErrorReports verifies the ReconnectErrorReports option
// for routes, leaf nodes and gateways: after an established connection
// is lost (the remote server is shut down), reconnect failures are
// logged at ERR level only on the attempts selected by
// ReconnectErrorReports=3 (per the tables below: attempts 1, 3 and 6),
// and at DBG level on the others.
func TestReconnectErrorReports(t *testing.T) {
	// On Windows, an attempt to connect to a port that has no listener will
	// take whatever timeout specified in DialTimeout() before failing.
	// So skip for now.
	if runtime.GOOS == "windows" {
		t.Skip()
	}
	// Check that default report attempts is as expected
	opts := DefaultOptions()
	s := RunServer(opts)
	defer s.Shutdown()

	if ra := s.getOpts().ReconnectErrorReports; ra != DEFAULT_RECONNECT_ERROR_REPORTS {
		t.Fatalf("Expected default value to be %v, got %v", DEFAULT_RECONNECT_ERROR_REPORTS, ra)
	}

	// Log to a temp file so we can inspect what was reported.
	tmpFile := createFile(t, "")
	log := tmpFile.Name()
	tmpFile.Close()
	defer removeFile(t, log)

	// "cs" is the remote server that we connect to, then shut down to
	// trigger reconnect attempts.
	csOpts := DefaultOptions()
	csOpts.Cluster.Port = -1
	cs := RunServer(csOpts)
	defer cs.Shutdown()

	opts = DefaultOptions()
	opts.ReconnectErrorReports = 3
	opts.Cluster.Port = -1
	opts.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", cs.ClusterAddr().Port))
	opts.NoLog = false
	opts.LogFile = log
	opts.Logtime = true
	opts.Debug = true
	s = RunServer(opts)
	defer s.Shutdown()

	// Wait for cluster to be formed
	checkClusterFormed(t, s, cs)

	// Now shutdown the server s connected to.
	cs.Shutdown()

	// Specifically for route test, wait at least reconnect interval before checking logs
	time.Sleep(DEFAULT_ROUTE_RECONNECT)

	// checkContent polls the log file for the presence (or absence) of a
	// "<txt> (attempt <n>)" entry.
	checkContent := func(t *testing.T, txt string, attempt int, shouldBeThere bool) {
		t.Helper()
		checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
			content, err := ioutil.ReadFile(log)
			if err != nil {
				return fmt.Errorf("Error reading log file: %v", err)
			}
			present := bytes.Contains(content, []byte(fmt.Sprintf("%s (attempt %d)", txt, attempt)))
			if shouldBeThere && !present {
				return fmt.Errorf("Did not find expected log statement (%s) for attempt %d: %s", txt, attempt, content)
			} else if !shouldBeThere && present {
				return fmt.Errorf("Log statement (%s) for attempt %d should not be present: %s", txt, attempt, content)
			}
			return nil
		})
	}

	// attempt: the reconnect attempt number; errExpected: whether the
	// attempt should have been logged at ERR (true) or DBG (false) level.
	type testConnect struct {
		name        string
		attempt     int
		errExpected bool
	}
	for _, test := range []testConnect{
		{"route_attempt_1", 1, true},
		{"route_attempt_2", 2, false},
		{"route_attempt_3", 3, true},
		{"route_attempt_4", 4, false},
		{"route_attempt_6", 6, true},
		{"route_attempt_7", 7, false},
	} {
		t.Run(test.name, func(t *testing.T) {
			debugExpected := !test.errExpected
			checkContent(t, "[DBG] Error trying to connect to route", test.attempt, debugExpected)
			checkContent(t, "[ERR] Error trying to connect to route", test.attempt, test.errExpected)
		})
	}

	s.Shutdown()
	removeFile(t, log)

	// Now try with leaf nodes
	csOpts.Cluster.Port = 0
	csOpts.LeafNode.Host = "127.0.0.1"
	csOpts.LeafNode.Port = -1

	cs = RunServer(csOpts)
	defer cs.Shutdown()

	opts.Cluster.Port = 0
	opts.Routes = nil
	u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", csOpts.LeafNode.Port))
	opts.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
	opts.LeafNode.ReconnectInterval = 15 * time.Millisecond
	s = RunServer(opts)
	defer s.Shutdown()

	checkLeafNodeConnected(t, s)

	// Now shutdown the server s is connected to
	cs.Shutdown()

	// Same as checkContent but leafnode entries include the remote host.
	checkLeafContent := func(t *testing.T, txt, host string, attempt int, shouldBeThere bool) {
		t.Helper()
		checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
			content, err := ioutil.ReadFile(log)
			if err != nil {
				return fmt.Errorf("Error reading log file: %v", err)
			}
			present := bytes.Contains(content, []byte(fmt.Sprintf("%s %q (attempt %d)", txt, host, attempt)))
			if shouldBeThere && !present {
				return fmt.Errorf("Did not find expected log statement (%s %q) for attempt %d: %s", txt, host, attempt, content)
			} else if !shouldBeThere && present {
				return fmt.Errorf("Log statement (%s %q) for attempt %d should not be present: %s", txt, host, attempt, content)
			}
			return nil
		})
	}

	for _, test := range []testConnect{
		{"leafnode_attempt_1", 1, true},
		{"leafnode_attempt_2", 2, false},
		{"leafnode_attempt_3", 3, true},
		{"leafnode_attempt_4", 4, false},
		{"leafnode_attempt_6", 6, true},
		{"leafnode_attempt_7", 7, false},
	} {
		t.Run(test.name, func(t *testing.T) {
			debugExpected := !test.errExpected
			checkLeafContent(t, "[DBG] Error trying to connect as leafnode to remote server", u.Host, test.attempt, debugExpected)
			checkLeafContent(t, "[ERR] Error trying to connect as leafnode to remote server", u.Host, test.attempt, test.errExpected)
		})
	}

	s.Shutdown()
	removeFile(t, log)

	// Now try with gateways
	csOpts.LeafNode.Port = 0
	csOpts.Cluster.Name = "B"
	csOpts.Gateway.Name = "B"
	csOpts.Gateway.Port = -1
	cs = RunServer(csOpts)

	opts.LeafNode.Remotes = nil
	opts.Cluster.Name = "A"
	opts.Gateway.Name = "A"
	opts.Gateway.Port = -1
	remoteGWPort := cs.GatewayAddr().Port
	u, _ = url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", remoteGWPort))
	opts.Gateway.Gateways = []*RemoteGatewayOpts{
		{
			Name: "B",
			URLs: []*url.URL{u},
		},
	}
	opts.gatewaysSolicitDelay = 15 * time.Millisecond
	s = RunServer(opts)
	defer s.Shutdown()

	waitForOutboundGateways(t, s, 1, 2*time.Second)
	waitForInboundGateways(t, s, 1, 2*time.Second)

	// Now stop server s is connecting to
	cs.Shutdown()

	connTxt := fmt.Sprintf("Connecting to explicit gateway \"B\" (127.0.0.1:%d) at 127.0.0.1:%d", remoteGWPort, remoteGWPort)
	dbgConnTxt := fmt.Sprintf("[DBG] %s", connTxt)
	infConnTxt := fmt.Sprintf("[INF] %s", connTxt)

	errTxt := fmt.Sprintf("Error connecting to explicit gateway \"B\" (127.0.0.1:%d) at 127.0.0.1:%d", remoteGWPort, remoteGWPort)
	dbgErrTxt := fmt.Sprintf("[DBG] %s", errTxt)
	errErrTxt := fmt.Sprintf("[ERR] %s", errTxt)

	for _, test := range []testConnect{
		{"gateway_attempt_1", 1, true},
		{"gateway_attempt_2", 2, false},
		{"gateway_attempt_3", 3, true},
		{"gateway_attempt_4", 4, false},
		{"gateway_attempt_6", 6, true},
		{"gateway_attempt_7", 7, false},
	} {
		t.Run(test.name, func(t *testing.T) {
			debugExpected := !test.errExpected
			infoExpected := test.errExpected
			// For gateways, we also check our notice that we attempt to connect
			checkContent(t, dbgConnTxt, test.attempt, debugExpected)
			checkContent(t, infConnTxt, test.attempt, infoExpected)
			checkContent(t, dbgErrTxt, test.attempt, debugExpected)
			checkContent(t, errErrTxt, test.attempt, test.errExpected)
		})
	}
}
|
|
|
|
// TestServerLogsConfigurationFile verifies that a server started with a
// configuration file logs a "Using configuration file: <path>" line.
func TestServerLogsConfigurationFile(t *testing.T) {
	tmpDir := createDir(t, "_nats-server")
	defer removeDir(t, tmpDir)

	// File used both as the server's log file and as the reported
	// ConfigFile path (see override below).
	file := createFileAtDir(t, tmpDir, "nats_server_log_")
	file.Close()

	conf := createConfFile(t, []byte(fmt.Sprintf(`
	port: -1
	logfile: "%s"
	`, file.Name())))
	defer removeFile(t, conf)

	o := LoadConfig(conf)
	// Override ConfigFile so the path reported in the log is the same
	// file we read back below, making the check deterministic.
	o.ConfigFile = file.Name()
	o.NoLog = false
	s := RunServer(o)
	s.Shutdown()

	log, err := ioutil.ReadFile(file.Name())
	if err != nil {
		t.Fatalf("Error reading log file: %v", err)
	}
	if !bytes.Contains(log, []byte(fmt.Sprintf("Using configuration file: %s", file.Name()))) {
		t.Fatalf("Config file location was not reported in log: %s", log)
	}
}
|