Files
nats-server/server/client_test.go
Ivan Kozlovic 1d3cddfa7c [CHANGED] Reduce print for an account subs limit to every 2 sec
We could make it for all limits by having a map of error types
instead of applying just to max subs.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2021-06-22 11:00:41 -06:00

2576 lines
65 KiB
Go

// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"math"
"net"
"net/url"
"reflect"
"regexp"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"crypto/rand"
"crypto/tls"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
)
type serverInfo struct {
ID string `json:"server_id"`
Host string `json:"host"`
Port uint `json:"port"`
Version string `json:"version"`
AuthRequired bool `json:"auth_required"`
TLSRequired bool `json:"tls_required"`
MaxPayload int64 `json:"max_payload"`
Headers bool `json:"headers"`
ConnectURLs []string `json:"connect_urls,omitempty"`
LameDuckMode bool `json:"ldm,omitempty"`
CID uint64 `json:"client_id,omitempty"`
}
type testAsyncClient struct {
*client
parseAsync func(string)
quitCh chan bool
}
func (c *testAsyncClient) close() {
c.client.closeConnection(ClientClosed)
c.quitCh <- true
}
func (c *testAsyncClient) parse(proto []byte) error {
err := c.client.parse(proto)
c.client.flushClients(0)
return err
}
func (c *testAsyncClient) parseAndClose(proto []byte) {
c.client.parse(proto)
c.client.flushClients(0)
c.closeConnection(ClientClosed)
}
func createClientAsync(ch chan *client, s *Server, cli net.Conn) {
// Normally, those type of clients are used against non running servers.
// However, some don't, which would then cause the writeLoop to be
// started twice for the same client (since createClient() start both
// read and write loop if it is detected as running).
startWriteLoop := !s.isRunning()
if startWriteLoop {
s.grWG.Add(1)
}
go func() {
c := s.createClient(cli)
// Must be here to suppress +OK
c.opts.Verbose = false
if startWriteLoop {
go c.writeLoop()
}
ch <- c
}()
}
func newClientForServer(s *Server) (*testAsyncClient, *bufio.Reader, string) {
cli, srv := net.Pipe()
cr := bufio.NewReaderSize(cli, maxBufSize)
ch := make(chan *client)
createClientAsync(ch, s, srv)
// So failing tests don't just hang.
cli.SetReadDeadline(time.Now().Add(10 * time.Second))
l, _ := cr.ReadString('\n')
// Grab client
c := <-ch
parse, quitCh := genAsyncParser(c)
asyncClient := &testAsyncClient{
client: c,
parseAsync: parse,
quitCh: quitCh,
}
return asyncClient, cr, l
}
func genAsyncParser(c *client) (func(string), chan bool) {
pab := make(chan []byte, 16)
pas := func(cs string) { pab <- []byte(cs) }
quit := make(chan bool)
go func() {
for {
select {
case cs := <-pab:
c.parse(cs)
c.flushClients(0)
case <-quit:
return
}
}
}()
return pas, quit
}
var defaultServerOptions = Options{
Host: "127.0.0.1",
Trace: true,
Debug: true,
DisableShortFirstPing: true,
NoLog: true,
NoSigs: true,
}
func rawSetup(serverOptions Options) (*Server, *testAsyncClient, *bufio.Reader, string) {
s := New(&serverOptions)
c, cr, l := newClientForServer(s)
return s, c, cr, l
}
func setUpClientWithResponse() (*testAsyncClient, string) {
_, c, _, l := rawSetup(defaultServerOptions)
return c, l
}
func setupClient() (*Server, *testAsyncClient, *bufio.Reader) {
s, c, cr, _ := rawSetup(defaultServerOptions)
return s, c, cr
}
func checkClientsCount(t *testing.T, s *Server, expected int) {
t.Helper()
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
if nc := s.NumClients(); nc != expected {
return fmt.Errorf("The number of expected connections was %v, got %v", expected, nc)
}
return nil
})
}
func checkAccClientsCount(t *testing.T, acc *Account, expected int) {
t.Helper()
checkFor(t, 4*time.Second, 10*time.Millisecond, func() error {
if nc := acc.NumConnections(); nc != expected {
return fmt.Errorf("Expected account %q to have %v clients, got %v",
acc.Name, expected, nc)
}
return nil
})
}
func TestAsyncClientWithRunningServer(t *testing.T) {
o := DefaultOptions()
s := RunServer(o)
defer s.Shutdown()
c, _, _ := newClientForServer(s)
defer c.close()
buf := make([]byte, 1000000)
writeLoopTxt := fmt.Sprintf("writeLoop(%p)", c.client)
checkFor(t, time.Second, 15*time.Millisecond, func() error {
n := runtime.Stack(buf, true)
if count := strings.Count(string(buf[:n]), writeLoopTxt); count != 1 {
return fmt.Errorf("writeLoop for client should have been started only once: %v", count)
}
return nil
})
}
func TestClientCreateAndInfo(t *testing.T) {
c, l := setUpClientWithResponse()
defer c.close()
if c.cid != 1 {
t.Fatalf("Expected cid of 1 vs %d\n", c.cid)
}
if c.state != OP_START {
t.Fatal("Expected state to be OP_START")
}
if !strings.HasPrefix(l, "INFO ") {
t.Fatalf("INFO response incorrect: %s\n", l)
}
// Make sure payload is proper json
var info serverInfo
err := json.Unmarshal([]byte(l[5:]), &info)
if err != nil {
t.Fatalf("Could not parse INFO json: %v\n", err)
}
// Sanity checks
if info.MaxPayload != MAX_PAYLOAD_SIZE ||
info.AuthRequired || info.TLSRequired ||
info.Port != DEFAULT_PORT {
t.Fatalf("INFO inconsistent: %+v\n", info)
}
}
func TestClientNoResponderSupport(t *testing.T) {
opts := defaultServerOptions
opts.Port = -1
s := New(&opts)
c, _, _ := newClientForServer(s)
defer c.close()
// Force header support if you want to do no_responders. Make sure headers are set.
if err := c.parse([]byte("CONNECT {\"no_responders\":true}\r\n")); err == nil {
t.Fatalf("Expected error")
}
c, cr, _ := newClientForServer(s)
defer c.close()
c.parseAsync("CONNECT {\"headers\":true, \"no_responders\":true}\r\nSUB reply 1\r\nPUB foo reply 2\r\nok\r\n")
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
am := hmsgPat.FindAllStringSubmatch(l, -1)
if len(am) == 0 {
t.Fatalf("Did not get a match for %q", l)
}
checkPayload(cr, []byte("NATS/1.0 503\r\n\r\n"), t)
}
func TestServerHeaderSupport(t *testing.T) {
opts := defaultServerOptions
opts.Port = -1
s := New(&opts)
c, _, l := newClientForServer(s)
defer c.close()
if !strings.HasPrefix(l, "INFO ") {
t.Fatalf("INFO response incorrect: %s\n", l)
}
var info serverInfo
if err := json.Unmarshal([]byte(l[5:]), &info); err != nil {
t.Fatalf("Could not parse INFO json: %v\n", err)
}
if !info.Headers {
t.Fatalf("Expected by default for header support to be enabled")
}
opts.NoHeaderSupport = true
opts.Port = -1
s = New(&opts)
c, _, l = newClientForServer(s)
defer c.close()
if err := json.Unmarshal([]byte(l[5:]), &info); err != nil {
t.Fatalf("Could not parse INFO json: %v\n", err)
}
if info.Headers {
t.Fatalf("Expected header support to be disabled")
}
}
// This test specifically is not testing how headers are encoded in a raw msg.
// It wants to make sure the serve and clients agreement on when to use headers
// is bi-directional and functions properly.
func TestClientHeaderSupport(t *testing.T) {
opts := defaultServerOptions
opts.Port = -1
s := New(&opts)
c, _, _ := newClientForServer(s)
defer c.close()
// Even though the server supports headers we need to explicitly say we do in the
// CONNECT. If we do not we should get an error.
if err := c.parse([]byte("CONNECT {}\r\nHPUB foo 0 2\r\nok\r\n")); err != ErrMsgHeadersNotSupported {
t.Fatalf("Expected to receive an error, got %v", err)
}
// This should succeed.
c, _, _ = newClientForServer(s)
defer c.close()
if err := c.parse([]byte("CONNECT {\"headers\":true}\r\nHPUB foo 0 2\r\nok\r\n")); err != nil {
t.Fatalf("Unexpected error %v", err)
}
// Now start a server without support.
opts.NoHeaderSupport = true
opts.Port = -1
s = New(&opts)
c, _, _ = newClientForServer(s)
defer c.close()
if err := c.parse([]byte("CONNECT {\"headers\":true}\r\nHPUB foo 0 2\r\nok\r\n")); err != ErrMsgHeadersNotSupported {
t.Fatalf("Expected to receive an error, got %v", err)
}
}
var hmsgPat = regexp.MustCompile(`HMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)[^\S\r\n]+(\d+)\r\n`)
func TestClientHeaderDeliverMsg(t *testing.T) {
opts := defaultServerOptions
opts.Port = -1
s := New(&opts)
c, cr, _ := newClientForServer(s)
defer c.close()
connect := "CONNECT {\"headers\":true}"
subOp := "SUB foo 1"
pubOp := "HPUB foo 12 14\r\nName:Derek\r\nOK\r\n"
cmd := strings.Join([]string{connect, subOp, pubOp}, "\r\n")
c.parseAsync(cmd)
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
am := hmsgPat.FindAllStringSubmatch(l, -1)
if len(am) == 0 {
t.Fatalf("Did not get a match for %q", l)
}
matches := am[0]
if len(matches) != 7 {
t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 7)
}
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
}
if matches[HDR_INDEX] != "12" {
t.Fatalf("Did not get correct msg length: '%s'\n", matches[HDR_INDEX])
}
if matches[TLEN_INDEX] != "14" {
t.Fatalf("Did not get correct msg length: '%s'\n", matches[TLEN_INDEX])
}
checkPayload(cr, []byte("Name:Derek\r\nOK\r\n"), t)
}
var smsgPat = regexp.MustCompile(`^MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n`)
func TestClientHeaderDeliverStrippedMsg(t *testing.T) {
opts := defaultServerOptions
opts.Port = -1
s := New(&opts)
c, _, _ := newClientForServer(s)
defer c.close()
b, br, _ := newClientForServer(s)
defer b.close()
// Does not support headers
b.parseAsync("SUB foo 1\r\nPING\r\n")
if _, err := br.ReadString('\n'); err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
connect := "CONNECT {\"headers\":true}"
pubOp := "HPUB foo 12 14\r\nName:Derek\r\nOK\r\n"
cmd := strings.Join([]string{connect, pubOp}, "\r\n")
c.parseAsync(cmd)
// Read from 'b' client.
l, err := br.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
am := smsgPat.FindAllStringSubmatch(l, -1)
if len(am) == 0 {
t.Fatalf("Did not get a correct match for %q", l)
}
matches := am[0]
if len(matches) != 6 {
t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6)
}
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
}
if matches[LEN_INDEX] != "2" {
t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX])
}
checkPayload(br, []byte("OK\r\n"), t)
if br.Buffered() != 0 {
t.Fatalf("Expected no extra bytes to be buffered, got %d", br.Buffered())
}
}
func TestClientHeaderDeliverQueueSubStrippedMsg(t *testing.T) {
opts := defaultServerOptions
opts.Port = -1
s := New(&opts)
c, _, _ := newClientForServer(s)
defer c.close()
b, br, _ := newClientForServer(s)
defer b.close()
// Does not support headers
b.parseAsync("SUB foo bar 1\r\nPING\r\n")
if _, err := br.ReadString('\n'); err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
connect := "CONNECT {\"headers\":true}"
pubOp := "HPUB foo 12 14\r\nName:Derek\r\nOK\r\n"
cmd := strings.Join([]string{connect, pubOp}, "\r\n")
c.parseAsync(cmd)
// Read from 'b' client.
l, err := br.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
am := smsgPat.FindAllStringSubmatch(l, -1)
if len(am) == 0 {
t.Fatalf("Did not get a correct match for %q", l)
}
matches := am[0]
if len(matches) != 6 {
t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6)
}
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
}
if matches[LEN_INDEX] != "2" {
t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX])
}
checkPayload(br, []byte("OK\r\n"), t)
}
func TestNonTLSConnectionState(t *testing.T) {
_, c, _ := setupClient()
defer c.close()
state := c.GetTLSConnectionState()
if state != nil {
t.Error("GetTLSConnectionState() returned non-nil")
}
}
func TestClientConnect(t *testing.T) {
_, c, _ := setupClient()
defer c.close()
// Basic Connect setting flags
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false,\"echo\":false}\r\n")
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, ClientOpts{Verbose: true, Pedantic: true, Echo: false}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
// Test that we can capture user/pass
connectOp = []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\"}\r\n")
c.opts = defaultOpts
err = c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, ClientOpts{Echo: true, Verbose: true, Pedantic: true, Username: "derek", Password: "foo"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
// Test that we can capture client name
connectOp = []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\",\"name\":\"router\"}\r\n")
c.opts = defaultOpts
err = c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, ClientOpts{Echo: true, Verbose: true, Pedantic: true, Username: "derek", Password: "foo", Name: "router"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
// Test that we correctly capture auth tokens
connectOp = []byte("CONNECT {\"auth_token\":\"YZZ222\",\"name\":\"router\"}\r\n")
c.opts = defaultOpts
err = c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, ClientOpts{Echo: true, Verbose: true, Pedantic: true, Token: "YZZ222", Name: "router"}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
}
func TestClientConnectProto(t *testing.T) {
_, c, r := setupClient()
defer c.close()
// Basic Connect setting flags, proto should be zero (original proto)
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false}\r\n")
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, ClientOpts{Echo: true, Verbose: true, Pedantic: true, Protocol: ClientProtoZero}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
// ProtoInfo
connectOp = []byte(fmt.Sprintf("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false,\"protocol\":%d}\r\n", ClientProtoInfo))
err = c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, ClientOpts{Echo: true, Verbose: true, Pedantic: true, Protocol: ClientProtoInfo}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
if c.opts.Protocol != ClientProtoInfo {
t.Fatalf("Protocol should have been set to %v, but is set to %v", ClientProtoInfo, c.opts.Protocol)
}
// Illegal Option
connectOp = []byte("CONNECT {\"protocol\":22}\r\n")
wg := sync.WaitGroup{}
wg.Add(1)
// The client here is using a pipe, we need to be dequeuing
// data otherwise the server would be blocked trying to send
// the error back to it.
go func() {
defer wg.Done()
for {
if _, _, err := r.ReadLine(); err != nil {
return
}
}
}()
err = c.parse(connectOp)
if err == nil {
t.Fatalf("Expected to receive an error\n")
}
if err != ErrBadClientProtocol {
t.Fatalf("Expected err of %q, got %q\n", ErrBadClientProtocol, err)
}
wg.Wait()
}
func TestRemoteAddress(t *testing.T) {
rc := &client{}
// though in reality this will panic if it does not, adding coverage anyway
if rc.RemoteAddress() != nil {
t.Errorf("RemoteAddress() did not handle nil connection correctly")
}
_, c, _ := setupClient()
defer c.close()
addr := c.RemoteAddress()
if addr.Network() != "pipe" {
t.Errorf("RemoteAddress() returned invalid network: %s", addr.Network())
}
if addr.String() != "pipe" {
t.Errorf("RemoteAddress() returned invalid string: %s", addr.String())
}
}
func TestClientPing(t *testing.T) {
_, c, cr := setupClient()
defer c.close()
// PING
pingOp := "PING\r\n"
c.parseAsync(pingOp)
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving info from server: %v\n", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %s\n", l)
}
}
var msgPat = regexp.MustCompile(`MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n`)
const (
SUB_INDEX = 1
SID_INDEX = 2
REPLY_INDEX = 4
LEN_INDEX = 5
HDR_INDEX = 5
TLEN_INDEX = 6
)
func grabPayload(cr *bufio.Reader, expected int) []byte {
d := make([]byte, expected)
n, _ := cr.Read(d)
cr.ReadString('\n')
return d[:n]
}
func checkPayload(cr *bufio.Reader, expected []byte, t *testing.T) {
t.Helper()
// Read in payload
d := make([]byte, len(expected))
n, err := cr.Read(d)
if err != nil {
t.Fatalf("Error receiving msg payload from server: %v\n", err)
}
if n != len(expected) {
t.Fatalf("Did not read correct amount of bytes: %d vs %d\n", n, len(expected))
}
if !bytes.Equal(d, expected) {
t.Fatalf("Did not read correct payload:: <%s>\n", d)
}
}
func TestClientSimplePubSub(t *testing.T) {
_, c, cr := setupClient()
defer c.close()
// SUB/PUB
c.parseAsync("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n")
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if len(matches) != 6 {
t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6)
}
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
}
if matches[LEN_INDEX] != "5" {
t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX])
}
checkPayload(cr, []byte("hello\r\n"), t)
}
func TestClientPubSubNoEcho(t *testing.T) {
_, c, cr := setupClient()
defer c.close()
// Specify no echo
connectOp := []byte("CONNECT {\"echo\":false}\r\n")
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
// SUB/PUB
c.parseAsync("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n")
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
// We should not receive anything but a PONG since we specified no echo.
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q\n", l)
}
}
func TestClientSimplePubSubWithReply(t *testing.T) {
_, c, cr := setupClient()
defer c.close()
// SUB/PUB
c.parseAsync("SUB foo 1\r\nPUB foo bar 5\r\nhello\r\nPING\r\n")
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if len(matches) != 6 {
t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6)
}
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
}
if matches[REPLY_INDEX] != "bar" {
t.Fatalf("Did not get correct reply subject: '%s'\n", matches[REPLY_INDEX])
}
if matches[LEN_INDEX] != "5" {
t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX])
}
}
func TestClientNoBodyPubSubWithReply(t *testing.T) {
_, c, cr := setupClient()
defer c.close()
// SUB/PUB
c.parseAsync("SUB foo 1\r\nPUB foo bar 0\r\n\r\nPING\r\n")
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if len(matches) != 6 {
t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6)
}
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
}
if matches[REPLY_INDEX] != "bar" {
t.Fatalf("Did not get correct reply subject: '%s'\n", matches[REPLY_INDEX])
}
if matches[LEN_INDEX] != "0" {
t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX])
}
}
func TestClientPubWithQueueSub(t *testing.T) {
_, c, cr := setupClient()
defer c.close()
num := 100
// Queue SUB/PUB
subs := []byte("SUB foo g1 1\r\nSUB foo g1 2\r\n")
pubs := []byte("PUB foo bar 5\r\nhello\r\n")
op := []byte{}
op = append(op, subs...)
for i := 0; i < num; i++ {
op = append(op, pubs...)
}
go c.parseAndClose(op)
var n1, n2, received int
for ; ; received++ {
l, err := cr.ReadString('\n')
if err != nil {
break
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
// Count which sub
switch matches[SID_INDEX] {
case "1":
n1++
case "2":
n2++
}
checkPayload(cr, []byte("hello\r\n"), t)
}
if received != num {
t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, num)
}
// Threshold for randomness for now
if n1 < 20 || n2 < 20 {
t.Fatalf("Received wrong # of msgs per subscriber: %d - %d\n", n1, n2)
}
}
func TestSplitSubjectQueue(t *testing.T) {
cases := []struct {
name string
sq string
wantSubject []byte
wantQueue []byte
wantErr bool
}{
{name: "single subject",
sq: "foo", wantSubject: []byte("foo"), wantQueue: nil},
{name: "subject and queue",
sq: "foo bar", wantSubject: []byte("foo"), wantQueue: []byte("bar")},
{name: "subject and queue with surrounding spaces",
sq: " foo bar ", wantSubject: []byte("foo"), wantQueue: []byte("bar")},
{name: "subject and queue with extra spaces in the middle",
sq: "foo bar", wantSubject: []byte("foo"), wantQueue: []byte("bar")},
{name: "subject, queue, and extra token",
sq: "foo bar fizz", wantSubject: []byte(nil), wantQueue: []byte(nil), wantErr: true},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
sub, que, err := splitSubjectQueue(c.sq)
if err == nil && c.wantErr {
t.Fatal("Expected error, but got nil")
}
if err != nil && !c.wantErr {
t.Fatalf("Expected nil error, but got %v", err)
}
if !reflect.DeepEqual(sub, c.wantSubject) {
t.Fatalf("Expected to get subject %#v, but instead got %#v", c.wantSubject, sub)
}
if !reflect.DeepEqual(que, c.wantQueue) {
t.Fatalf("Expected to get queue %#v, but instead got %#v", c.wantQueue, que)
}
})
}
}
func TestTypeString(t *testing.T) {
cases := []struct {
intType int
stringType string
}{
{
intType: CLIENT,
stringType: "Client",
},
{
intType: ROUTER,
stringType: "Router",
},
{
intType: GATEWAY,
stringType: "Gateway",
},
{
intType: LEAF,
stringType: "Leafnode",
},
{
intType: JETSTREAM,
stringType: "JetStream",
},
{
intType: ACCOUNT,
stringType: "Account",
},
{
intType: SYSTEM,
stringType: "System",
},
{
intType: -1,
stringType: "Unknown Type",
},
}
for _, cs := range cases {
c := &client{kind: cs.intType}
typeStringVal := c.typeString()
if typeStringVal != cs.stringType {
t.Fatalf("Expected typeString value %q, but instead received %q", cs.stringType, typeStringVal)
}
}
}
func TestQueueSubscribePermissions(t *testing.T) {
cases := []struct {
name string
perms *SubjectPermission
subject string
queue string
want string
}{
{
name: "plain subscription on foo",
perms: &SubjectPermission{Allow: []string{"foo"}},
subject: "foo",
want: "+OK\r\n",
},
{
name: "queue subscribe with allowed group",
perms: &SubjectPermission{Allow: []string{"foo bar"}},
subject: "foo",
queue: "bar",
want: "+OK\r\n",
},
{
name: "queue subscribe with wildcard allowed group",
perms: &SubjectPermission{Allow: []string{"foo bar.*"}},
subject: "foo",
queue: "bar.fizz",
want: "+OK\r\n",
},
{
name: "queue subscribe with full wildcard subject and subgroup",
perms: &SubjectPermission{Allow: []string{"> bar.>"}},
subject: "whizz",
queue: "bar.bang",
want: "+OK\r\n",
},
{
name: "plain subscribe with full wildcard subject and subgroup",
perms: &SubjectPermission{Allow: []string{"> bar.>"}},
subject: "whizz",
want: "-ERR 'Permissions Violation for Subscription to \"whizz\"'\r\n",
},
{
name: "deny plain subscription on foo",
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{"foo"}},
subject: "foo",
queue: "bar",
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
},
{
name: "allow plain subscription, except foo",
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{"foo"}},
subject: "bar",
want: "+OK\r\n",
},
{
name: "deny everything",
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{">"}},
subject: "foo",
queue: "bar",
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
},
{
name: "can only subscribe to queues v1",
perms: &SubjectPermission{Allow: []string{"> v1.>"}},
subject: "foo",
queue: "v1.prod",
want: "+OK\r\n",
},
{
name: "cannot subscribe to queues, plain subscribe ok",
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{"> >"}},
subject: "foo",
want: "+OK\r\n",
},
{
name: "cannot subscribe to queues, queue subscribe not ok",
perms: &SubjectPermission{Deny: []string{"> >"}},
subject: "foo",
queue: "bar",
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
},
{
name: "deny all queue subscriptions on dev or stg only",
perms: &SubjectPermission{Deny: []string{"> *.dev", "> *.stg"}},
subject: "foo",
queue: "bar",
want: "+OK\r\n",
},
{
name: "allow only queue subscription on dev or stg",
perms: &SubjectPermission{Allow: []string{"> *.dev", "> *.stg"}},
subject: "foo",
queue: "bar",
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
},
{
name: "deny queue subscriptions with subject foo",
perms: &SubjectPermission{Deny: []string{"foo >"}},
subject: "foo",
queue: "bar",
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
},
{
name: "plain sub is allowed, but queue subscribe with queue not in list",
perms: &SubjectPermission{Allow: []string{"foo bar"}},
subject: "foo",
queue: "fizz",
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"fizz\"'\r\n",
},
{
name: "allow plain sub, but do queue subscribe",
perms: &SubjectPermission{Allow: []string{"foo"}},
subject: "foo",
queue: "bar",
want: "+OK\r\n",
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
_, client, r := setupClient()
defer client.close()
client.RegisterUser(&User{
Permissions: &Permissions{Subscribe: c.perms},
})
connect := []byte("CONNECT {\"verbose\":true}\r\n")
qsub := []byte(fmt.Sprintf("SUB %s %s 1\r\n", c.subject, c.queue))
go client.parseAndClose(append(connect, qsub...))
var buf bytes.Buffer
if _, err := io.Copy(&buf, r); err != nil {
t.Fatal(err)
}
// Extra OK is from the successful CONNECT.
want := "+OK\r\n" + c.want
if got := buf.String(); got != want {
t.Fatalf("Expected to receive %q, but instead received %q", want, got)
}
})
}
}
func TestClientPubWithQueueSubNoEcho(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc1.Close()
// Grab the client from server and set no echo by hand.
s.mu.Lock()
lc := len(s.clients)
c := s.clients[s.gcid]
s.mu.Unlock()
if lc != 1 {
t.Fatalf("Expected only 1 client but got %d\n", lc)
}
if c == nil {
t.Fatal("Expected to retrieve client\n")
}
c.mu.Lock()
c.echo = false
c.mu.Unlock()
// Queue sub on nc1.
_, err = nc1.QueueSubscribe("foo", "bar", func(*nats.Msg) {})
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
nc1.Flush()
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()
n := int32(0)
cb := func(m *nats.Msg) {
atomic.AddInt32(&n, 1)
}
_, err = nc2.QueueSubscribe("foo", "bar", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
nc2.Flush()
// Now publish 100 messages on nc1 which does not allow echo.
for i := 0; i < 100; i++ {
nc1.Publish("foo", []byte("Hello"))
}
nc1.Flush()
nc2.Flush()
checkFor(t, 5*time.Second, 10*time.Millisecond, func() error {
num := atomic.LoadInt32(&n)
if num != int32(100) {
return fmt.Errorf("Expected all the msgs to be received by nc2, got %d\n", num)
}
return nil
})
}
func TestClientUnSub(t *testing.T) {
_, c, cr := setupClient()
defer c.close()
num := 1
// SUB/PUB
subs := []byte("SUB foo 1\r\nSUB foo 2\r\n")
unsub := []byte("UNSUB 1\r\n")
pub := []byte("PUB foo bar 5\r\nhello\r\n")
op := []byte{}
op = append(op, subs...)
op = append(op, unsub...)
op = append(op, pub...)
go c.parseAndClose(op)
var received int
for ; ; received++ {
l, err := cr.ReadString('\n')
if err != nil {
break
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if matches[SID_INDEX] != "2" {
t.Fatalf("Received msg on unsubscribed subscription!\n")
}
checkPayload(cr, []byte("hello\r\n"), t)
}
if received != num {
t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, num)
}
}
func TestClientUnSubMax(t *testing.T) {
_, c, cr := setupClient()
defer c.close()
num := 10
exp := 5
// SUB/PUB
subs := []byte("SUB foo 1\r\n")
unsub := []byte("UNSUB 1 5\r\n")
pub := []byte("PUB foo bar 5\r\nhello\r\n")
op := []byte{}
op = append(op, subs...)
op = append(op, unsub...)
for i := 0; i < num; i++ {
op = append(op, pub...)
}
go c.parseAndClose(op)
var received int
for ; ; received++ {
l, err := cr.ReadString('\n')
if err != nil {
break
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if matches[SID_INDEX] != "1" {
t.Fatalf("Received msg on unsubscribed subscription!\n")
}
checkPayload(cr, []byte("hello\r\n"), t)
}
if received != exp {
t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, exp)
}
}
func TestClientAutoUnsubExactReceived(t *testing.T) {
_, c, _ := setupClient()
defer c.close()
// SUB/PUB
subs := []byte("SUB foo 1\r\n")
unsub := []byte("UNSUB 1 1\r\n")
pub := []byte("PUB foo bar 2\r\nok\r\n")
op := []byte{}
op = append(op, subs...)
op = append(op, unsub...)
op = append(op, pub...)
c.parse(op)
// We should not have any subscriptions in place here.
if len(c.subs) != 0 {
t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n", len(c.subs))
}
}
func TestClientUnsubAfterAutoUnsub(t *testing.T) {
_, c, _ := setupClient()
defer c.close()
// SUB/UNSUB/UNSUB
subs := []byte("SUB foo 1\r\n")
asub := []byte("UNSUB 1 1\r\n")
unsub := []byte("UNSUB 1\r\n")
op := []byte{}
op = append(op, subs...)
op = append(op, asub...)
op = append(op, unsub...)
c.parse(op)
// We should not have any subscriptions in place here.
if len(c.subs) != 0 {
t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n", len(c.subs))
}
}
func TestClientRemoveSubsOnDisconnect(t *testing.T) {
s, c, _ := setupClient()
defer c.close()
subs := []byte("SUB foo 1\r\nSUB bar 2\r\n")
c.parse(subs)
if s.NumSubscriptions() != 2 {
t.Fatalf("Should have 2 subscriptions, got %d\n", s.NumSubscriptions())
}
c.closeConnection(ClientClosed)
checkExpectedSubs(t, 0, s)
}
func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) {
_, c, _ := setupClient()
c.close()
subs := []byte("SUB foo 1\r\nSUB bar 2\r\n")
c.parse(subs)
if c.acc.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", c.acc.sl.Count())
}
}
func TestClientMapRemoval(t *testing.T) {
s, c, _ := setupClient()
c.close()
checkClientsCount(t, s, 0)
}
func TestAuthorizationTimeout(t *testing.T) {
serverOptions := DefaultOptions()
serverOptions.Authorization = "my_token"
serverOptions.AuthTimeout = 0.4
s := RunServer(serverOptions)
defer s.Shutdown()
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", serverOptions.Host, serverOptions.Port))
if err != nil {
t.Fatalf("Error dialing server: %v\n", err)
}
defer conn.Close()
client := bufio.NewReaderSize(conn, maxBufSize)
if _, err := client.ReadString('\n'); err != nil {
t.Fatalf("Error receiving info from server: %v\n", err)
}
time.Sleep(3 * secondsToDuration(serverOptions.AuthTimeout))
l, err := client.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving info from server: %v\n", err)
}
if !strings.Contains(l, "Authentication Timeout") {
t.Fatalf("Authentication Timeout response incorrect: %q\n", l)
}
}
// This is from bug report #18
func TestTwoTokenPubMatchSingleTokenSub(t *testing.T) {
_, c, cr := setupClient()
defer c.close()
test := "PUB foo.bar 5\r\nhello\r\nSUB foo 1\r\nPING\r\nPUB foo.bar 5\r\nhello\r\nPING\r\n"
c.parseAsync(test)
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving info from server: %v\n", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q\n", l)
}
// Expect just a pong, no match should exist here..
l, _ = cr.ReadString('\n')
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response was expected, got: %q\n", l)
}
}
func TestUnsubRace(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
url := fmt.Sprintf("nats://%s:%d",
s.getOpts().Host,
s.Addr().(*net.TCPAddr).Port,
)
nc, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error creating client to %s: %v\n", url, err)
}
defer nc.Close()
ncp, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error creating client: %v\n", err)
}
defer ncp.Close()
sub, _ := nc.Subscribe("foo", func(m *nats.Msg) {
// Just eat it..
})
nc.Flush()
var wg sync.WaitGroup
wg.Add(1)
go func() {
for i := 0; i < 10000; i++ {
ncp.Publish("foo", []byte("hello"))
}
wg.Done()
}()
time.Sleep(5 * time.Millisecond)
sub.Unsubscribe()
wg.Wait()
}
func TestClientCloseTLSConnection(t *testing.T) {
opts, err := ProcessConfigFile("./configs/tls.conf")
if err != nil {
t.Fatalf("Error processing config file: %v", err)
}
opts.TLSTimeout = 100
opts.NoLog = true
opts.NoSigs = true
s := RunServer(opts)
defer s.Shutdown()
endpoint := fmt.Sprintf("%s:%d", opts.Host, opts.Port)
conn, err := net.DialTimeout("tcp", endpoint, 2*time.Second)
if err != nil {
t.Fatalf("Unexpected error on dial: %v", err)
}
defer conn.Close()
br := bufio.NewReaderSize(conn, 100)
if _, err := br.ReadString('\n'); err != nil {
t.Fatalf("Unexpected error reading INFO: %v", err)
}
tlsConn := tls.Client(conn, &tls.Config{InsecureSkipVerify: true})
defer tlsConn.Close()
if err := tlsConn.Handshake(); err != nil {
t.Fatalf("Unexpected error during handshake: %v", err)
}
br = bufio.NewReaderSize(tlsConn, 100)
connectOp := []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\",\"verbose\":false,\"pedantic\":false,\"tls_required\":true}\r\n")
if _, err := tlsConn.Write(connectOp); err != nil {
t.Fatalf("Unexpected error writing CONNECT: %v", err)
}
if _, err := tlsConn.Write([]byte("PING\r\n")); err != nil {
t.Fatalf("Unexpected error writing PING: %v", err)
}
if _, err := br.ReadString('\n'); err != nil {
t.Fatalf("Unexpected error reading PONG: %v", err)
}
// Check that client is registered.
checkClientsCount(t, s, 1)
var cli *client
s.mu.Lock()
for _, c := range s.clients {
cli = c
break
}
s.mu.Unlock()
if cli == nil {
t.Fatal("Did not register client on time")
}
// Test GetTLSConnectionState
state := cli.GetTLSConnectionState()
if state == nil {
t.Error("GetTLSConnectionState() returned nil")
}
// Test RemoteAddress
addr := cli.RemoteAddress()
if addr == nil {
t.Error("RemoteAddress() returned nil")
}
if addr.(*net.TCPAddr).IP.String() != "127.0.0.1" {
t.Error("RemoteAddress() returned incorrect ip " + addr.String())
}
// Fill the buffer. We want to timeout on write so that nc.Close()
// would block due to a write that cannot complete.
buf := make([]byte, 64*1024)
done := false
for !done {
cli.nc.SetWriteDeadline(time.Now().Add(time.Second))
if _, err := cli.nc.Write(buf); err != nil {
done = true
}
cli.nc.SetWriteDeadline(time.Time{})
}
ch := make(chan bool)
go func() {
select {
case <-ch:
return
case <-time.After(3 * time.Second):
fmt.Println("!!!! closeConnection is blocked, test will hang !!!")
return
}
}()
// Close the client
cli.closeConnection(ClientClosed)
ch <- true
}
// This tests issue #558
func TestWildcardCharsInLiteralSubjectWorks(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
ch := make(chan bool, 1)
// This subject is a literal even though it contains `*` and `>`,
// they are not treated as wildcards.
subj := "foo.bar,*,>,baz"
cb := func(_ *nats.Msg) {
ch <- true
}
for i := 0; i < 2; i++ {
sub, err := nc.Subscribe(subj, cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := nc.Flush(); err != nil {
t.Fatalf("Error on flush: %v", err)
}
if err := nc.LastError(); err != nil {
t.Fatalf("Server reported error: %v", err)
}
if err := nc.Publish(subj, []byte("msg")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
select {
case <-ch:
case <-time.After(time.Second):
t.Fatalf("Should have received the message")
}
if err := sub.Unsubscribe(); err != nil {
t.Fatalf("Error on unsubscribe: %v", err)
}
}
}
func TestDynamicBuffers(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
// Grab the client from server.
s.mu.Lock()
lc := len(s.clients)
c := s.clients[s.gcid]
s.mu.Unlock()
if lc != 1 {
t.Fatalf("Expected only 1 client but got %d\n", lc)
}
if c == nil {
t.Fatal("Expected to retrieve client\n")
}
// Create some helper functions and data structures.
done := make(chan bool) // Used to stop recording.
type maxv struct{ rsz, wsz int32 } // Used to hold max values.
results := make(chan maxv)
// stopRecording stops the recording ticker and releases go routine.
stopRecording := func() maxv {
done <- true
return <-results
}
// max just grabs max values.
max := func(a, b int32) int32 {
if a > b {
return a
}
return b
}
// Returns current value of the buffer sizes.
getBufferSizes := func() (int32, int32) {
c.mu.Lock()
defer c.mu.Unlock()
return c.in.rsz, c.out.sz
}
// Record the max values seen.
recordMaxBufferSizes := func() {
ticker := time.NewTicker(10 * time.Microsecond)
defer ticker.Stop()
var m maxv
recordMax := func() {
rsz, wsz := getBufferSizes()
m.rsz = max(m.rsz, rsz)
m.wsz = max(m.wsz, wsz)
}
for {
select {
case <-done:
recordMax()
results <- m
return
case <-ticker.C:
recordMax()
}
}
}
// Check that the current value is what we expected.
checkBuffers := func(ers, ews int32) {
t.Helper()
rsz, wsz := getBufferSizes()
if rsz != ers {
t.Fatalf("Expected read buffer of %d, but got %d\n", ers, rsz)
}
if wsz != ews {
t.Fatalf("Expected write buffer of %d, but got %d\n", ews, wsz)
}
}
// Check that the max was as expected.
checkResults := func(m maxv, rsz, wsz int32) {
t.Helper()
if rsz != m.rsz {
t.Fatalf("Expected read buffer of %d, but got %d\n", rsz, m.rsz)
}
if wsz != m.wsz {
t.Fatalf("Expected write buffer of %d, but got %d\n", wsz, m.wsz)
}
}
// Here is where testing begins..
// Should be at or below the startBufSize for both.
rsz, wsz := getBufferSizes()
if rsz > startBufSize {
t.Fatalf("Expected read buffer of <= %d, but got %d\n", startBufSize, rsz)
}
if wsz > startBufSize {
t.Fatalf("Expected write buffer of <= %d, but got %d\n", startBufSize, wsz)
}
// Send some data.
data := make([]byte, 2048)
rand.Read(data)
go recordMaxBufferSizes()
for i := 0; i < 200; i++ {
nc.Publish("foo", data)
}
nc.Flush()
m := stopRecording()
if m.rsz != maxBufSize && m.rsz != maxBufSize/2 {
t.Fatalf("Expected read buffer of %d or %d, but got %d\n", maxBufSize, maxBufSize/2, m.rsz)
}
if m.wsz > startBufSize {
t.Fatalf("Expected write buffer of <= %d, but got %d\n", startBufSize, m.wsz)
}
// Create Subscription to test outbound buffer from server.
nc.Subscribe("foo", func(m *nats.Msg) {
// Just eat it..
})
go recordMaxBufferSizes()
for i := 0; i < 200; i++ {
nc.Publish("foo", data)
}
nc.Flush()
m = stopRecording()
checkResults(m, maxBufSize, maxBufSize)
// Now test that we shrink correctly.
// Should go to minimum for both..
for i := 0; i < 20; i++ {
nc.Flush()
}
checkBuffers(minBufSize, minBufSize)
}
func TestClientTraceRace(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
// Activate trace logging
s.SetLogger(&DummyLogger{}, false, true)
nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc1.Close()
total := 10000
count := 0
ch := make(chan bool, 1)
if _, err := nc1.Subscribe("foo", func(_ *nats.Msg) {
count++
if count == total {
ch <- true
}
}); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < total; i++ {
nc1.Publish("bar", []byte("hello"))
}
}()
for i := 0; i < total; i++ {
nc2.Publish("foo", []byte("hello"))
}
if err := wait(ch); err != nil {
t.Fatal("Did not get all our messages")
}
wg.Wait()
}
func TestClientUserInfo(t *testing.T) {
pnkey := "UD6AYQSOIN2IN5OGC6VQZCR4H3UFMIOXSW6NNS6N53CLJA4PB56CEJJI"
c := &client{
cid: 1024,
opts: ClientOpts{
Nkey: pnkey,
},
}
got := c.getAuthUser()
expected := `Nkey "UD6AYQSOIN2IN5OGC6VQZCR4H3UFMIOXSW6NNS6N53CLJA4PB56CEJJI"`
if got != expected {
t.Errorf("Expected %q, got %q", expected, got)
}
c = &client{
cid: 1024,
opts: ClientOpts{
Username: "foo",
},
}
got = c.getAuthUser()
expected = `User "foo"`
if got != expected {
t.Errorf("Expected %q, got %q", expected, got)
}
c = &client{
cid: 1024,
opts: ClientOpts{},
}
got = c.getAuthUser()
expected = `User "N/A"`
if got != expected {
t.Errorf("Expected %q, got %q", expected, got)
}
}
type captureWarnLogger struct {
DummyLogger
warn chan string
}
func (l *captureWarnLogger) Warnf(format string, v ...interface{}) {
select {
case l.warn <- fmt.Sprintf(format, v...):
default:
}
}
type pauseWriteConn struct {
net.Conn
}
func (swc *pauseWriteConn) Write(b []byte) (int, error) {
time.Sleep(250 * time.Millisecond)
return swc.Conn.Write(b)
}
func TestReadloopWarning(t *testing.T) {
readLoopReportThreshold = 100 * time.Millisecond
defer func() { readLoopReportThreshold = readLoopReport }()
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
l := &captureWarnLogger{warn: make(chan string, 1)}
s.SetLogger(l, false, false)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc := natsConnect(t, url)
defer nc.Close()
natsSubSync(t, nc, "foo")
natsFlush(t, nc)
cid, _ := nc.GetClientID()
sender := natsConnect(t, url)
defer sender.Close()
c := s.getClient(cid)
c.mu.Lock()
c.nc = &pauseWriteConn{Conn: c.nc}
c.mu.Unlock()
natsPub(t, sender, "foo", make([]byte, 100))
natsFlush(t, sender)
select {
case warn := <-l.warn:
if !strings.Contains(warn, "Readloop") {
t.Fatalf("unexpected warning: %v", warn)
}
case <-time.After(2 * time.Second):
t.Fatalf("No warning printed")
}
}
func TestTraceMsg(t *testing.T) {
c := &client{}
// Enable message trace
c.trace = true
cases := []struct {
Desc string
Msg []byte
Wanted string
MaxTracedMsgLen int
}{
{
Desc: "normal length",
Msg: []byte(fmt.Sprintf("normal%s", CR_LF)),
Wanted: " - <<- MSG_PAYLOAD: [\"normal\"]",
MaxTracedMsgLen: 10,
},
{
Desc: "over length",
Msg: []byte(fmt.Sprintf("over length%s", CR_LF)),
Wanted: " - <<- MSG_PAYLOAD: [\"over lengt...\"]",
MaxTracedMsgLen: 10,
},
{
Desc: "unlimited length",
Msg: []byte(fmt.Sprintf("unlimited length%s", CR_LF)),
Wanted: " - <<- MSG_PAYLOAD: [\"unlimited length\"]",
MaxTracedMsgLen: 0,
},
{
Desc: "negative max traced msg len",
Msg: []byte(fmt.Sprintf("negative max traced msg len%s", CR_LF)),
Wanted: " - <<- MSG_PAYLOAD: [\"negative max traced msg len\"]",
MaxTracedMsgLen: -1,
},
}
for _, ut := range cases {
c.srv = &Server{
opts: &Options{MaxTracedMsgLen: ut.MaxTracedMsgLen},
}
c.srv.SetLogger(&DummyLogger{}, true, true)
c.traceMsg(ut.Msg)
got := c.srv.logging.logger.(*DummyLogger).msg
if !reflect.DeepEqual(ut.Wanted, got) {
t.Errorf("Desc: %s. Msg %q. Traced msg want: %s, got: %s", ut.Desc, ut.Msg, ut.Wanted, got)
}
}
}
func TestClientMaxPending(t *testing.T) {
opts := DefaultOptions()
opts.MaxPending = math.MaxInt32 + 1
s := RunServer(opts)
defer s.Shutdown()
nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
defer nc.Close()
sub := natsSubSync(t, nc, "foo")
natsPub(t, nc, "foo", []byte("msg"))
natsNexMsg(t, sub, 100*time.Millisecond)
}
func TestResponsePermissions(t *testing.T) {
for i, test := range []struct {
name string
perms *ResponsePermission
}{
{"max_msgs", &ResponsePermission{MaxMsgs: 2, Expires: time.Hour}},
{"no_expire_limit", &ResponsePermission{MaxMsgs: 3, Expires: -1 * time.Millisecond}},
{"expire", &ResponsePermission{MaxMsgs: 1000, Expires: 100 * time.Millisecond}},
{"no_msgs_limit", &ResponsePermission{MaxMsgs: -1, Expires: 100 * time.Millisecond}},
} {
t.Run(test.name, func(t *testing.T) {
opts := DefaultOptions()
u1 := &User{
Username: "service",
Password: "pwd",
Permissions: &Permissions{Response: test.perms},
}
u2 := &User{Username: "ivan", Password: "pwd"}
opts.Users = []*User{u1, u2}
s := RunServer(opts)
defer s.Shutdown()
svcNC := natsConnect(t, fmt.Sprintf("nats://service:pwd@%s:%d", opts.Host, opts.Port))
defer svcNC.Close()
reqSub := natsSubSync(t, svcNC, "request")
natsFlush(t, svcNC)
nc := natsConnect(t, fmt.Sprintf("nats://ivan:pwd@%s:%d", opts.Host, opts.Port))
defer nc.Close()
replySub := natsSubSync(t, nc, "reply")
natsPubReq(t, nc, "request", "reply", []byte("req1"))
req1 := natsNexMsg(t, reqSub, 100*time.Millisecond)
checkFailed := func(t *testing.T) {
t.Helper()
if reply, err := replySub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout {
if reply != nil {
t.Fatalf("Expected to receive timeout, got reply=%q", reply.Data)
} else {
t.Fatalf("Unexpected error: %v", err)
}
}
}
switch i {
case 0:
// Should allow only 2 replies...
for i := 0; i < 10; i++ {
natsPub(t, svcNC, req1.Reply, []byte("reply"))
}
natsNexMsg(t, replySub, 100*time.Millisecond)
natsNexMsg(t, replySub, 100*time.Millisecond)
// The next should fail...
checkFailed(t)
case 1:
// Expiration is set to -1ms, which should count as infinite...
natsPub(t, svcNC, req1.Reply, []byte("reply"))
// Sleep a bit before next send
time.Sleep(50 * time.Millisecond)
natsPub(t, svcNC, req1.Reply, []byte("reply"))
// Make sure we receive both
natsNexMsg(t, replySub, 100*time.Millisecond)
natsNexMsg(t, replySub, 100*time.Millisecond)
case 2:
fallthrough
case 3:
// Expire set to 100ms so make sure we wait more between
// next publish
natsPub(t, svcNC, req1.Reply, []byte("reply"))
time.Sleep(200 * time.Millisecond)
natsPub(t, svcNC, req1.Reply, []byte("reply"))
// Should receive one, and fail on the other
natsNexMsg(t, replySub, 100*time.Millisecond)
checkFailed(t)
}
// When testing expiration, sleep before sending next reply
if i >= 2 {
time.Sleep(400 * time.Millisecond)
}
})
}
}
func TestPingNotSentTooSoon(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
doneCh := make(chan bool, 1)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
s.Connz(nil)
select {
case <-doneCh:
return
case <-time.After(time.Millisecond):
}
}
}()
for i := 0; i < 100; i++ {
nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
nc.Close()
}
close(doneCh)
wg.Wait()
c, br, _ := newClientForServer(s)
defer c.close()
connectOp := []byte("CONNECT {\"user\":\"ivan\",\"pass\":\"bar\"}\r\n")
c.parse(connectOp)
// Since client has not send PING, having server try to send RTT ping
// to client should not do anything
if c.sendRTTPing() {
t.Fatalf("RTT ping should not have been sent")
}
// Speed up detection of time elapsed by moving the c.start to more than
// 2 secs in the past.
c.mu.Lock()
c.start = time.Unix(0, c.start.UnixNano()-int64(maxNoRTTPingBeforeFirstPong+time.Second))
c.mu.Unlock()
errCh := make(chan error, 1)
go func() {
l, _ := br.ReadString('\n')
if l != "PING\r\n" {
errCh <- fmt.Errorf("expected to get PING, got %s", l)
return
}
errCh <- nil
}()
if !c.sendRTTPing() {
t.Fatalf("RTT ping should have been sent")
}
wg.Wait()
if e := <-errCh; e != nil {
t.Fatal(e.Error())
}
}
func TestClientCheckUseOfGWReplyPrefix(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
ech := make(chan error, 1)
nc, err := nats.Connect(s.ClientURL(),
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, e error) {
ech <- e
}))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
// Expect to fail if publish on gateway reply prefix
nc.Publish(gwReplyPrefix+"anything", []byte("should fail"))
// Wait for publish violation error
select {
case e := <-ech:
if e == nil || !strings.Contains(strings.ToLower(e.Error()), "violation for publish") {
t.Fatalf("Expected violation error, got %v", e)
}
case <-time.After(time.Second):
t.Fatalf("Did not receive permissions violation error")
}
// Now publish a message with a reply set to the prefix,
// it should be rejected too.
nc.PublishRequest("foo", gwReplyPrefix+"anything", []byte("should fail"))
// Wait for publish violation error with reply
select {
case e := <-ech:
if e == nil || !strings.Contains(strings.ToLower(e.Error()), "violation for publish with reply") {
t.Fatalf("Expected violation error, got %v", e)
}
case <-time.After(time.Second):
t.Fatalf("Did not receive permissions violation error")
}
}
func TestNoClientLeakOnSlowConsumer(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()
c, err := net.Dial("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer c.Close()
cr := bufio.NewReader(c)
// Wait for INFO...
line, _, _ := cr.ReadLine()
var info serverInfo
if err = json.Unmarshal(line[5:], &info); err != nil {
t.Fatalf("Could not parse INFO json: %v\n", err)
}
// Send our connect
if _, err := c.Write([]byte("CONNECT {\"verbose\": false}\r\nSUB foo 1\r\nPING\r\n")); err != nil {
t.Fatalf("Error sending CONNECT and SUB: %v", err)
}
// Wait for PONG
line, _, _ = cr.ReadLine()
if string(line) != "PONG" {
t.Fatalf("Expected 'PONG' but got %q", line)
}
// Get the client from server map
cli := s.GetClient(info.CID)
if cli == nil {
t.Fatalf("No client registered")
}
// Change the write deadline to very low value
cli.mu.Lock()
cli.out.wdl = time.Nanosecond
cli.mu.Unlock()
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
// Send some messages to cause write deadline error on "cli"
payload := make([]byte, 1000)
for i := 0; i < 100; i++ {
natsPub(t, nc, "foo", payload)
}
natsFlush(t, nc)
nc.Close()
// Now make sure that the number of clients goes to 0.
checkClientsCount(t, s, 0)
}
func TestClientSlowConsumerWithoutConnect(t *testing.T) {
opts := DefaultOptions()
opts.WriteDeadline = 100 * time.Millisecond
s := RunServer(opts)
defer s.Shutdown()
url := fmt.Sprintf("127.0.0.1:%d", opts.Port)
c, err := net.Dial("tcp", url)
if err != nil {
t.Fatalf("Error on dial: %v", err)
}
defer c.Close()
c.Write([]byte("SUB foo 1\r\n"))
payload := make([]byte, 10000)
nc, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
for i := 0; i < 10000; i++ {
nc.Publish("foo", payload)
}
nc.Flush()
checkFor(t, time.Second, 15*time.Millisecond, func() error {
// Expect slow consumer..
if n := atomic.LoadInt64(&s.slowConsumers); n != 1 {
return fmt.Errorf("Expected 1 slow consumer, got: %v", n)
}
return nil
})
}
func TestClientNoSlowConsumerIfConnectExpected(t *testing.T) {
opts := DefaultOptions()
opts.Username = "ivan"
opts.Password = "pass"
// Make it very slow so that the INFO sent to client fails...
opts.WriteDeadline = time.Nanosecond
s := RunServer(opts)
defer s.Shutdown()
// Expect server to close the connection, but will bump the slow
// consumer count.
nc, err := nats.Connect(fmt.Sprintf("nats://ivan:pass@%s:%d", opts.Host, opts.Port))
if err == nil {
nc.Close()
t.Fatal("Expected connect error")
}
if n := atomic.LoadInt64(&s.slowConsumers); n != 0 {
t.Fatalf("Expected 0 slow consumer, got: %v", n)
}
}
func TestClientStalledDuration(t *testing.T) {
for _, test := range []struct {
name string
pb int64
mp int64
expectedTTL time.Duration
}{
{"pb above mp", 110, 100, stallClientMaxDuration},
{"pb equal mp", 100, 100, stallClientMaxDuration},
{"pb below mp/2", 49, 100, stallClientMinDuration},
{"pb equal mp/2", 50, 100, stallClientMinDuration},
{"pb at 55% of mp", 55, 100, stallClientMinDuration + 1*stallClientMinDuration},
{"pb at 60% of mp", 60, 100, stallClientMinDuration + 2*stallClientMinDuration},
{"pb at 70% of mp", 70, 100, stallClientMinDuration + 4*stallClientMinDuration},
{"pb at 80% of mp", 80, 100, stallClientMinDuration + 6*stallClientMinDuration},
{"pb at 90% of mp", 90, 100, stallClientMinDuration + 8*stallClientMinDuration},
{"pb at 99% of mp", 99, 100, stallClientMinDuration + 9*stallClientMinDuration},
} {
t.Run(test.name, func(t *testing.T) {
if ttl := stallDuration(test.pb, test.mp); ttl != test.expectedTTL {
t.Fatalf("For pb=%v mp=%v, expected TTL to be %v, got %v", test.pb, test.mp, test.expectedTTL, ttl)
}
})
}
}
func TestClientIPv6Address(t *testing.T) {
opts := DefaultOptions()
opts.Host = "0.0.0.0"
s := RunServer(opts)
defer s.Shutdown()
nc, err := nats.Connect(fmt.Sprintf("nats://[::1]:%v", opts.Port))
// Travis may not accept IPv6, in that case, skip the test.
if err != nil {
t.Skipf("Skipping test because could not connect: %v", err)
}
defer nc.Close()
cid, _ := nc.GetClientID()
c := s.GetClient(cid)
c.mu.Lock()
ncs := c.String()
c.mu.Unlock()
if !strings.HasPrefix(ncs, "[::1]") {
t.Fatalf("Wrong string representation of an IPv6 address: %q", ncs)
}
}
func TestPBNotIncreasedOnMaxPending(t *testing.T) {
opts := DefaultOptions()
opts.MaxPending = 100
s := &Server{opts: opts}
c := &client{srv: s}
c.initClient()
c.mu.Lock()
c.queueOutbound(make([]byte, 200))
pb := c.out.pb
c.mu.Unlock()
if pb != 0 {
t.Fatalf("c.out.pb should be 0, got %v", pb)
}
}
type testConnWritePartial struct {
net.Conn
partial bool
buf bytes.Buffer
}
func (c *testConnWritePartial) Write(p []byte) (int, error) {
n := len(p)
if c.partial {
n = 15
}
return c.buf.Write(p[:n])
}
func (c *testConnWritePartial) RemoteAddr() net.Addr {
return nil
}
func (c *testConnWritePartial) SetWriteDeadline(_ time.Time) error {
return nil
}
func TestFlushOutboundNoSliceReuseIfPartial(t *testing.T) {
opts := DefaultOptions()
opts.MaxPending = 1024
s := &Server{opts: opts}
fakeConn := &testConnWritePartial{partial: true}
c := &client{srv: s, nc: fakeConn}
c.initClient()
bufs := [][]byte{
[]byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ"),
[]byte("------"),
[]byte("0123456789"),
}
expected := bytes.Buffer{}
for _, buf := range bufs {
expected.Write(buf)
c.mu.Lock()
c.queueOutbound(buf)
c.out.sz = 10
c.flushOutbound()
fakeConn.partial = false
c.mu.Unlock()
}
// Ensure everything is flushed.
for done := false; !done; {
c.mu.Lock()
if c.out.pb > 0 {
c.flushOutbound()
} else {
done = true
}
c.mu.Unlock()
}
if !bytes.Equal(expected.Bytes(), fakeConn.buf.Bytes()) {
t.Fatalf("Expected\n%q\ngot\n%q", expected.String(), fakeConn.buf.String())
}
}
type captureNoticeLogger struct {
DummyLogger
notices []string
}
func (l *captureNoticeLogger) Noticef(format string, v ...interface{}) {
l.Lock()
l.notices = append(l.notices, fmt.Sprintf(format, v...))
l.Unlock()
}
func TestCloseConnectionLogsReason(t *testing.T) {
o1 := DefaultOptions()
s1 := RunServer(o1)
defer s1.Shutdown()
l := &captureNoticeLogger{}
s1.SetLogger(l, true, true)
o2 := DefaultOptions()
o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port))
s2 := RunServer(o2)
defer s2.Shutdown()
checkClusterFormed(t, s1, s2)
s2.Shutdown()
checkFor(t, time.Second, 15*time.Millisecond, func() error {
if s1.NumRoutes() != 0 {
return fmt.Errorf("route still connected")
}
return nil
})
// Now check that s1 has logged that the connection is closed and that the reason is included.
ok := false
l.Lock()
for _, n := range l.notices {
if strings.Contains(n, "connection closed: "+ClientClosed.String()) {
ok = true
break
}
}
l.Unlock()
if !ok {
t.Fatal("Log does not contain closed reason")
}
}
func TestCloseConnectionVeryEarly(t *testing.T) {
for _, test := range []struct {
name string
useTLS bool
}{
{"no_tls", false},
{"tls", true},
} {
t.Run(test.name, func(t *testing.T) {
o := DefaultOptions()
if test.useTLS {
tc := &TLSConfigOpts{
CertFile: "../test/configs/certs/server-cert.pem",
KeyFile: "../test/configs/certs/server-key.pem",
CaFile: "../test/configs/certs/ca.pem",
}
tlsConfig, err := GenTLSConfig(tc)
if err != nil {
t.Fatalf("Error generating tls config: %v", err)
}
o.TLSConfig = tlsConfig
}
s := RunServer(o)
defer s.Shutdown()
// The issue was with a connection that would break right when
// server was sending the INFO. Creating a bare TCP connection
// and closing it right away won't help reproduce the problem.
// So testing in 2 steps.
// Get a normal TCP connection to the server.
c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", o.Port))
if err != nil {
t.Fatalf("Unable to create tcp connection")
}
// Now close it.
c.Close()
// Wait that num clients falls to 0.
checkClientsCount(t, s, 0)
// Call again with this closed connection. Alternatively, we
// would have to call with a fake connection that implements
// net.Conn but returns an error on Write.
s.createClient(c)
// This connection should not have been added to the server.
checkClientsCount(t, s, 0)
})
}
}
type connAddrString struct {
net.Addr
}
func (a *connAddrString) String() string {
return "[fe80::abc:def:ghi:123%utun0]:4222"
}
type connString struct {
net.Conn
}
func (c *connString) RemoteAddr() net.Addr {
return &connAddrString{}
}
func TestClientConnectionName(t *testing.T) {
s, err := NewServer(DefaultOptions())
if err != nil {
t.Fatalf("Error creating server: %v", err)
}
l := &DummyLogger{}
s.SetLogger(l, true, true)
for _, test := range []struct {
name string
kind int
kindStr string
ws bool
mqtt bool
}{
{"client", CLIENT, "cid:", false, false},
{"ws client", CLIENT, "wid:", true, false},
{"mqtt client", CLIENT, "mid:", false, true},
{"route", ROUTER, "rid:", false, false},
{"gateway", GATEWAY, "gid:", false, false},
{"leafnode", LEAF, "lid:", false, false},
} {
t.Run(test.name, func(t *testing.T) {
c := &client{srv: s, nc: &connString{}, kind: test.kind}
if test.ws {
c.ws = &websocket{}
}
if test.mqtt {
c.mqtt = &mqtt{}
}
c.initClient()
if host := "fe80::abc:def:ghi:123%utun0"; host != c.host {
t.Fatalf("expected host to be %q, got %q", host, c.host)
}
if port := uint16(4222); port != c.port {
t.Fatalf("expected port to be %v, got %v", port, c.port)
}
checkLog := func(suffix string) {
t.Helper()
l.Lock()
msg := l.msg
l.Unlock()
if strings.Contains(msg, "(MISSING)") {
t.Fatalf("conn name was not escaped properly, got MISSING: %s", msg)
}
if !strings.Contains(l.msg, test.kindStr) {
t.Fatalf("expected kind to be %q, got: %s", test.kindStr, msg)
}
if !strings.HasSuffix(l.msg, suffix) {
t.Fatalf("expected statement to end with %q, got %s", suffix, msg)
}
}
c.Debugf("debug: %v", 1)
checkLog(" 1")
c.Tracef("trace: %s", "2")
checkLog(" 2")
c.Warnf("warn: %s %d", "3", 4)
checkLog(" 3 4")
c.Errorf("error: %v %s", 5, "6")
checkLog(" 5 6")
})
}
}
func TestClientLimits(t *testing.T) {
accKp, err := nkeys.CreateAccount()
if err != nil {
t.Fatalf("Error creating account key: %v", err)
}
uKp, err := nkeys.CreateUser()
if err != nil {
t.Fatalf("Error creating user key: %v", err)
}
uPub, err := uKp.PublicKey()
if err != nil {
t.Fatalf("Error obtaining publicKey: %v", err)
}
s, err := NewServer(DefaultOptions())
if err != nil {
t.Fatalf("Error creating server: %v", err)
}
for _, test := range []struct {
client int32
acc int32
srv int32
expect int32
}{
// all identical
{1, 1, 1, 1},
{-1, -1, 0, -1},
// only one value unlimited
{1, -1, 0, 1},
{-1, 1, 0, 1},
{-1, -1, 1, 1},
// all combinations of distinct values
{1, 2, 3, 1},
{1, 3, 2, 1},
{2, 1, 3, 1},
{2, 3, 1, 1},
{3, 1, 2, 1},
{3, 2, 1, 1},
} {
t.Run("", func(t *testing.T) {
s.opts.MaxPayload = test.srv
s.opts.MaxSubs = int(test.srv)
c := &client{srv: s, acc: &Account{
limits: limits{mpay: test.acc, msubs: test.acc},
}}
uc := jwt.NewUserClaims(uPub)
uc.Limits.Subs = int64(test.client)
uc.Limits.Payload = int64(test.client)
c.opts.JWT, err = uc.Encode(accKp)
if err != nil {
t.Fatalf("Error encoding jwt: %v", err)
}
c.applyAccountLimits()
if c.mpay != test.expect {
t.Fatalf("payload %d not as ecpected %d", c.mpay, test.expect)
}
if c.msubs != test.expect {
t.Fatalf("subscriber %d not as ecpected %d", c.msubs, test.expect)
}
})
}
}
func TestClientClampMaxSubsErrReport(t *testing.T) {
maxSubLimitReportThreshold = int64(100 * time.Millisecond)
defer func() { maxSubLimitReportThreshold = defaultMaxSubLimitReportThreshold }()
o1 := DefaultOptions()
o1.MaxSubs = 1
o1.LeafNode.Host = "127.0.0.1"
o1.LeafNode.Port = -1
s1 := RunServer(o1)
defer s1.Shutdown()
l := &captureErrorLogger{errCh: make(chan string, 10)}
s1.SetLogger(l, false, false)
o2 := DefaultOptions()
u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port))
o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
s2 := RunServer(o2)
defer s2.Shutdown()
checkLeafNodeConnected(t, s1)
checkLeafNodeConnected(t, s2)
nc := natsConnect(t, s2.ClientURL())
natsSubSync(t, nc, "foo")
natsSubSync(t, nc, "bar")
// Make sure we receive only 1
check := func() {
t.Helper()
for i := 0; i < 2; i++ {
select {
case errStr := <-l.errCh:
if i > 0 {
t.Fatalf("Should not have logged a second time: %s", errStr)
}
if !strings.Contains(errStr, "maximum subscriptions") {
t.Fatalf("Unexpected error: %s", errStr)
}
case <-time.After(300 * time.Millisecond):
if i == 0 {
t.Fatal("Error should have been logged")
}
}
}
}
check()
// The above will have waited long enough to clear the report threshold.
// So create two new subs and check again that we get only 1 report.
natsSubSync(t, nc, "baz")
natsSubSync(t, nc, "bat")
check()
}