Files
nats-server/server/server_test.go
Ivan Kozlovic 1acf330e07 [ADDED] Notification to clients when servers leave the cluster
Until now, a server would only notify clients of servers that join
the cluster. More than that, a server would send ot its clients only
information if new servers were added.
This PR changes this by sending to clients that support async INFO
the list of URLs for all servers in the cluster any time that there
is a change (joining or leaving the cluster).
As of now, clients will not be affected by the change (and will not
take benefit of this: removing servers from their server pool). This
will be addressed in each supported client once this is merged.
2018-02-27 14:22:13 -07:00

565 lines
14 KiB
Go

// Copyright 2015-2016 Apcera Inc. All rights reserved.
package server
import (
"flag"
"fmt"
"net"
"strings"
"testing"
"time"
"github.com/nats-io/go-nats"
)
func DefaultOptions() *Options {
return &Options{
Host: "localhost",
Port: -1,
HTTPPort: -1,
Cluster: ClusterOpts{Port: -1},
NoLog: true,
NoSigs: true,
Debug: true,
Trace: true,
}
}
// New Go Routine based server
func RunServer(opts *Options) *Server {
if opts == nil {
opts = DefaultOptions()
}
s := New(opts)
if s == nil {
panic("No NATS Server object returned.")
}
if !opts.NoLog {
s.ConfigureLogger()
}
// Run server in Go routine.
go s.Start()
// Wait for accept loop(s) to be started
if !s.ReadyForConnections(10 * time.Second) {
panic("Unable to start NATS Server in Go Routine")
}
return s
}
func TestStartProfiler(t *testing.T) {
s := New(DefaultOptions())
s.StartProfiler()
s.mu.Lock()
s.profiler.Close()
s.mu.Unlock()
}
func TestStartupAndShutdown(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
if !s.isRunning() {
t.Fatal("Could not run server")
}
// Debug stuff.
numRoutes := s.NumRoutes()
if numRoutes != 0 {
t.Fatalf("Expected numRoutes to be 0 vs %d\n", numRoutes)
}
numRemotes := s.NumRemotes()
if numRemotes != 0 {
t.Fatalf("Expected numRemotes to be 0 vs %d\n", numRemotes)
}
numClients := s.NumClients()
if numClients != 0 && numClients != 1 {
t.Fatalf("Expected numClients to be 1 or 0 vs %d\n", numClients)
}
numSubscriptions := s.NumSubscriptions()
if numSubscriptions != 0 {
t.Fatalf("Expected numSubscriptions to be 0 vs %d\n", numSubscriptions)
}
}
func TestTlsCipher(t *testing.T) {
if strings.Compare(tlsCipher(0x0005), "TLS_RSA_WITH_RC4_128_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0x000a), "TLS_RSA_WITH_3DES_EDE_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0x002f), "TLS_RSA_WITH_AES_128_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0x0035), "TLS_RSA_WITH_AES_256_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc007), "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc009), "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc00a), "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc011), "TLS_ECDHE_RSA_WITH_RC4_128_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc012), "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc013), "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc014), "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA") != 0 {
t.Fatalf("IUnknownnvalid tls cipher")
}
if strings.Compare(tlsCipher(0xc02f), "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc02b), "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc030), "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384") != 0 {
t.Fatalf("Invalid tls cipher")
}
if strings.Compare(tlsCipher(0xc02c), "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384") != 0 {
t.Fatalf("Invalid tls cipher")
}
if !strings.Contains(tlsCipher(0x9999), "Unknown") {
t.Fatalf("Expected an unknown cipher.")
}
}
func TestGetConnectURLs(t *testing.T) {
opts := DefaultOptions()
opts.Port = 4222
var globalIP net.IP
checkGlobalConnectURLs := func() {
s := New(opts)
defer s.Shutdown()
s.mu.Lock()
urls := s.getClientConnectURLs()
s.mu.Unlock()
if len(urls) == 0 {
t.Fatalf("Expected to get a list of urls, got none for listen addr: %v", opts.Host)
}
for _, u := range urls {
tcpaddr, err := net.ResolveTCPAddr("tcp", u)
if err != nil {
t.Fatalf("Error resolving: %v", err)
}
ip := tcpaddr.IP
if !ip.IsGlobalUnicast() {
t.Fatalf("IP %v is not global", ip.String())
}
if ip.IsUnspecified() {
t.Fatalf("IP %v is unspecified", ip.String())
}
addr := strings.TrimSuffix(u, ":4222")
if addr == opts.Host {
t.Fatalf("Returned url is not right: %v", u)
}
if globalIP == nil {
globalIP = ip
}
}
}
listenAddrs := []string{"0.0.0.0", "::"}
for _, listenAddr := range listenAddrs {
opts.Host = listenAddr
checkGlobalConnectURLs()
}
checkConnectURLsHasOnlyOne := func() {
s := New(opts)
defer s.Shutdown()
s.mu.Lock()
urls := s.getClientConnectURLs()
s.mu.Unlock()
if len(urls) != 1 {
t.Fatalf("Expected one URL, got %v", urls)
}
tcpaddr, err := net.ResolveTCPAddr("tcp", urls[0])
if err != nil {
t.Fatalf("Error resolving: %v", err)
}
ip := tcpaddr.IP
if ip.String() != opts.Host {
t.Fatalf("Expected connect URL to be %v, got %v", opts.Host, ip.String())
}
}
singleConnectReturned := []string{"127.0.0.1", "::1"}
if globalIP != nil {
singleConnectReturned = append(singleConnectReturned, globalIP.String())
}
for _, listenAddr := range singleConnectReturned {
opts.Host = listenAddr
checkConnectURLsHasOnlyOne()
}
}
func TestClientAdvertiseConnectURL(t *testing.T) {
opts := DefaultOptions()
opts.Port = 4222
opts.ClientAdvertise = "nats.example.com"
s := New(opts)
defer s.Shutdown()
s.mu.Lock()
urls := s.getClientConnectURLs()
s.mu.Unlock()
if len(urls) != 1 {
t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v",
opts.Host, opts.ClientAdvertise)
}
if urls[0] != "nats.example.com:4222" {
t.Fatalf("Expected to get '%s', got: '%v'", "nats.example.com:4222", urls[0])
}
s.Shutdown()
opts.ClientAdvertise = "nats.example.com:7777"
s = New(opts)
s.mu.Lock()
urls = s.getClientConnectURLs()
s.mu.Unlock()
if len(urls) != 1 {
t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v",
opts.Host, opts.ClientAdvertise)
}
if urls[0] != "nats.example.com:7777" {
t.Fatalf("Expected 'nats.example.com:7777', got: '%v'", urls[0])
}
if s.info.Host != "nats.example.com" {
t.Fatalf("Expected host to be set to nats.example.com")
}
if s.info.Port != 7777 {
t.Fatalf("Expected port to be set to 7777")
}
s.Shutdown()
opts = DefaultOptions()
opts.Port = 0
opts.ClientAdvertise = "nats.example.com:7777"
s = New(opts)
if s.info.Host != "nats.example.com" && s.info.Port != 7777 {
t.Fatalf("Expected Client Advertise Host:Port to be nats.example.com:7777, got: %s:%d",
s.info.Host, s.info.Port)
}
s.Shutdown()
}
func TestClientAdvertiseErrorOnStartup(t *testing.T) {
opts := DefaultOptions()
// Set invalid address
opts.ClientAdvertise = "addr:::123"
s := New(opts)
defer s.Shutdown()
dl := &DummyLogger{}
s.SetLogger(dl, false, false)
// Expect this to return due to failure
s.Start()
dl.Lock()
msg := dl.msg
dl.Unlock()
if !strings.Contains(msg, "ClientAdvertise") {
t.Fatalf("Unexpected error: %v", msg)
}
}
func TestNoDeadlockOnStartFailure(t *testing.T) {
opts := DefaultOptions()
opts.Host = "x.x.x.x" // bad host
opts.Port = 4222
opts.HTTPHost = opts.Host
opts.Cluster.Host = "localhost"
opts.Cluster.Port = -1
opts.ProfPort = -1
s := New(opts)
// This should return since it should fail to start a listener
// on x.x.x.x:4222
s.Start()
// We should be able to shutdown
s.Shutdown()
}
func TestMaxConnections(t *testing.T) {
opts := DefaultOptions()
opts.MaxConn = 1
s := RunServer(opts)
defer s.Shutdown()
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(addr)
if err != nil {
t.Fatalf("Error creating client: %v\n", err)
}
defer nc.Close()
nc2, err := nats.Connect(addr)
if err == nil {
nc2.Close()
t.Fatal("Expected connection to fail")
}
}
func TestProcessCommandLineArgs(t *testing.T) {
var host string
var port int
cmd := flag.NewFlagSet("gnatsd", flag.ExitOnError)
cmd.StringVar(&host, "a", "0.0.0.0", "Host.")
cmd.IntVar(&port, "p", 4222, "Port.")
cmd.Parse([]string{"-a", "127.0.0.1", "-p", "9090"})
showVersion, showHelp, err := ProcessCommandLineArgs(cmd)
if err != nil {
t.Errorf("Expected no errors, got: %s", err)
}
if showVersion || showHelp {
t.Errorf("Expected not having to handle subcommands")
}
cmd.Parse([]string{"version"})
showVersion, showHelp, err = ProcessCommandLineArgs(cmd)
if err != nil {
t.Errorf("Expected no errors, got: %s", err)
}
if !showVersion {
t.Errorf("Expected having to handle version command")
}
if showHelp {
t.Errorf("Expected not having to handle help command")
}
cmd.Parse([]string{"help"})
showVersion, showHelp, err = ProcessCommandLineArgs(cmd)
if err != nil {
t.Errorf("Expected no errors, got: %s", err)
}
if showVersion {
t.Errorf("Expected not having to handle version command")
}
if !showHelp {
t.Errorf("Expected having to handle help command")
}
cmd.Parse([]string{"foo", "-p", "9090"})
_, _, err = ProcessCommandLineArgs(cmd)
if err == nil {
t.Errorf("Expected an error handling the command arguments")
}
}
func TestWriteDeadline(t *testing.T) {
opts := DefaultOptions()
opts.WriteDeadline = 20 * time.Millisecond
s := RunServer(opts)
defer s.Shutdown()
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer c.Close()
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
t.Fatalf("Error sending protocols to server: %v", err)
}
// Reduce socket buffer to increase reliability of getting
// write deadline errors.
c.(*net.TCPConn).SetReadBuffer(10)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
sender, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sender.Close()
payload := make([]byte, 1000000)
start := time.Now()
for i := 0; i < 10; i++ {
if err := sender.Publish("foo", payload); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
dur := time.Since(start)
// user more than the write deadline to account for calls
// overhead, running with -race, etc...
if dur > 110*time.Millisecond {
t.Fatalf("Flush should have returned sooner, took: %v", dur)
}
// Flush sender connection to ensure that all data has been sent.
sender.Flush()
// At this point server should have closed connection c.
// On certain platforms, it may take more than one call before
// getting the error.
for i := 0; i < 100; i++ {
if _, err := c.Write([]byte("PUB bar 5\r\nhello\r\n")); err != nil {
// ok
return
}
}
t.Fatal("Connection should have been closed")
}
func TestRandomPorts(t *testing.T) {
opts := DefaultOptions()
opts.HTTPPort = -1
opts.Port = -1
s := RunServer(opts)
defer s.Shutdown()
if s.Addr() == nil || s.Addr().(*net.TCPAddr).Port <= 0 {
t.Fatal("Should have dynamically assigned server port.")
}
if s.Addr() == nil || s.Addr().(*net.TCPAddr).Port == 4222 {
t.Fatal("Should not have dynamically assigned default port: 4222.")
}
if s.MonitorAddr() == nil || s.MonitorAddr().Port <= 0 {
t.Fatal("Should have dynamically assigned monitoring port.")
}
}
func TestNilMonitoringPort(t *testing.T) {
opts := DefaultOptions()
opts.HTTPPort = 0
opts.HTTPSPort = 0
s := RunServer(opts)
defer s.Shutdown()
if s.MonitorAddr() != nil {
t.Fatal("HttpAddr should be nil.")
}
}
type DummyAuth struct{}
func (d *DummyAuth) Check(c ClientAuthentication) bool {
return c.GetOpts().Username == "valid"
}
func TestCustomClientAuthentication(t *testing.T) {
var clientAuth DummyAuth
opts := DefaultOptions()
opts.CustomClientAuthentication = &clientAuth
s := RunServer(opts)
defer s.Shutdown()
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(addr, nats.UserInfo("valid", ""))
if err != nil {
t.Fatalf("Expected client to connect, got: %s", err)
}
nc.Close()
if _, err := nats.Connect(addr, nats.UserInfo("invalid", "")); err == nil {
t.Fatal("Expected client to fail to connect")
}
}
func TestCustomRouterAuthentication(t *testing.T) {
opts := DefaultOptions()
opts.CustomRouterAuthentication = &DummyAuth{}
opts.Cluster.Host = "127.0.0.1"
s := RunServer(opts)
defer s.Shutdown()
clusterPort := s.ClusterAddr().Port
opts2 := DefaultOptions()
opts2.Cluster.Host = "127.0.0.1"
opts2.Routes = RoutesFromStr(fmt.Sprintf("nats://invalid@127.0.0.1:%d", clusterPort))
s2 := RunServer(opts2)
defer s2.Shutdown()
if nr := s2.NumRoutes(); nr != 0 {
t.Fatalf("Expected no route, got %v", nr)
}
opts3 := DefaultOptions()
opts3.Cluster.Host = "127.0.0.1"
opts3.Routes = RoutesFromStr(fmt.Sprintf("nats://valid@127.0.0.1:%d", clusterPort))
s3 := RunServer(opts3)
defer s3.Shutdown()
if nr := s3.NumRoutes(); nr != 1 {
t.Fatalf("Expected 1 route, got %v", nr)
}
}
func TestMonitoringNoTimeout(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
s.mu.Lock()
srv := s.monitoringServer
s.mu.Unlock()
if srv == nil {
t.Fatalf("Monitoring server not set")
}
if srv.ReadTimeout != 0 {
t.Fatalf("ReadTimeout should not be set, was set to %v", srv.ReadTimeout)
}
if srv.WriteTimeout != 0 {
t.Fatalf("WriteTimeout should not be set, was set to %v", srv.WriteTimeout)
}
}
func TestProfilingNoTimeout(t *testing.T) {
opts := DefaultOptions()
opts.ProfPort = -1
s := RunServer(opts)
defer s.Shutdown()
paddr := s.ProfilerAddr()
if paddr == nil {
t.Fatalf("Profiler not started")
}
pport := paddr.Port
if pport <= 0 {
t.Fatalf("Expected profiler port to be set, got %v", pport)
}
s.mu.Lock()
srv := s.profilingServer
s.mu.Unlock()
if srv == nil {
t.Fatalf("Profiling server not set")
}
if srv.ReadTimeout != 0 {
t.Fatalf("ReadTimeout should not be set, was set to %v", srv.ReadTimeout)
}
if srv.WriteTimeout != 0 {
t.Fatalf("WriteTimeout should not be set, was set to %v", srv.WriteTimeout)
}
}