mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
As soon as server has processed a client CONNECT, it was possible that if Connz() or other was requested, the server will send a PING to compute the RTT. This would cause clients that expect the first PONG as part of synchronous CONNECT logic to fail. Make sure that we delay the first RTT ping to after sending the first PONG, or if client does not send PING as part of the CONNECT, after 2 seconds have elapsed since the tcp connection was accepted. Resolves #1174 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
1698 lines
43 KiB
Go
1698 lines
43 KiB
Go
// Copyright 2012-2019 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"net"
|
|
"reflect"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"crypto/rand"
|
|
"crypto/tls"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
)
|
|
|
|
type serverInfo struct {
|
|
ID string `json:"server_id"`
|
|
Host string `json:"host"`
|
|
Port uint `json:"port"`
|
|
Version string `json:"version"`
|
|
AuthRequired bool `json:"auth_required"`
|
|
TLSRequired bool `json:"tls_required"`
|
|
MaxPayload int64 `json:"max_payload"`
|
|
}
|
|
|
|
func createClientAsync(ch chan *client, s *Server, cli net.Conn) {
|
|
go func() {
|
|
c := s.createClient(cli)
|
|
// Must be here to suppress +OK
|
|
c.opts.Verbose = false
|
|
ch <- c
|
|
}()
|
|
}
|
|
|
|
func newClientForServer(s *Server) (*client, *bufio.Reader, string) {
|
|
cli, srv := net.Pipe()
|
|
cr := bufio.NewReaderSize(cli, maxBufSize)
|
|
ch := make(chan *client)
|
|
createClientAsync(ch, s, srv)
|
|
// So failing tests don't just hang.
|
|
cli.SetReadDeadline(time.Now().Add(10 * time.Second))
|
|
l, _ := cr.ReadString('\n')
|
|
// Grab client
|
|
c := <-ch
|
|
return c, cr, l
|
|
}
|
|
|
|
var defaultServerOptions = Options{
|
|
Host: "127.0.0.1",
|
|
Trace: false,
|
|
Debug: false,
|
|
NoLog: true,
|
|
NoSigs: true,
|
|
}
|
|
|
|
func rawSetup(serverOptions Options) (*Server, *client, *bufio.Reader, string) {
|
|
cli, srv := net.Pipe()
|
|
cr := bufio.NewReaderSize(cli, maxBufSize)
|
|
s := New(&serverOptions)
|
|
|
|
ch := make(chan *client)
|
|
createClientAsync(ch, s, srv)
|
|
|
|
l, _ := cr.ReadString('\n')
|
|
|
|
// Grab client
|
|
c := <-ch
|
|
return s, c, cr, l
|
|
}
|
|
|
|
func setUpClientWithResponse() (*client, string) {
|
|
_, c, _, l := rawSetup(defaultServerOptions)
|
|
return c, l
|
|
}
|
|
|
|
func setupClient() (*Server, *client, *bufio.Reader) {
|
|
s, c, cr, _ := rawSetup(defaultServerOptions)
|
|
return s, c, cr
|
|
}
|
|
|
|
func checkClientsCount(t *testing.T, s *Server, expected int) {
|
|
t.Helper()
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
if nc := s.NumClients(); nc != expected {
|
|
return fmt.Errorf("The number of expected connections was %v, got %v", expected, nc)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestClientCreateAndInfo(t *testing.T) {
|
|
c, l := setUpClientWithResponse()
|
|
|
|
if c.cid != 1 {
|
|
t.Fatalf("Expected cid of 1 vs %d\n", c.cid)
|
|
}
|
|
if c.state != OP_START {
|
|
t.Fatal("Expected state to be OP_START")
|
|
}
|
|
|
|
if !strings.HasPrefix(l, "INFO ") {
|
|
t.Fatalf("INFO response incorrect: %s\n", l)
|
|
}
|
|
// Make sure payload is proper json
|
|
var info serverInfo
|
|
err := json.Unmarshal([]byte(l[5:]), &info)
|
|
if err != nil {
|
|
t.Fatalf("Could not parse INFO json: %v\n", err)
|
|
}
|
|
// Sanity checks
|
|
if info.MaxPayload != MAX_PAYLOAD_SIZE ||
|
|
info.AuthRequired || info.TLSRequired ||
|
|
info.Port != DEFAULT_PORT {
|
|
t.Fatalf("INFO inconsistent: %+v\n", info)
|
|
}
|
|
}
|
|
|
|
func TestNonTLSConnectionState(t *testing.T) {
|
|
_, c, _ := setupClient()
|
|
state := c.GetTLSConnectionState()
|
|
if state != nil {
|
|
t.Error("GetTLSConnectionState() returned non-nil")
|
|
}
|
|
}
|
|
|
|
func TestClientConnect(t *testing.T) {
|
|
_, c, _ := setupClient()
|
|
|
|
// Basic Connect setting flags
|
|
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false,\"echo\":false}\r\n")
|
|
err := c.parse(connectOp)
|
|
if err != nil {
|
|
t.Fatalf("Received error: %v\n", err)
|
|
}
|
|
if c.state != OP_START {
|
|
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
|
|
}
|
|
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Echo: false}) {
|
|
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
|
|
}
|
|
|
|
// Test that we can capture user/pass
|
|
connectOp = []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\"}\r\n")
|
|
c.opts = defaultOpts
|
|
err = c.parse(connectOp)
|
|
if err != nil {
|
|
t.Fatalf("Received error: %v\n", err)
|
|
}
|
|
if c.state != OP_START {
|
|
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
|
|
}
|
|
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Username: "derek", Password: "foo"}) {
|
|
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
|
|
}
|
|
|
|
// Test that we can capture client name
|
|
connectOp = []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\",\"name\":\"router\"}\r\n")
|
|
c.opts = defaultOpts
|
|
err = c.parse(connectOp)
|
|
if err != nil {
|
|
t.Fatalf("Received error: %v\n", err)
|
|
}
|
|
if c.state != OP_START {
|
|
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
|
|
}
|
|
|
|
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Username: "derek", Password: "foo", Name: "router"}) {
|
|
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
|
|
}
|
|
|
|
// Test that we correctly capture auth tokens
|
|
connectOp = []byte("CONNECT {\"auth_token\":\"YZZ222\",\"name\":\"router\"}\r\n")
|
|
c.opts = defaultOpts
|
|
err = c.parse(connectOp)
|
|
if err != nil {
|
|
t.Fatalf("Received error: %v\n", err)
|
|
}
|
|
if c.state != OP_START {
|
|
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
|
|
}
|
|
|
|
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Authorization: "YZZ222", Name: "router"}) {
|
|
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
|
|
}
|
|
}
|
|
|
|
func TestClientConnectProto(t *testing.T) {
|
|
_, c, r := setupClient()
|
|
|
|
// Basic Connect setting flags, proto should be zero (original proto)
|
|
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false}\r\n")
|
|
err := c.parse(connectOp)
|
|
if err != nil {
|
|
t.Fatalf("Received error: %v\n", err)
|
|
}
|
|
if c.state != OP_START {
|
|
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
|
|
}
|
|
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Protocol: ClientProtoZero}) {
|
|
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
|
|
}
|
|
|
|
// ProtoInfo
|
|
connectOp = []byte(fmt.Sprintf("CONNECT {\"verbose\":true,\"pedantic\":true,\"tls_required\":false,\"protocol\":%d}\r\n", ClientProtoInfo))
|
|
err = c.parse(connectOp)
|
|
if err != nil {
|
|
t.Fatalf("Received error: %v\n", err)
|
|
}
|
|
if c.state != OP_START {
|
|
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
|
|
}
|
|
if !reflect.DeepEqual(c.opts, clientOpts{Echo: true, Verbose: true, Pedantic: true, Protocol: ClientProtoInfo}) {
|
|
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
|
|
}
|
|
if c.opts.Protocol != ClientProtoInfo {
|
|
t.Fatalf("Protocol should have been set to %v, but is set to %v", ClientProtoInfo, c.opts.Protocol)
|
|
}
|
|
|
|
// Illegal Option
|
|
connectOp = []byte("CONNECT {\"protocol\":22}\r\n")
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
// The client here is using a pipe, we need to be dequeuing
|
|
// data otherwise the server would be blocked trying to send
|
|
// the error back to it.
|
|
go func() {
|
|
defer wg.Done()
|
|
for {
|
|
if _, _, err := r.ReadLine(); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
err = c.parse(connectOp)
|
|
if err == nil {
|
|
t.Fatalf("Expected to receive an error\n")
|
|
}
|
|
if err != ErrBadClientProtocol {
|
|
t.Fatalf("Expected err of %q, got %q\n", ErrBadClientProtocol, err)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestRemoteAddress(t *testing.T) {
|
|
c := &client{}
|
|
|
|
// though in reality this will panic if it does not, adding coverage anyway
|
|
if c.RemoteAddress() != nil {
|
|
t.Errorf("RemoteAddress() did not handle nil connection correctly")
|
|
}
|
|
|
|
_, c, _ = setupClient()
|
|
addr := c.RemoteAddress()
|
|
|
|
if addr.Network() != "pipe" {
|
|
t.Errorf("RemoteAddress() returned invalid network: %s", addr.Network())
|
|
}
|
|
|
|
if addr.String() != "pipe" {
|
|
t.Errorf("RemoteAddress() returned invalid string: %s", addr.String())
|
|
}
|
|
}
|
|
|
|
func TestClientPing(t *testing.T) {
|
|
_, c, cr := setupClient()
|
|
|
|
// PING
|
|
pingOp := []byte("PING\r\n")
|
|
go c.parse(pingOp)
|
|
l, err := cr.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error receiving info from server: %v\n", err)
|
|
}
|
|
if !strings.HasPrefix(l, "PONG\r\n") {
|
|
t.Fatalf("PONG response incorrect: %s\n", l)
|
|
}
|
|
}
|
|
|
|
var msgPat = regexp.MustCompile(`MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n`)
|
|
|
|
const (
|
|
SUB_INDEX = 1
|
|
SID_INDEX = 2
|
|
REPLY_INDEX = 4
|
|
LEN_INDEX = 5
|
|
)
|
|
|
|
func grabPayload(cr *bufio.Reader, expected int) []byte {
|
|
d := make([]byte, expected)
|
|
n, _ := cr.Read(d)
|
|
cr.ReadString('\n')
|
|
return d[:n]
|
|
}
|
|
|
|
func checkPayload(cr *bufio.Reader, expected []byte, t *testing.T) {
|
|
t.Helper()
|
|
// Read in payload
|
|
d := make([]byte, len(expected))
|
|
n, err := cr.Read(d)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg payload from server: %v\n", err)
|
|
}
|
|
if n != len(expected) {
|
|
t.Fatalf("Did not read correct amount of bytes: %d vs %d\n", n, len(expected))
|
|
}
|
|
if !bytes.Equal(d, expected) {
|
|
t.Fatalf("Did not read correct payload:: <%s>\n", d)
|
|
}
|
|
}
|
|
|
|
func TestClientSimplePubSub(t *testing.T) {
|
|
_, c, cr := setupClient()
|
|
// SUB/PUB
|
|
go c.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n"))
|
|
l, err := cr.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg from server: %v\n", err)
|
|
}
|
|
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
|
|
if len(matches) != 6 {
|
|
t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6)
|
|
}
|
|
if matches[SUB_INDEX] != "foo" {
|
|
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
|
|
}
|
|
if matches[LEN_INDEX] != "5" {
|
|
t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX])
|
|
}
|
|
checkPayload(cr, []byte("hello\r\n"), t)
|
|
}
|
|
|
|
func TestClientPubSubNoEcho(t *testing.T) {
|
|
_, c, cr := setupClient()
|
|
// Specify no echo
|
|
connectOp := []byte("CONNECT {\"echo\":false}\r\n")
|
|
err := c.parse(connectOp)
|
|
if err != nil {
|
|
t.Fatalf("Received error: %v\n", err)
|
|
}
|
|
// SUB/PUB
|
|
go c.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n"))
|
|
l, err := cr.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg from server: %v\n", err)
|
|
}
|
|
// We should not receive anything but a PONG since we specified no echo.
|
|
if !strings.HasPrefix(l, "PONG\r\n") {
|
|
t.Fatalf("PONG response incorrect: %q\n", l)
|
|
}
|
|
}
|
|
|
|
func TestClientSimplePubSubWithReply(t *testing.T) {
|
|
_, c, cr := setupClient()
|
|
|
|
// SUB/PUB
|
|
go c.parse([]byte("SUB foo 1\r\nPUB foo bar 5\r\nhello\r\nPING\r\n"))
|
|
l, err := cr.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg from server: %v\n", err)
|
|
}
|
|
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
|
|
if len(matches) != 6 {
|
|
t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6)
|
|
}
|
|
if matches[SUB_INDEX] != "foo" {
|
|
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
|
|
}
|
|
if matches[REPLY_INDEX] != "bar" {
|
|
t.Fatalf("Did not get correct reply subject: '%s'\n", matches[REPLY_INDEX])
|
|
}
|
|
if matches[LEN_INDEX] != "5" {
|
|
t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX])
|
|
}
|
|
}
|
|
|
|
func TestClientNoBodyPubSubWithReply(t *testing.T) {
|
|
_, c, cr := setupClient()
|
|
|
|
// SUB/PUB
|
|
go c.parse([]byte("SUB foo 1\r\nPUB foo bar 0\r\n\r\nPING\r\n"))
|
|
l, err := cr.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg from server: %v\n", err)
|
|
}
|
|
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
|
|
if len(matches) != 6 {
|
|
t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6)
|
|
}
|
|
if matches[SUB_INDEX] != "foo" {
|
|
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
|
|
}
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
|
|
}
|
|
if matches[REPLY_INDEX] != "bar" {
|
|
t.Fatalf("Did not get correct reply subject: '%s'\n", matches[REPLY_INDEX])
|
|
}
|
|
if matches[LEN_INDEX] != "0" {
|
|
t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX])
|
|
}
|
|
}
|
|
|
|
// This needs to clear any flushOutbound flags since writeLoop not running.
|
|
func (c *client) parseAndFlush(op []byte) {
|
|
c.parse(op)
|
|
for cp := range c.pcd {
|
|
cp.mu.Lock()
|
|
cp.flags.clear(flushOutbound)
|
|
cp.flushOutbound()
|
|
cp.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
func (c *client) parseFlushAndClose(op []byte) {
|
|
c.parseAndFlush(op)
|
|
c.nc.Close()
|
|
}
|
|
|
|
func TestClientPubWithQueueSub(t *testing.T) {
|
|
_, c, cr := setupClient()
|
|
|
|
num := 100
|
|
|
|
// Queue SUB/PUB
|
|
subs := []byte("SUB foo g1 1\r\nSUB foo g1 2\r\n")
|
|
pubs := []byte("PUB foo bar 5\r\nhello\r\n")
|
|
op := []byte{}
|
|
op = append(op, subs...)
|
|
for i := 0; i < num; i++ {
|
|
op = append(op, pubs...)
|
|
}
|
|
|
|
go c.parseFlushAndClose(op)
|
|
|
|
var n1, n2, received int
|
|
for ; ; received++ {
|
|
l, err := cr.ReadString('\n')
|
|
if err != nil {
|
|
break
|
|
}
|
|
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
|
|
|
|
// Count which sub
|
|
switch matches[SID_INDEX] {
|
|
case "1":
|
|
n1++
|
|
case "2":
|
|
n2++
|
|
}
|
|
checkPayload(cr, []byte("hello\r\n"), t)
|
|
}
|
|
if received != num {
|
|
t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, num)
|
|
}
|
|
// Threshold for randomness for now
|
|
if n1 < 20 || n2 < 20 {
|
|
t.Fatalf("Received wrong # of msgs per subscriber: %d - %d\n", n1, n2)
|
|
}
|
|
}
|
|
|
|
func TestSplitSubjectQueue(t *testing.T) {
|
|
cases := []struct {
|
|
name string
|
|
sq string
|
|
wantSubject []byte
|
|
wantQueue []byte
|
|
wantErr bool
|
|
}{
|
|
{name: "single subject",
|
|
sq: "foo", wantSubject: []byte("foo"), wantQueue: nil},
|
|
{name: "subject and queue",
|
|
sq: "foo bar", wantSubject: []byte("foo"), wantQueue: []byte("bar")},
|
|
{name: "subject and queue with surrounding spaces",
|
|
sq: " foo bar ", wantSubject: []byte("foo"), wantQueue: []byte("bar")},
|
|
{name: "subject and queue with extra spaces in the middle",
|
|
sq: "foo bar", wantSubject: []byte("foo"), wantQueue: []byte("bar")},
|
|
{name: "subject, queue, and extra token",
|
|
sq: "foo bar fizz", wantSubject: []byte(nil), wantQueue: []byte(nil), wantErr: true},
|
|
}
|
|
|
|
for _, c := range cases {
|
|
t.Run(c.name, func(t *testing.T) {
|
|
sub, que, err := splitSubjectQueue(c.sq)
|
|
if err == nil && c.wantErr {
|
|
t.Fatal("Expected error, but got nil")
|
|
}
|
|
if err != nil && !c.wantErr {
|
|
t.Fatalf("Expected nil error, but got %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(sub, c.wantSubject) {
|
|
t.Fatalf("Expected to get subject %#v, but instead got %#v", c.wantSubject, sub)
|
|
}
|
|
if !reflect.DeepEqual(que, c.wantQueue) {
|
|
t.Fatalf("Expected to get queue %#v, but instead got %#v", c.wantQueue, que)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestQueueSubscribePermissions(t *testing.T) {
|
|
cases := []struct {
|
|
name string
|
|
perms *SubjectPermission
|
|
subject string
|
|
queue string
|
|
want string
|
|
}{
|
|
{
|
|
name: "plain subscription on foo",
|
|
perms: &SubjectPermission{Allow: []string{"foo"}},
|
|
subject: "foo",
|
|
want: "+OK\r\n",
|
|
},
|
|
{
|
|
name: "queue subscribe with allowed group",
|
|
perms: &SubjectPermission{Allow: []string{"foo bar"}},
|
|
subject: "foo",
|
|
queue: "bar",
|
|
want: "+OK\r\n",
|
|
},
|
|
{
|
|
name: "queue subscribe with wildcard allowed group",
|
|
perms: &SubjectPermission{Allow: []string{"foo bar.*"}},
|
|
subject: "foo",
|
|
queue: "bar.fizz",
|
|
want: "+OK\r\n",
|
|
},
|
|
{
|
|
name: "queue subscribe with full wildcard subject and subgroup",
|
|
perms: &SubjectPermission{Allow: []string{"> bar.>"}},
|
|
subject: "whizz",
|
|
queue: "bar.bang",
|
|
want: "+OK\r\n",
|
|
},
|
|
{
|
|
name: "plain subscribe with full wildcard subject and subgroup",
|
|
perms: &SubjectPermission{Allow: []string{"> bar.>"}},
|
|
subject: "whizz",
|
|
want: "-ERR 'Permissions Violation for Subscription to \"whizz\"'\r\n",
|
|
},
|
|
{
|
|
name: "deny plain subscription on foo",
|
|
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{"foo"}},
|
|
subject: "foo",
|
|
queue: "bar",
|
|
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
|
|
},
|
|
{
|
|
name: "allow plain subscription, except foo",
|
|
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{"foo"}},
|
|
subject: "bar",
|
|
want: "+OK\r\n",
|
|
},
|
|
{
|
|
name: "deny everything",
|
|
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{">"}},
|
|
subject: "foo",
|
|
queue: "bar",
|
|
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
|
|
},
|
|
{
|
|
name: "can only subscribe to queues v1",
|
|
perms: &SubjectPermission{Allow: []string{"> v1.>"}},
|
|
subject: "foo",
|
|
queue: "v1.prod",
|
|
want: "+OK\r\n",
|
|
},
|
|
{
|
|
name: "cannot subscribe to queues, plain subscribe ok",
|
|
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{"> >"}},
|
|
subject: "foo",
|
|
want: "+OK\r\n",
|
|
},
|
|
{
|
|
name: "cannot subscribe to queues, queue subscribe not ok",
|
|
perms: &SubjectPermission{Deny: []string{"> >"}},
|
|
subject: "foo",
|
|
queue: "bar",
|
|
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
|
|
},
|
|
{
|
|
name: "deny all queue subscriptions on dev or stg only",
|
|
perms: &SubjectPermission{Deny: []string{"> *.dev", "> *.stg"}},
|
|
subject: "foo",
|
|
queue: "bar",
|
|
want: "+OK\r\n",
|
|
},
|
|
{
|
|
name: "allow only queue subscription on dev or stg",
|
|
perms: &SubjectPermission{Allow: []string{"> *.dev", "> *.stg"}},
|
|
subject: "foo",
|
|
queue: "bar",
|
|
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
|
|
},
|
|
{
|
|
name: "deny queue subscriptions with subject foo",
|
|
perms: &SubjectPermission{Deny: []string{"foo >"}},
|
|
subject: "foo",
|
|
queue: "bar",
|
|
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
|
|
},
|
|
{
|
|
name: "plain sub is allowed, but queue subscribe with queue not in list",
|
|
perms: &SubjectPermission{Allow: []string{"foo bar"}},
|
|
subject: "foo",
|
|
queue: "fizz",
|
|
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"fizz\"'\r\n",
|
|
},
|
|
{
|
|
name: "allow plain sub, but do queue subscribe",
|
|
perms: &SubjectPermission{Allow: []string{"foo"}},
|
|
subject: "foo",
|
|
queue: "bar",
|
|
want: "+OK\r\n",
|
|
},
|
|
}
|
|
for _, c := range cases {
|
|
t.Run(c.name, func(t *testing.T) {
|
|
_, client, r := setupClient()
|
|
|
|
client.RegisterUser(&User{
|
|
Permissions: &Permissions{Subscribe: c.perms},
|
|
})
|
|
connect := []byte("CONNECT {\"verbose\":true}\r\n")
|
|
qsub := []byte(fmt.Sprintf("SUB %s %s 1\r\n", c.subject, c.queue))
|
|
|
|
go client.parseFlushAndClose(append(connect, qsub...))
|
|
|
|
var buf bytes.Buffer
|
|
if _, err := io.Copy(&buf, r); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Extra OK is from the successful CONNECT.
|
|
want := "+OK\r\n" + c.want
|
|
if got := buf.String(); got != want {
|
|
t.Fatalf("Expected to receive %q, but instead received %q", want, got)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestClientPubWithQueueSubNoEcho(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc1.Close()
|
|
|
|
// Grab the client from server and set no echo by hand.
|
|
s.mu.Lock()
|
|
lc := len(s.clients)
|
|
c := s.clients[s.gcid]
|
|
s.mu.Unlock()
|
|
|
|
if lc != 1 {
|
|
t.Fatalf("Expected only 1 client but got %d\n", lc)
|
|
}
|
|
if c == nil {
|
|
t.Fatal("Expected to retrieve client\n")
|
|
}
|
|
c.mu.Lock()
|
|
c.echo = false
|
|
c.mu.Unlock()
|
|
|
|
// Queue sub on nc1.
|
|
_, err = nc1.QueueSubscribe("foo", "bar", func(*nats.Msg) {})
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
nc1.Flush()
|
|
|
|
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
n := int32(0)
|
|
cb := func(m *nats.Msg) {
|
|
atomic.AddInt32(&n, 1)
|
|
}
|
|
|
|
_, err = nc2.QueueSubscribe("foo", "bar", cb)
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
nc2.Flush()
|
|
|
|
// Now publish 100 messages on nc1 which does not allow echo.
|
|
for i := 0; i < 100; i++ {
|
|
nc1.Publish("foo", []byte("Hello"))
|
|
}
|
|
nc1.Flush()
|
|
nc2.Flush()
|
|
|
|
checkFor(t, 5*time.Second, 10*time.Millisecond, func() error {
|
|
num := atomic.LoadInt32(&n)
|
|
if num != int32(100) {
|
|
return fmt.Errorf("Expected all the msgs to be received by nc2, got %d\n", num)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestClientUnSub(t *testing.T) {
|
|
_, c, cr := setupClient()
|
|
|
|
num := 1
|
|
|
|
// SUB/PUB
|
|
subs := []byte("SUB foo 1\r\nSUB foo 2\r\n")
|
|
unsub := []byte("UNSUB 1\r\n")
|
|
pub := []byte("PUB foo bar 5\r\nhello\r\n")
|
|
|
|
op := []byte{}
|
|
op = append(op, subs...)
|
|
op = append(op, unsub...)
|
|
op = append(op, pub...)
|
|
|
|
go c.parseFlushAndClose(op)
|
|
|
|
var received int
|
|
for ; ; received++ {
|
|
l, err := cr.ReadString('\n')
|
|
if err != nil {
|
|
break
|
|
}
|
|
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
|
|
if matches[SID_INDEX] != "2" {
|
|
t.Fatalf("Received msg on unsubscribed subscription!\n")
|
|
}
|
|
checkPayload(cr, []byte("hello\r\n"), t)
|
|
}
|
|
if received != num {
|
|
t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, num)
|
|
}
|
|
}
|
|
|
|
func TestClientUnSubMax(t *testing.T) {
|
|
_, c, cr := setupClient()
|
|
|
|
num := 10
|
|
exp := 5
|
|
|
|
// SUB/PUB
|
|
subs := []byte("SUB foo 1\r\n")
|
|
unsub := []byte("UNSUB 1 5\r\n")
|
|
pub := []byte("PUB foo bar 5\r\nhello\r\n")
|
|
|
|
op := []byte{}
|
|
op = append(op, subs...)
|
|
op = append(op, unsub...)
|
|
for i := 0; i < num; i++ {
|
|
op = append(op, pub...)
|
|
}
|
|
|
|
go c.parseFlushAndClose(op)
|
|
|
|
var received int
|
|
for ; ; received++ {
|
|
l, err := cr.ReadString('\n')
|
|
if err != nil {
|
|
break
|
|
}
|
|
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
|
|
if matches[SID_INDEX] != "1" {
|
|
t.Fatalf("Received msg on unsubscribed subscription!\n")
|
|
}
|
|
checkPayload(cr, []byte("hello\r\n"), t)
|
|
}
|
|
if received != exp {
|
|
t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, exp)
|
|
}
|
|
}
|
|
|
|
func TestClientAutoUnsubExactReceived(t *testing.T) {
|
|
_, c, _ := setupClient()
|
|
defer c.nc.Close()
|
|
|
|
// SUB/PUB
|
|
subs := []byte("SUB foo 1\r\n")
|
|
unsub := []byte("UNSUB 1 1\r\n")
|
|
pub := []byte("PUB foo bar 2\r\nok\r\n")
|
|
|
|
op := []byte{}
|
|
op = append(op, subs...)
|
|
op = append(op, unsub...)
|
|
op = append(op, pub...)
|
|
|
|
ch := make(chan bool)
|
|
go func() {
|
|
c.parse(op)
|
|
ch <- true
|
|
}()
|
|
|
|
// Wait for processing
|
|
<-ch
|
|
|
|
// We should not have any subscriptions in place here.
|
|
if len(c.subs) != 0 {
|
|
t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n", len(c.subs))
|
|
}
|
|
}
|
|
|
|
func TestClientUnsubAfterAutoUnsub(t *testing.T) {
|
|
_, c, _ := setupClient()
|
|
defer c.nc.Close()
|
|
|
|
// SUB/UNSUB/UNSUB
|
|
subs := []byte("SUB foo 1\r\n")
|
|
asub := []byte("UNSUB 1 1\r\n")
|
|
unsub := []byte("UNSUB 1\r\n")
|
|
|
|
op := []byte{}
|
|
op = append(op, subs...)
|
|
op = append(op, asub...)
|
|
op = append(op, unsub...)
|
|
|
|
ch := make(chan bool)
|
|
go func() {
|
|
c.parse(op)
|
|
ch <- true
|
|
}()
|
|
|
|
// Wait for processing
|
|
<-ch
|
|
|
|
// We should not have any subscriptions in place here.
|
|
if len(c.subs) != 0 {
|
|
t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n", len(c.subs))
|
|
}
|
|
}
|
|
|
|
func TestClientRemoveSubsOnDisconnect(t *testing.T) {
|
|
s, c, _ := setupClient()
|
|
subs := []byte("SUB foo 1\r\nSUB bar 2\r\n")
|
|
|
|
ch := make(chan bool)
|
|
go func() {
|
|
c.parse(subs)
|
|
ch <- true
|
|
}()
|
|
<-ch
|
|
|
|
if s.NumSubscriptions() != 2 {
|
|
t.Fatalf("Should have 2 subscriptions, got %d\n", s.NumSubscriptions())
|
|
}
|
|
c.closeConnection(ClientClosed)
|
|
if s.NumSubscriptions() != 0 {
|
|
t.Fatalf("Should have no subscriptions after close, got %d\n", s.NumSubscriptions())
|
|
}
|
|
}
|
|
|
|
func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) {
|
|
_, c, _ := setupClient()
|
|
c.closeConnection(ClientClosed)
|
|
subs := []byte("SUB foo 1\r\nSUB bar 2\r\n")
|
|
|
|
ch := make(chan bool)
|
|
go func() {
|
|
c.parse(subs)
|
|
ch <- true
|
|
}()
|
|
<-ch
|
|
|
|
if c.acc.sl.Count() != 0 {
|
|
t.Fatalf("Should have no subscriptions after close, got %d\n", c.acc.sl.Count())
|
|
}
|
|
}
|
|
|
|
func TestClientMapRemoval(t *testing.T) {
|
|
s, c, _ := setupClient()
|
|
c.nc.Close()
|
|
|
|
checkClientsCount(t, s, 0)
|
|
}
|
|
|
|
func TestAuthorizationTimeout(t *testing.T) {
|
|
serverOptions := DefaultOptions()
|
|
serverOptions.Authorization = "my_token"
|
|
serverOptions.AuthTimeout = 0.4
|
|
s := RunServer(serverOptions)
|
|
defer s.Shutdown()
|
|
|
|
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", serverOptions.Host, serverOptions.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error dialing server: %v\n", err)
|
|
}
|
|
defer conn.Close()
|
|
client := bufio.NewReaderSize(conn, maxBufSize)
|
|
if _, err := client.ReadString('\n'); err != nil {
|
|
t.Fatalf("Error receiving info from server: %v\n", err)
|
|
}
|
|
time.Sleep(3 * secondsToDuration(serverOptions.AuthTimeout))
|
|
l, err := client.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error receiving info from server: %v\n", err)
|
|
}
|
|
if !strings.Contains(l, "Authentication Timeout") {
|
|
t.Fatalf("Authentication Timeout response incorrect: %q\n", l)
|
|
}
|
|
}
|
|
|
|
// This is from bug report #18
|
|
func TestTwoTokenPubMatchSingleTokenSub(t *testing.T) {
|
|
_, c, cr := setupClient()
|
|
test := []byte("PUB foo.bar 5\r\nhello\r\nSUB foo 1\r\nPING\r\nPUB foo.bar 5\r\nhello\r\nPING\r\n")
|
|
go c.parse(test)
|
|
l, err := cr.ReadString('\n')
|
|
if err != nil {
|
|
t.Fatalf("Error receiving info from server: %v\n", err)
|
|
}
|
|
if !strings.HasPrefix(l, "PONG\r\n") {
|
|
t.Fatalf("PONG response incorrect: %q\n", l)
|
|
}
|
|
// Expect just a pong, no match should exist here..
|
|
l, _ = cr.ReadString('\n')
|
|
if !strings.HasPrefix(l, "PONG\r\n") {
|
|
t.Fatalf("PONG response was expected, got: %q\n", l)
|
|
}
|
|
}
|
|
|
|
func TestUnsubRace(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("nats://%s:%d",
|
|
s.getOpts().Host,
|
|
s.Addr().(*net.TCPAddr).Port,
|
|
)
|
|
nc, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("Error creating client to %s: %v\n", url, err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
ncp, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("Error creating client: %v\n", err)
|
|
}
|
|
defer ncp.Close()
|
|
|
|
sub, _ := nc.Subscribe("foo", func(m *nats.Msg) {
|
|
// Just eat it..
|
|
})
|
|
nc.Flush()
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
for i := 0; i < 10000; i++ {
|
|
ncp.Publish("foo", []byte("hello"))
|
|
}
|
|
wg.Done()
|
|
}()
|
|
|
|
time.Sleep(5 * time.Millisecond)
|
|
|
|
sub.Unsubscribe()
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestTLSCloseClientConnection(t *testing.T) {
|
|
opts, err := ProcessConfigFile("./configs/tls.conf")
|
|
if err != nil {
|
|
t.Fatalf("Error processing config file: %v", err)
|
|
}
|
|
opts.TLSTimeout = 100
|
|
opts.NoLog = true
|
|
opts.NoSigs = true
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
endpoint := fmt.Sprintf("%s:%d", opts.Host, opts.Port)
|
|
conn, err := net.DialTimeout("tcp", endpoint, 2*time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on dial: %v", err)
|
|
}
|
|
defer conn.Close()
|
|
br := bufio.NewReaderSize(conn, 100)
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Unexpected error reading INFO: %v", err)
|
|
}
|
|
|
|
tlsConn := tls.Client(conn, &tls.Config{InsecureSkipVerify: true})
|
|
defer tlsConn.Close()
|
|
if err := tlsConn.Handshake(); err != nil {
|
|
t.Fatalf("Unexpected error during handshake: %v", err)
|
|
}
|
|
br = bufio.NewReaderSize(tlsConn, 100)
|
|
connectOp := []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\",\"verbose\":false,\"pedantic\":false,\"tls_required\":true}\r\n")
|
|
if _, err := tlsConn.Write(connectOp); err != nil {
|
|
t.Fatalf("Unexpected error writing CONNECT: %v", err)
|
|
}
|
|
if _, err := tlsConn.Write([]byte("PING\r\n")); err != nil {
|
|
t.Fatalf("Unexpected error writing PING: %v", err)
|
|
}
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Unexpected error reading PONG: %v", err)
|
|
}
|
|
|
|
// Check that client is registered.
|
|
checkClientsCount(t, s, 1)
|
|
var cli *client
|
|
s.mu.Lock()
|
|
for _, c := range s.clients {
|
|
cli = c
|
|
break
|
|
}
|
|
s.mu.Unlock()
|
|
if cli == nil {
|
|
t.Fatal("Did not register client on time")
|
|
}
|
|
// Test GetTLSConnectionState
|
|
state := cli.GetTLSConnectionState()
|
|
if state == nil {
|
|
t.Error("GetTLSConnectionState() returned nil")
|
|
}
|
|
|
|
// Test RemoteAddress
|
|
addr := cli.RemoteAddress()
|
|
if addr == nil {
|
|
t.Error("RemoteAddress() returned nil")
|
|
}
|
|
|
|
if addr.(*net.TCPAddr).IP.String() != "127.0.0.1" {
|
|
t.Error("RemoteAddress() returned incorrect ip " + addr.String())
|
|
}
|
|
|
|
// Fill the buffer. Need to send 1 byte at a time so that we timeout here
|
|
// the nc.Close() would block due to a write that can not complete.
|
|
done := false
|
|
for !done {
|
|
cli.nc.SetWriteDeadline(time.Now().Add(time.Second))
|
|
if _, err := cli.nc.Write([]byte("a")); err != nil {
|
|
done = true
|
|
}
|
|
cli.nc.SetWriteDeadline(time.Time{})
|
|
}
|
|
ch := make(chan bool)
|
|
go func() {
|
|
select {
|
|
case <-ch:
|
|
return
|
|
case <-time.After(3 * time.Second):
|
|
fmt.Println("!!!! closeConnection is blocked, test will hang !!!")
|
|
return
|
|
}
|
|
}()
|
|
// Close the client
|
|
cli.closeConnection(ClientClosed)
|
|
ch <- true
|
|
}
|
|
|
|
// This tests issue #558
|
|
func TestWildcardCharsInLiteralSubjectWorks(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
ch := make(chan bool, 1)
|
|
// This subject is a literal even though it contains `*` and `>`,
|
|
// they are not treated as wildcards.
|
|
subj := "foo.bar,*,>,baz"
|
|
cb := func(_ *nats.Msg) {
|
|
ch <- true
|
|
}
|
|
for i := 0; i < 2; i++ {
|
|
sub, err := nc.Subscribe(subj, cb)
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
if err := nc.Flush(); err != nil {
|
|
t.Fatalf("Error on flush: %v", err)
|
|
}
|
|
if err := nc.LastError(); err != nil {
|
|
t.Fatalf("Server reported error: %v", err)
|
|
}
|
|
if err := nc.Publish(subj, []byte("msg")); err != nil {
|
|
t.Fatalf("Error on publish: %v", err)
|
|
}
|
|
select {
|
|
case <-ch:
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("Should have received the message")
|
|
}
|
|
if err := sub.Unsubscribe(); err != nil {
|
|
t.Fatalf("Error on unsubscribe: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestDynamicBuffers(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// Grab the client from server.
|
|
s.mu.Lock()
|
|
lc := len(s.clients)
|
|
c := s.clients[s.gcid]
|
|
s.mu.Unlock()
|
|
|
|
if lc != 1 {
|
|
t.Fatalf("Expected only 1 client but got %d\n", lc)
|
|
}
|
|
if c == nil {
|
|
t.Fatal("Expected to retrieve client\n")
|
|
}
|
|
|
|
// Create some helper functions and data structures.
|
|
done := make(chan bool) // Used to stop recording.
|
|
type maxv struct{ rsz, wsz int32 } // Used to hold max values.
|
|
results := make(chan maxv)
|
|
|
|
// stopRecording stops the recording ticker and releases go routine.
|
|
stopRecording := func() maxv {
|
|
done <- true
|
|
return <-results
|
|
}
|
|
// max just grabs max values.
|
|
max := func(a, b int32) int32 {
|
|
if a > b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
// Returns current value of the buffer sizes.
|
|
getBufferSizes := func() (int32, int32) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
return c.in.rsz, c.out.sz
|
|
}
|
|
// Record the max values seen.
|
|
recordMaxBufferSizes := func() {
|
|
ticker := time.NewTicker(10 * time.Microsecond)
|
|
defer ticker.Stop()
|
|
|
|
var m maxv
|
|
|
|
recordMax := func() {
|
|
rsz, wsz := getBufferSizes()
|
|
m.rsz = max(m.rsz, rsz)
|
|
m.wsz = max(m.wsz, wsz)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-done:
|
|
recordMax()
|
|
results <- m
|
|
return
|
|
case <-ticker.C:
|
|
recordMax()
|
|
}
|
|
}
|
|
}
|
|
// Check that the current value is what we expected.
|
|
checkBuffers := func(ers, ews int32) {
|
|
t.Helper()
|
|
rsz, wsz := getBufferSizes()
|
|
if rsz != ers {
|
|
t.Fatalf("Expected read buffer of %d, but got %d\n", ers, rsz)
|
|
}
|
|
if wsz != ews {
|
|
t.Fatalf("Expected write buffer of %d, but got %d\n", ews, wsz)
|
|
}
|
|
}
|
|
|
|
// Check that the max was as expected.
|
|
checkResults := func(m maxv, rsz, wsz int32) {
|
|
t.Helper()
|
|
if rsz != m.rsz {
|
|
t.Fatalf("Expected read buffer of %d, but got %d\n", rsz, m.rsz)
|
|
}
|
|
if wsz != m.wsz {
|
|
t.Fatalf("Expected write buffer of %d, but got %d\n", wsz, m.wsz)
|
|
}
|
|
}
|
|
|
|
// Here is where testing begins..
|
|
|
|
// Should be at or below the startBufSize for both.
|
|
rsz, wsz := getBufferSizes()
|
|
if rsz > startBufSize {
|
|
t.Fatalf("Expected read buffer of <= %d, but got %d\n", startBufSize, rsz)
|
|
}
|
|
if wsz > startBufSize {
|
|
t.Fatalf("Expected write buffer of <= %d, but got %d\n", startBufSize, wsz)
|
|
}
|
|
|
|
// Send some data.
|
|
data := make([]byte, 2048)
|
|
rand.Read(data)
|
|
|
|
go recordMaxBufferSizes()
|
|
for i := 0; i < 200; i++ {
|
|
nc.Publish("foo", data)
|
|
}
|
|
nc.Flush()
|
|
m := stopRecording()
|
|
|
|
if m.rsz != maxBufSize && m.rsz != maxBufSize/2 {
|
|
t.Fatalf("Expected read buffer of %d or %d, but got %d\n", maxBufSize, maxBufSize/2, m.rsz)
|
|
}
|
|
if m.wsz > startBufSize {
|
|
t.Fatalf("Expected write buffer of <= %d, but got %d\n", startBufSize, m.wsz)
|
|
}
|
|
|
|
// Create Subscription to test outbound buffer from server.
|
|
nc.Subscribe("foo", func(m *nats.Msg) {
|
|
// Just eat it..
|
|
})
|
|
go recordMaxBufferSizes()
|
|
|
|
for i := 0; i < 200; i++ {
|
|
nc.Publish("foo", data)
|
|
}
|
|
nc.Flush()
|
|
|
|
m = stopRecording()
|
|
checkResults(m, maxBufSize, maxBufSize)
|
|
|
|
// Now test that we shrink correctly.
|
|
|
|
// Should go to minimum for both..
|
|
for i := 0; i < 20; i++ {
|
|
nc.Flush()
|
|
}
|
|
checkBuffers(minBufSize, minBufSize)
|
|
}
|
|
|
|
// Similar to the routed version. Make sure we receive all of the
|
|
// messages with auto-unsubscribe enabled.
|
|
func TestQueueAutoUnsubscribe(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
rbar := int32(0)
|
|
barCb := func(m *nats.Msg) {
|
|
atomic.AddInt32(&rbar, 1)
|
|
}
|
|
rbaz := int32(0)
|
|
bazCb := func(m *nats.Msg) {
|
|
atomic.AddInt32(&rbaz, 1)
|
|
}
|
|
|
|
// Create 1000 subscriptions with auto-unsubscribe of 1.
|
|
// Do two groups, one bar and one baz.
|
|
for i := 0; i < 1000; i++ {
|
|
qsub, err := nc.QueueSubscribe("foo", "bar", barCb)
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
if err := qsub.AutoUnsubscribe(1); err != nil {
|
|
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
|
}
|
|
qsub, err = nc.QueueSubscribe("foo", "baz", bazCb)
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
if err := qsub.AutoUnsubscribe(1); err != nil {
|
|
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
|
}
|
|
}
|
|
nc.Flush()
|
|
|
|
expected := int32(1000)
|
|
for i := int32(0); i < expected; i++ {
|
|
nc.Publish("foo", []byte("Don't Drop Me!"))
|
|
}
|
|
nc.Flush()
|
|
|
|
checkFor(t, 5*time.Second, 10*time.Millisecond, func() error {
|
|
nbar := atomic.LoadInt32(&rbar)
|
|
nbaz := atomic.LoadInt32(&rbaz)
|
|
if nbar == expected && nbaz == expected {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'",
|
|
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
|
|
})
|
|
}
|
|
|
|
func TestClientTraceRace(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
// Activate trace logging
|
|
s.SetLogger(&DummyLogger{}, false, true)
|
|
|
|
nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc1.Close()
|
|
total := 10000
|
|
count := 0
|
|
ch := make(chan bool, 1)
|
|
if _, err := nc1.Subscribe("foo", func(_ *nats.Msg) {
|
|
count++
|
|
if count == total {
|
|
ch <- true
|
|
}
|
|
}); err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for i := 0; i < total; i++ {
|
|
nc1.Publish("bar", []byte("hello"))
|
|
}
|
|
}()
|
|
for i := 0; i < total; i++ {
|
|
nc2.Publish("foo", []byte("hello"))
|
|
}
|
|
if err := wait(ch); err != nil {
|
|
t.Fatal("Did not get all our messages")
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestClientUserInfo(t *testing.T) {
|
|
pnkey := "UD6AYQSOIN2IN5OGC6VQZCR4H3UFMIOXSW6NNS6N53CLJA4PB56CEJJI"
|
|
c := &client{
|
|
cid: 1024,
|
|
opts: clientOpts{
|
|
Nkey: pnkey,
|
|
},
|
|
}
|
|
got := c.getAuthUser()
|
|
expected := `Nkey "UD6AYQSOIN2IN5OGC6VQZCR4H3UFMIOXSW6NNS6N53CLJA4PB56CEJJI"`
|
|
if got != expected {
|
|
t.Errorf("Expected %q, got %q", expected, got)
|
|
}
|
|
|
|
c = &client{
|
|
cid: 1024,
|
|
opts: clientOpts{
|
|
Username: "foo",
|
|
},
|
|
}
|
|
got = c.getAuthUser()
|
|
expected = `User "foo"`
|
|
if got != expected {
|
|
t.Errorf("Expected %q, got %q", expected, got)
|
|
}
|
|
|
|
c = &client{
|
|
cid: 1024,
|
|
opts: clientOpts{},
|
|
}
|
|
got = c.getAuthUser()
|
|
expected = `User "N/A"`
|
|
if got != expected {
|
|
t.Errorf("Expected %q, got %q", expected, got)
|
|
}
|
|
}
|
|
|
|
type captureWarnLogger struct {
|
|
DummyLogger
|
|
warn chan string
|
|
}
|
|
|
|
func (l *captureWarnLogger) Warnf(format string, v ...interface{}) {
|
|
select {
|
|
case l.warn <- fmt.Sprintf(format, v...):
|
|
default:
|
|
}
|
|
}
|
|
|
|
type pauseWriteConn struct {
|
|
net.Conn
|
|
}
|
|
|
|
func (swc *pauseWriteConn) Write(b []byte) (int, error) {
|
|
time.Sleep(250 * time.Millisecond)
|
|
return swc.Conn.Write(b)
|
|
}
|
|
|
|
func TestReadloopWarning(t *testing.T) {
|
|
readLoopReportThreshold = 100 * time.Millisecond
|
|
defer func() { readLoopReportThreshold = readLoopReport }()
|
|
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
l := &captureWarnLogger{warn: make(chan string, 1)}
|
|
s.SetLogger(l, false, false)
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
nc := natsConnect(t, url)
|
|
defer nc.Close()
|
|
natsSubSync(t, nc, "foo")
|
|
cid, _ := nc.GetClientID()
|
|
|
|
sender := natsConnect(t, url)
|
|
defer sender.Close()
|
|
|
|
c := s.getClient(cid)
|
|
c.mu.Lock()
|
|
c.nc = &pauseWriteConn{Conn: c.nc}
|
|
c.mu.Unlock()
|
|
|
|
natsPub(t, nc, "foo", make([]byte, 100))
|
|
natsFlush(t, nc)
|
|
|
|
select {
|
|
case warn := <-l.warn:
|
|
if !strings.Contains(warn, "Readloop") {
|
|
t.Fatalf("unexpected warning: %v", warn)
|
|
}
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatalf("No warning printed")
|
|
}
|
|
}
|
|
|
|
func TestTraceMsg(t *testing.T) {
|
|
c := &client{}
|
|
// Enable message trace
|
|
c.trace = true
|
|
|
|
cases := []struct {
|
|
Desc string
|
|
Msg []byte
|
|
Wanted string
|
|
MaxTracedMsgLen int
|
|
}{
|
|
{
|
|
Desc: "normal length",
|
|
Msg: []byte(fmt.Sprintf("normal%s", CR_LF)),
|
|
Wanted: " - <<- MSG_PAYLOAD: [\"normal\"]",
|
|
MaxTracedMsgLen: 10,
|
|
},
|
|
{
|
|
Desc: "over length",
|
|
Msg: []byte(fmt.Sprintf("over length%s", CR_LF)),
|
|
Wanted: " - <<- MSG_PAYLOAD: [\"over lengt...\"]",
|
|
MaxTracedMsgLen: 10,
|
|
},
|
|
{
|
|
Desc: "unlimited length",
|
|
Msg: []byte(fmt.Sprintf("unlimited length%s", CR_LF)),
|
|
Wanted: " - <<- MSG_PAYLOAD: [\"unlimited length\"]",
|
|
MaxTracedMsgLen: 0,
|
|
},
|
|
{
|
|
Desc: "negative max traced msg len",
|
|
Msg: []byte(fmt.Sprintf("negative max traced msg len%s", CR_LF)),
|
|
Wanted: " - <<- MSG_PAYLOAD: [\"negative max traced msg len\"]",
|
|
MaxTracedMsgLen: -1,
|
|
},
|
|
}
|
|
|
|
for _, ut := range cases {
|
|
c.srv = &Server{
|
|
opts: &Options{MaxTracedMsgLen: ut.MaxTracedMsgLen},
|
|
}
|
|
c.srv.SetLogger(&DummyLogger{}, true, true)
|
|
|
|
c.traceMsg(ut.Msg)
|
|
|
|
got := c.srv.logging.logger.(*DummyLogger).msg
|
|
if !reflect.DeepEqual(ut.Wanted, got) {
|
|
t.Errorf("Desc: %s. Msg %q. Traced msg want: %s, got: %s", ut.Desc, ut.Msg, ut.Wanted, got)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestClientMaxPending(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
opts.MaxPending = math.MaxInt32 + 1
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
|
defer nc.Close()
|
|
|
|
sub := natsSubSync(t, nc, "foo")
|
|
natsPub(t, nc, "foo", []byte("msg"))
|
|
natsNexMsg(t, sub, 100*time.Millisecond)
|
|
}
|
|
|
|
func TestResponsePermissions(t *testing.T) {
|
|
for i, test := range []struct {
|
|
name string
|
|
perms *ResponsePermission
|
|
}{
|
|
{"max_msgs", &ResponsePermission{MaxMsgs: 2, Expires: time.Hour}},
|
|
{"no_expire_limit", &ResponsePermission{MaxMsgs: 3, Expires: -1 * time.Millisecond}},
|
|
{"expire", &ResponsePermission{MaxMsgs: 1000, Expires: 100 * time.Millisecond}},
|
|
{"no_msgs_limit", &ResponsePermission{MaxMsgs: -1, Expires: 100 * time.Millisecond}},
|
|
} {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
u1 := &User{
|
|
Username: "service",
|
|
Password: "pwd",
|
|
Permissions: &Permissions{Response: test.perms},
|
|
}
|
|
u2 := &User{Username: "ivan", Password: "pwd"}
|
|
opts.Users = []*User{u1, u2}
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
svcNC := natsConnect(t, fmt.Sprintf("nats://service:pwd@%s:%d", opts.Host, opts.Port))
|
|
defer svcNC.Close()
|
|
reqSub := natsSubSync(t, svcNC, "request")
|
|
|
|
nc := natsConnect(t, fmt.Sprintf("nats://ivan:pwd@%s:%d", opts.Host, opts.Port))
|
|
defer nc.Close()
|
|
|
|
replySub := natsSubSync(t, nc, "reply")
|
|
|
|
natsPubReq(t, nc, "request", "reply", []byte("req1"))
|
|
|
|
req1 := natsNexMsg(t, reqSub, 100*time.Millisecond)
|
|
|
|
checkFailed := func(t *testing.T) {
|
|
t.Helper()
|
|
if reply, err := replySub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout {
|
|
if reply != nil {
|
|
t.Fatalf("Expected to receive timeout, got reply=%q", reply.Data)
|
|
} else {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
switch i {
|
|
case 0:
|
|
// Should allow only 2 replies...
|
|
for i := 0; i < 10; i++ {
|
|
natsPub(t, svcNC, req1.Reply, []byte("reply"))
|
|
}
|
|
natsNexMsg(t, replySub, 100*time.Millisecond)
|
|
natsNexMsg(t, replySub, 100*time.Millisecond)
|
|
// The next should fail...
|
|
checkFailed(t)
|
|
case 1:
|
|
// Expiration is set to -1ms, which should count as infinite...
|
|
natsPub(t, svcNC, req1.Reply, []byte("reply"))
|
|
// Sleep a bit before next send
|
|
time.Sleep(50 * time.Millisecond)
|
|
natsPub(t, svcNC, req1.Reply, []byte("reply"))
|
|
// Make sure we receive both
|
|
natsNexMsg(t, replySub, 100*time.Millisecond)
|
|
natsNexMsg(t, replySub, 100*time.Millisecond)
|
|
case 2:
|
|
fallthrough
|
|
case 3:
|
|
// Expire set to 100ms so make sure we wait more between
|
|
// next publish
|
|
natsPub(t, svcNC, req1.Reply, []byte("reply"))
|
|
time.Sleep(200 * time.Millisecond)
|
|
natsPub(t, svcNC, req1.Reply, []byte("reply"))
|
|
// Should receive one, and fail on the other
|
|
natsNexMsg(t, replySub, 100*time.Millisecond)
|
|
checkFailed(t)
|
|
}
|
|
// When testing expiration, sleep before sending next reply
|
|
if i >= 2 {
|
|
time.Sleep(400 * time.Millisecond)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestPingNotSentTooSoon(t *testing.T) {
|
|
opts := DefaultOptions()
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
doneCh := make(chan bool, 1)
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for {
|
|
s.Connz(nil)
|
|
select {
|
|
case <-doneCh:
|
|
return
|
|
case <-time.After(time.Millisecond):
|
|
}
|
|
}
|
|
}()
|
|
|
|
for i := 0; i < 100; i++ {
|
|
nc, err := nats.Connect(s.ClientURL())
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
nc.Close()
|
|
}
|
|
close(doneCh)
|
|
wg.Wait()
|
|
|
|
c, br, _ := newClientForServer(s)
|
|
connectOp := []byte("CONNECT {\"user\":\"ivan\",\"pass\":\"bar\"}\r\n")
|
|
c.parse(connectOp)
|
|
|
|
// Since client has not send PING, having server try to send RTT ping
|
|
// to client should not do anything
|
|
if c.sendRTTPing() {
|
|
t.Fatalf("RTT ping should not have been sent")
|
|
}
|
|
// Speed up detection of time elapsed by moving the c.start to more than
|
|
// 2 secs in the past.
|
|
c.mu.Lock()
|
|
c.start = time.Unix(0, c.start.UnixNano()-int64(maxNoRTTPingBeforeFirstPong+time.Second))
|
|
c.mu.Unlock()
|
|
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
l, _ := br.ReadString('\n')
|
|
if l != "PING\r\n" {
|
|
errCh <- fmt.Errorf("expected to get PING, got %s", l)
|
|
return
|
|
}
|
|
errCh <- nil
|
|
}()
|
|
if !c.sendRTTPing() {
|
|
t.Fatalf("RTT ping should have been sent")
|
|
}
|
|
wg.Wait()
|
|
if e := <-errCh; e != nil {
|
|
t.Fatal(e.Error())
|
|
}
|
|
}
|