Files
nats-server/server/events_test.go
Ivan Kozlovic b69ffe244e Fixed some tests
Code change:
- Do not start the processMirrorMsgs and processSourceMsgs go routine
if the server has been detected to be shutdown. This would otherwise
leave some go routine running at the end of some tests.
- Pass the fch and qch to the consumerFileStore's flushLoop otherwise
in some tests this routine could be left running.

Tests changes:
- Added missing defer NATS connection close
- Added missing defer server shutdown

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
2022-09-08 11:28:23 -06:00

2501 lines
73 KiB
Go

// Copyright 2018-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
)
// createAccount generates a fresh account key pair, encodes its claims with
// the operator seed key, registers the JWT in the server's memory resolver,
// and returns the resolved *Account together with the account key pair.
// It panics if the account cannot be looked up afterwards.
func createAccount(s *Server) (*Account, nkeys.KeyPair) {
	operatorKP, _ := nkeys.FromSeed(oSeed)
	accountKP, _ := nkeys.CreateAccount()
	accountPub, _ := accountKP.PublicKey()
	claims := jwt.NewAccountClaims(accountPub)
	accJWT, _ := claims.Encode(operatorKP)
	addAccountToMemResolver(s, accountPub, accJWT)
	acc, err := s.LookupAccount(accountPub)
	if err != nil {
		panic(err)
	}
	return acc, accountKP
}
// createUserCredsEx returns a nats.Option that authenticates with a freshly
// generated user nkey whose claims (nuc) are signed by the given account key
// pair. The user's public key is written into nuc.Subject.
func createUserCredsEx(t *testing.T, nuc *jwt.UserClaims, akp nkeys.KeyPair) nats.Option {
	t.Helper()
	userKP, _ := nkeys.CreateUser()
	nuc.Subject, _ = userKP.PublicKey()
	userJWT, err := nuc.Encode(akp)
	if err != nil {
		t.Fatalf("Error generating user JWT: %v", err)
	}
	// The JWT callback hands back the signed claims; the signature callback
	// proves possession of the user's private key by signing the nonce.
	return nats.UserJWT(
		func() (string, error) { return userJWT, nil },
		func(nonce []byte) ([]byte, error) {
			sig, _ := userKP.Sign(nonce)
			return sig, nil
		})
}
// createUserCreds returns connect credentials for a default user named
// "test", signed by akp. The server argument is unused but kept for
// call-site compatibility.
func createUserCreds(t *testing.T, _ *Server, akp nkeys.KeyPair) nats.Option {
	return createUserCredsEx(t, jwt.NewUserClaims("test"), akp)
}
// runTrustedServer starts a server that trusts the test operator key and
// resolves accounts from an in-memory resolver. Caller must Shutdown.
func runTrustedServer(t *testing.T) (*Server, *Options) {
	t.Helper()
	kp, _ := nkeys.FromSeed(oSeed)
	pub, _ := kp.PublicKey()
	opts := DefaultOptions()
	opts.AccountResolver = &MemAccResolver{}
	opts.TrustedKeys = []string{pub}
	return RunServer(opts), opts
}
// runTrustedCluster starts a two-server cluster (A and B) that trusts the
// test operator key, shares one in-memory account resolver, and uses a
// freshly generated system account. It returns both servers, their options,
// and the system account key pair. Caller must Shutdown both servers.
func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
	t.Helper()
	kp, _ := nkeys.FromSeed(oSeed)
	pub, _ := kp.PublicKey()
	mr := &MemAccResolver{}
	// Now create a system account.
	// NOTE: This can NOT be shared directly between servers.
	// Set via server options.
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	apub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(apub)
	jwt, _ := nac.Encode(okp)
	mr.Store(apub, jwt)
	optsA := DefaultOptions()
	optsA.Cluster.Name = "TEST CLUSTER 22"
	optsA.Cluster.Host = "127.0.0.1"
	optsA.TrustedKeys = []string{pub}
	optsA.AccountResolver = mr
	optsA.SystemAccount = apub
	optsA.ServerName = "A_SRV"
	// Add in dummy gateway
	optsA.Gateway.Name = "TEST CLUSTER 22"
	optsA.Gateway.Host = "127.0.0.1"
	optsA.Gateway.Port = -1
	// Delay gateway solicitation so it does not kick in during the tests.
	optsA.gatewaysSolicitDelay = 30 * time.Second
	sa := RunServer(optsA)
	optsB := nextServerOpts(optsA)
	optsB.ServerName = "B_SRV"
	optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
	sb := RunServer(optsB)
	checkClusterFormed(t, sa, sb)
	return sa, optsA, sb, optsB, akp
}
// runTrustedGateways starts two servers ("A" and "B") connected via
// gateways rather than routes, both trusting the test operator key and
// sharing an in-memory resolver with a generated system account. It waits
// for the gateway connections (inbound and outbound on each side) before
// returning the servers, options, and system account key pair.
func runTrustedGateways(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
	t.Helper()
	kp, _ := nkeys.FromSeed(oSeed)
	pub, _ := kp.PublicKey()
	mr := &MemAccResolver{}
	// Now create a system account.
	// NOTE: This can NOT be shared directly between servers.
	// Set via server options.
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	apub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(apub)
	jwt, _ := nac.Encode(okp)
	mr.Store(apub, jwt)
	optsA := testDefaultOptionsForGateway("A")
	optsA.Cluster.Name = "A"
	optsA.Cluster.Host = "127.0.0.1"
	optsA.TrustedKeys = []string{pub}
	optsA.AccountResolver = mr
	optsA.SystemAccount = apub
	sa := RunServer(optsA)
	optsB := testGatewayOptionsFromToWithServers(t, "B", "A", sa)
	optsB.Cluster.Name = "B"
	optsB.TrustedKeys = []string{pub}
	optsB.AccountResolver = mr
	optsB.SystemAccount = apub
	sb := RunServer(optsB)
	waitForInboundGateways(t, sa, 1, time.Second)
	waitForOutboundGateways(t, sa, 1, time.Second)
	waitForInboundGateways(t, sb, 1, time.Second)
	waitForOutboundGateways(t, sb, 1, time.Second)
	return sa, optsA, sb, optsB, akp
}
// TestSystemAccount verifies that setting the system account wires up the
// server's internal sys state (account and internal client) and that the
// internal client always has echo disabled.
func TestSystemAccount(t *testing.T) {
	s, _ := runTrustedServer(t)
	defer s.Shutdown()
	acc, _ := createAccount(s)
	s.setSystemAccount(acc)
	// Inspect internal state under the server lock.
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.sys == nil || s.sys.account == nil {
		t.Fatalf("Expected sys.account to be non-nil")
	}
	if s.sys.client == nil {
		t.Fatalf("Expected sys.client to be non-nil")
	}
	s.sys.client.mu.Lock()
	defer s.sys.client.mu.Unlock()
	if s.sys.client.echo {
		t.Fatalf("Internal clients should always have echo false")
	}
}
// TestSystemAccountNewConnection verifies that connect/disconnect system
// events are published on the proper $SYS.ACCOUNT.<acc>.* subjects, that
// event payloads carry the right server/client metadata, and that the
// disconnect event reports correct msgs/bytes accounting.
//
// Fixes in this revision: the disconnect-event checks previously referenced
// the connect event struct (cem) by mistake, and several t.Fatalf messages
// printed the wrong value/expectation.
func TestSystemAccountNewConnection(t *testing.T) {
	s, opts := runTrustedServer(t)
	defer s.Shutdown()
	acc, akp := createAccount(s)
	s.setSystemAccount(acc)
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncs.Close()
	// We may not be able to hear ourselves (if the event is processed
	// before we create the sub), so we need to create a second client to
	// trigger the connect/disconnect events.
	acc2, akp2 := createAccount(s)
	// Be explicit to only receive the event for acc2.
	sub, _ := ncs.SubscribeSync(fmt.Sprintf("$SYS.ACCOUNT.%s.>", acc2.Name))
	defer sub.Unsubscribe()
	ncs.Flush()
	nc, err := nats.Connect(url, createUserCreds(t, s, akp2), nats.Name("TEST EVENTS"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()
	msg, err := sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error receiving msg: %v", err)
	}
	connsMsg, err := sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error receiving msg: %v", err)
	}
	// CONNECT and SERVER.CONNS events can arrive in either order;
	// normalize so that msg holds the CONNECT event.
	if strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
		msg, connsMsg = connsMsg, msg
	}
	if !strings.HasPrefix(connsMsg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
		t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.SERVER.CONNS", connsMsg.Subject)
	}
	conns := AccountNumConns{}
	if err := json.Unmarshal(connsMsg.Data, &conns); err != nil {
		t.Fatalf("Error unmarshalling conns event message: %v", err)
	} else if conns.Account != acc2.Name {
		t.Fatalf("Wrong account in conns message: %v", conns)
	} else if conns.Conns != 1 || conns.TotalConns != 1 || conns.LeafNodes != 0 {
		t.Fatalf("Wrong counts in conns message: %v", conns)
	}
	if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.CONNECT", acc2.Name)) {
		t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.CONNECT", msg.Subject)
	}
	tokens := strings.Split(msg.Subject, ".")
	if len(tokens) < 4 {
		t.Fatalf("Expected at least 4 tokens, got %d", len(tokens))
	}
	account := tokens[2]
	if account != acc2.Name {
		t.Fatalf("Expected %q for account, got %q", acc2.Name, account)
	}
	cem := ConnectEventMsg{}
	if err := json.Unmarshal(msg.Data, &cem); err != nil {
		t.Fatalf("Error unmarshalling connect event message: %v", err)
	}
	if cem.Type != ConnectEventMsgType {
		t.Fatalf("Incorrect schema in connect event: %s", cem.Type)
	}
	if cem.Time.IsZero() {
		t.Fatalf("Event time is not set")
	}
	if len(cem.ID) != 22 {
		t.Fatalf("Event ID is incorrectly set to len %d", len(cem.ID))
	}
	if cem.Server.ID != s.ID() {
		t.Fatalf("Expected server to be %q, got %q", s.ID(), cem.Server.ID)
	}
	if cem.Server.Seq == 0 {
		t.Fatalf("Expected sequence to be non-zero")
	}
	if cem.Client.Name != "TEST EVENTS" {
		t.Fatalf("Expected client name to be %q, got %q", "TEST EVENTS", cem.Client.Name)
	}
	if cem.Client.Lang != "go" {
		t.Fatalf("Expected client lang to be \"go\", got %q", cem.Client.Lang)
	}
	// Now close the other client. Should fire a disconnect event.
	// First send and receive some messages.
	sub2, _ := nc.SubscribeSync("foo")
	defer sub2.Unsubscribe()
	sub3, _ := nc.SubscribeSync("*")
	defer sub3.Unsubscribe()
	for i := 0; i < 10; i++ {
		nc.Publish("foo", []byte("HELLO WORLD"))
	}
	nc.Flush()
	nc.Close()
	msg, err = sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error receiving msg: %v", err)
	}
	connsMsg, err = sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error receiving msg: %v", err)
	}
	// Again normalize event order: msg should hold the DISCONNECT event.
	if strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
		msg, connsMsg = connsMsg, msg
	}
	if !strings.HasPrefix(connsMsg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
		t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.SERVER.CONNS", connsMsg.Subject)
	} else if !strings.Contains(string(connsMsg.Data), `"total_conns":0`) {
		t.Fatalf("Expected event to reflect created connection, got: %s", string(connsMsg.Data))
	}
	conns = AccountNumConns{}
	if err := json.Unmarshal(connsMsg.Data, &conns); err != nil {
		t.Fatalf("Error unmarshalling conns event message: %v", err)
	} else if conns.Account != acc2.Name {
		t.Fatalf("Wrong account in conns message: %v", conns)
	} else if conns.Conns != 0 || conns.TotalConns != 0 || conns.LeafNodes != 0 {
		t.Fatalf("Wrong counts in conns message: %v", conns)
	}
	if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.DISCONNECT", acc2.Name)) {
		t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.DISCONNECT", msg.Subject)
	}
	tokens = strings.Split(msg.Subject, ".")
	if len(tokens) < 4 {
		t.Fatalf("Expected at least 4 tokens, got %d", len(tokens))
	}
	account = tokens[2]
	if account != acc2.Name {
		t.Fatalf("Expected %q for account, got %q", acc2.Name, account)
	}
	dem := DisconnectEventMsg{}
	if err := json.Unmarshal(msg.Data, &dem); err != nil {
		t.Fatalf("Error unmarshalling disconnect event message: %v", err)
	}
	if dem.Type != DisconnectEventMsgType {
		t.Fatalf("Incorrect schema in disconnect event: %s", dem.Type)
	}
	if dem.Time.IsZero() {
		t.Fatalf("Event time is not set")
	}
	if len(dem.ID) != 22 {
		t.Fatalf("Event ID is incorrectly set to len %d", len(dem.ID))
	}
	if dem.Server.ID != s.ID() {
		t.Fatalf("Expected server to be %q, got %q", s.ID(), dem.Server.ID)
	}
	if dem.Server.Seq == 0 {
		t.Fatalf("Expected sequence to be non-zero")
	}
	if dem.Server.Seq <= cem.Server.Seq {
		t.Fatalf("Expected sequence to be increasing")
	}
	if dem.Client.Name != "TEST EVENTS" {
		t.Fatalf("Expected client name to be %q, got %q", "TEST EVENTS", dem.Client.Name)
	}
	if dem.Client.Lang != "go" {
		t.Fatalf("Expected client lang to be \"go\", got %q", dem.Client.Lang)
	}
	// The client published 10 msgs of 11 bytes each on "foo" and had two
	// matching subscriptions ("foo" and "*"), so it received each twice.
	if dem.Sent.Msgs != 10 {
		t.Fatalf("Expected 10 msgs sent, got %d", dem.Sent.Msgs)
	}
	if dem.Sent.Bytes != 110 {
		t.Fatalf("Expected 110 bytes sent, got %d", dem.Sent.Bytes)
	}
	if dem.Received.Msgs != 20 {
		t.Fatalf("Expected 20 msgs received, got %d", dem.Received.Msgs)
	}
	if dem.Received.Bytes != 220 {
		t.Fatalf("Expected 220 bytes received, got %d", dem.Received.Bytes)
	}
}
// runTrustedLeafServer is like runTrustedServer but additionally enables a
// leafnode listener on a random port.
func runTrustedLeafServer(t *testing.T) (*Server, *Options) {
	t.Helper()
	opts := DefaultOptions()
	opts.LeafNode.Port = -1
	opts.AccountResolver = &MemAccResolver{}
	kp, _ := nkeys.FromSeed(oSeed)
	pub, _ := kp.PublicKey()
	opts.TrustedKeys = []string{pub}
	return RunServer(opts), opts
}
// genCredsFile writes a temporary credentials file containing the given user
// JWT and nkey seed in the standard creds format, returning its path.
func genCredsFile(t *testing.T, jwt string, seed []byte) string {
	creds := `
		-----BEGIN NATS USER JWT-----
		%s
		------END NATS USER JWT------
		************************* IMPORTANT *************************
		NKEY Seed printed below can be used to sign and prove identity.
		NKEYs are sensitive and should be treated as secrets.
		-----BEGIN USER NKEY SEED-----
		%s
		------END USER NKEY SEED------
		*************************************************************
		`
	// Strip the two-tab source indentation so the file content is flush left.
	return createConfFile(t, []byte(strings.Replace(fmt.Sprintf(creds, jwt, seed), "\t\t", "", -1)))
}
// runSolicitWithCredentials starts a server that solicits a leafnode
// connection to the server described by opts, authenticating with the given
// credentials file. It returns the new server, its options, and the path of
// the generated config file (which the caller should remove).
func runSolicitWithCredentials(t *testing.T, opts *Options, creds string) (*Server, *Options, string) {
	content := `
	port: -1
	leafnodes {
		remotes = [
			{
				url: nats-leaf://127.0.0.1:%d
				credentials: '%s'
			}
		]
	}
	`
	config := fmt.Sprintf(content, opts.LeafNode.Port, creds)
	conf := createConfFile(t, []byte(config))
	s, opts := RunServerWithConfig(conf)
	return s, opts, conf
}
// checkLeafNodeConnected checks that exactly one leaf node has connected to
// the given server, failing the test after a timeout otherwise.
func checkLeafNodeConnected(t testing.TB, s *Server) {
	t.Helper()
	checkLeafNodeConnectedCount(t, s, 1)
}
// checkLeafNodeConnectedCount polls (up to 5 seconds, every 15ms) until the
// server reports exactly lnCons connected leafnodes, failing the test if
// that count is never reached.
func checkLeafNodeConnectedCount(t testing.TB, s *Server, lnCons int) {
	t.Helper()
	checkFor(t, 5*time.Second, 15*time.Millisecond, func() error {
		nln := s.NumLeafNodes()
		if nln == lnCons {
			return nil
		}
		return fmt.Errorf("Expected %d connected leafnode(s) for server %q, got %d",
			lnCons, s.ID(), nln)
	})
}
// TestSystemAccountingWithLeafNodes verifies that traffic flowing over a
// leafnode connection is accounted for, and that shutting down the
// soliciting leafnode server produces a DISCONNECT event whose Sent/Received
// counters reflect the traffic exchanged over the leafnode link.
func TestSystemAccountingWithLeafNodes(t *testing.T) {
	s, opts := runTrustedLeafServer(t)
	defer s.Shutdown()
	acc, akp := createAccount(s)
	s.setSystemAccount(acc)
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncs.Close()
	acc2, akp2 := createAccount(s)
	// Be explicit to only receive the event for acc2 account.
	sub, _ := ncs.SubscribeSync(fmt.Sprintf("$SYS.ACCOUNT.%s.DISCONNECT", acc2.Name))
	defer sub.Unsubscribe()
	ncs.Flush()
	// Generate credentials for the leafnode remote to authenticate as acc2.
	kp, _ := nkeys.CreateUser()
	pub, _ := kp.PublicKey()
	nuc := jwt.NewUserClaims(pub)
	ujwt, err := nuc.Encode(akp2)
	if err != nil {
		t.Fatalf("Error generating user JWT: %v", err)
	}
	seed, _ := kp.Seed()
	mycreds := genCredsFile(t, ujwt, seed)
	defer removeFile(t, mycreds)
	// Create a server that solicits a leafnode connection.
	sl, slopts, lnconf := runSolicitWithCredentials(t, opts, mycreds)
	defer removeFile(t, lnconf)
	defer sl.Shutdown()
	checkLeafNodeConnected(t, s)
	// Compute the expected number of subs on "sl" based on number
	// of existing subs before creating the sub on "s".
	expected := int(sl.NumSubscriptions() + 1)
	nc, err := nats.Connect(url, createUserCreds(t, s, akp2), nats.Name("TEST LEAFNODE EVENTS"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()
	fooSub := natsSubSync(t, nc, "foo")
	natsFlush(t, nc)
	checkExpectedSubs(t, expected, sl)
	surl := fmt.Sprintf("nats://%s:%d", slopts.Host, slopts.Port)
	nc2, err := nats.Connect(surl, nats.Name("TEST LEAFNODE EVENTS"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()
	// Compute the expected number of subs on "s" based on number
	// of existing subs before creating the sub on "sl".
	expected = int(s.NumSubscriptions() + 1)
	m := []byte("HELLO WORLD")
	// Now generate some traffic
	starSub := natsSubSync(t, nc2, "*")
	for i := 0; i < 10; i++ {
		nc2.Publish("foo", m)
		nc2.Publish("bar", m)
	}
	natsFlush(t, nc2)
	checkExpectedSubs(t, expected, s)
	// Now send some from the cluster side too.
	for i := 0; i < 10; i++ {
		nc.Publish("foo", m)
		nc.Publish("bar", m)
	}
	nc.Flush()
	// Make sure all messages are received
	for i := 0; i < 20; i++ {
		if _, err := fooSub.NextMsg(time.Second); err != nil {
			t.Fatalf("Did not get message: %v", err)
		}
	}
	for i := 0; i < 40; i++ {
		if _, err := starSub.NextMsg(time.Second); err != nil {
			t.Fatalf("Did not get message: %v", err)
		}
	}
	// Now shutdown the leafnode server since this is where the event tracking should
	// happen. Right now we do not track local clients to the leafnode server that
	// solicited to the cluster, but we should track usage once the leafnode connection stops.
	sl.Shutdown()
	// Make sure we get disconnect event and that tracking is correct.
	msg, err := sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error receiving msg: %v", err)
	}
	dem := DisconnectEventMsg{}
	if err := json.Unmarshal(msg.Data, &dem); err != nil {
		t.Fatalf("Error unmarshalling disconnect event message: %v", err)
	}
	if dem.Sent.Msgs != 10 {
		t.Fatalf("Expected 10 msgs sent, got %d", dem.Sent.Msgs)
	}
	if dem.Sent.Bytes != 110 {
		t.Fatalf("Expected 110 bytes sent, got %d", dem.Sent.Bytes)
	}
	if dem.Received.Msgs != 20 {
		t.Fatalf("Expected 20 msgs received, got %d", dem.Received.Msgs)
	}
	if dem.Received.Bytes != 220 {
		t.Fatalf("Expected 220 bytes sent, got %d", dem.Received.Bytes)
	}
}
// TestSystemAccountDisconnectBadLogin verifies that a failed login does not
// generate $G account connect/disconnect events, but does generate a
// CLIENT.AUTH.ERR server event with an "Authentication Failure" reason.
func TestSystemAccountDisconnectBadLogin(t *testing.T) {
	s, opts := runTrustedServer(t)
	defer s.Shutdown()
	acc, akp := createAccount(s)
	s.setSystemAccount(acc)
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncs.Close()
	// We should never hear $G account events for bad logins.
	sub, _ := ncs.SubscribeSync("$SYS.ACCOUNT.$G.*")
	defer sub.Unsubscribe()
	// Listen for auth error events though.
	asub, _ := ncs.SubscribeSync("$SYS.SERVER.*.CLIENT.AUTH.ERR")
	defer asub.Unsubscribe()
	ncs.Flush()
	// This connect is expected to fail authentication (no credentials).
	nats.Connect(url, nats.Name("TEST BAD LOGIN"))
	// Should not hear these.
	if _, err := sub.NextMsg(100 * time.Millisecond); err == nil {
		t.Fatalf("Received a disconnect message from bad login, expected none")
	}
	m, err := asub.NextMsg(100 * time.Millisecond)
	if err != nil {
		t.Fatalf("Should have heard an auth error event")
	}
	dem := DisconnectEventMsg{}
	if err := json.Unmarshal(m.Data, &dem); err != nil {
		t.Fatalf("Error unmarshalling disconnect event message: %v", err)
	}
	if dem.Reason != "Authentication Failure" {
		t.Fatalf("Expected auth error, got %q", dem.Reason)
	}
}
// TestSysSubscribeRace exercises the race between a client continuously
// publishing and the server creating an internal (sys) subscription on the
// same subject: the subscription must eventually start receiving messages.
func TestSysSubscribeRace(t *testing.T) {
	s, opts := runTrustedServer(t)
	defer s.Shutdown()
	acc, akp := createAccount(s)
	s.setSystemAccount(acc)
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	nc, err := nats.Connect(url, createUserCreds(t, s, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()
	done := make(chan struct{})
	wg := sync.WaitGroup{}
	wg.Add(1)
	// Publisher goroutine: hammer "foo" until told to stop.
	go func() {
		defer wg.Done()
		for {
			nc.Publish("foo", []byte("hello"))
			select {
			case <-done:
				return
			default:
			}
		}
	}()
	time.Sleep(10 * time.Millisecond)
	received := make(chan struct{})
	// Create message callback handler.
	// Non-blocking send so the internal client's delivery is never stalled.
	cb := func(sub *subscription, producer *client, _ *Account, subject, reply string, msg []byte) {
		select {
		case received <- struct{}{}:
		default:
		}
	}
	// Now create an internal subscription
	sub, err := s.sysSubscribe("foo", cb)
	if sub == nil || err != nil {
		t.Fatalf("Expected to subscribe, got %v", err)
	}
	select {
	case <-received:
		close(done)
	case <-time.After(time.Second):
		t.Fatalf("Did not receive the message")
	}
	wg.Wait()
}
// TestSystemAccountInternalSubscriptions verifies the behavior of internal
// (sys) subscriptions: subscribing fails without a system account or without
// a handler, succeeds with both, delivers client-published messages, stops
// after sysUnsubscribe, and never echoes internally generated messages.
func TestSystemAccountInternalSubscriptions(t *testing.T) {
	s, opts := runTrustedServer(t)
	defer s.Shutdown()
	// No system account set yet: subscribing must fail.
	sub, err := s.sysSubscribe("foo", nil)
	if sub != nil || err != ErrNoSysAccount {
		t.Fatalf("Expected to get proper error, got %v", err)
	}
	acc, akp := createAccount(s)
	s.setSystemAccount(acc)
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	nc, err := nats.Connect(url, createUserCreds(t, s, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()
	// A nil handler must also be rejected.
	sub, err = s.sysSubscribe("foo", nil)
	if sub != nil || err == nil {
		t.Fatalf("Expected to get error for no handler, got %v", err)
	}
	received := make(chan *nats.Msg)
	// Create message callback handler.
	cb := func(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) {
		copy := append([]byte(nil), msg...)
		received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy}
	}
	// Now create an internal subscription
	sub, err = s.sysSubscribe("foo", cb)
	if sub == nil || err != nil {
		t.Fatalf("Expected to subscribe, got %v", err)
	}
	// Now send out a message from our normal client.
	nc.Publish("foo", []byte("HELLO WORLD"))
	var msg *nats.Msg
	select {
	case msg = <-received:
		if msg.Subject != "foo" {
			t.Fatalf("Expected \"foo\" as subject, got %q", msg.Subject)
		}
		if msg.Reply != "" {
			t.Fatalf("Expected no reply, got %q", msg.Reply)
		}
		if !bytes.Equal(msg.Data, []byte("HELLO WORLD")) {
			t.Fatalf("Got the wrong msg payload: %q", msg.Data)
		}
		break
	case <-time.After(time.Second):
		t.Fatalf("Did not receive the message")
	}
	s.sysUnsubscribe(sub)
	// Now send out a message from our normal client.
	// We should not see this one.
	nc.Publish("foo", []byte("You There?"))
	select {
	case <-received:
		t.Fatalf("Received a message when we should not have")
	case <-time.After(100 * time.Millisecond):
		break
	}
	// Now make sure we do not hear ourselves. We optimize this for internally
	// generated messages.
	s.mu.Lock()
	s.sendInternalMsg("foo", "", nil, msg.Data)
	s.mu.Unlock()
	select {
	case <-received:
		t.Fatalf("Received a message when we should not have")
	case <-time.After(100 * time.Millisecond):
		break
	}
}
// TestSystemAccountConnectionUpdatesStopAfterNoLocal verifies that account
// connection updates are published while the account has local clients and
// that the account's event timer is stopped once the last local client on
// the remote server disconnects.
//
// Fix in this revision: the second checkFor waits for 8 messages but its
// error message previously reported "vs 4".
func TestSystemAccountConnectionUpdatesStopAfterNoLocal(t *testing.T) {
	sa, _, sb, optsB, _ := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()
	// Normal Account
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	pub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	nac.Limits.Conn = 4 // Limit to 4 connections.
	jwt, _ := nac.Encode(okp)
	addAccountToMemResolver(sa, pub, jwt)
	// Listen for updates to the new account connection activity.
	received := make(chan *nats.Msg, 10)
	cb := func(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) {
		copy := append([]byte(nil), msg...)
		received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy}
	}
	subj := fmt.Sprintf(accConnsEventSubjNew, pub)
	sub, err := sa.sysSubscribe(subj, cb)
	if sub == nil || err != nil {
		t.Fatalf("Expected to subscribe, got %v", err)
	}
	defer sa.sysUnsubscribe(sub)
	// Create a few users on the new account, connecting to server B so that
	// the conns updates are exchanged over the cluster to server A.
	clients := []*nats.Conn{}
	url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	for i := 0; i < 4; i++ {
		nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
		if err != nil {
			t.Fatalf("Error on connect: %v", err)
		}
		defer nc.Close()
		clients = append(clients, nc)
	}
	// Wait for all 4 notifications.
	checkFor(t, time.Second, 50*time.Millisecond, func() error {
		if len(received) == 4 {
			return nil
		}
		return fmt.Errorf("Not enough messages, %d vs 4", len(received))
	})
	// Now lookup the account doing the events on sb.
	acc, _ := sb.LookupAccount(pub)
	// Make sure we have the timer running.
	acc.mu.RLock()
	ctmr := acc.ctmr
	acc.mu.RUnlock()
	if ctmr == nil {
		t.Fatalf("Expected event timer for acc conns to be running")
	}
	// Now close all of the connections.
	for _, nc := range clients {
		nc.Close()
	}
	// Wait for the 4 new notifications, 8 total (4 for connect, 4 for disconnect)
	checkFor(t, time.Second, 50*time.Millisecond, func() error {
		if len(received) == 8 {
			return nil
		}
		return fmt.Errorf("Not enough messages, %d vs 8", len(received))
	})
	// Drain the messages.
	for i := 0; i < 7; i++ {
		<-received
	}
	// Check last one.
	msg := <-received
	m := AccountNumConns{}
	if err := json.Unmarshal(msg.Data, &m); err != nil {
		t.Fatalf("Error unmarshalling account connections request message: %v", err)
	}
	if m.Conns != 0 {
		t.Fatalf("Expected Conns to be 0, got %d", m.Conns)
	}
	// Should not receive any more messages..
	select {
	case <-received:
		t.Fatalf("Did not expect a message here")
	case <-time.After(50 * time.Millisecond):
		break
	}
	// Make sure the timer is NOT running.
	acc.mu.RLock()
	ctmr = acc.ctmr
	acc.mu.RUnlock()
	if ctmr != nil {
		t.Fatalf("Expected event timer for acc conns to NOT be running after reaching zero local clients")
	}
}
// TestSystemAccountConnectionLimits verifies that an account's max
// connection limit is enforced across the whole cluster, not just per
// server: the total of client connections on both servers must never
// exceed the account's Limits.Conn.
func TestSystemAccountConnectionLimits(t *testing.T) {
	sa, optsA, sb, optsB, _ := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()
	// We want to test that we are limited to a certain number of active connections
	// across multiple servers.
	// Let's create a user account.
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	pub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	nac.Limits.Conn = 4 // Limit to 4 connections.
	jwt, _ := nac.Encode(okp)
	addAccountToMemResolver(sa, pub, jwt)
	urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
	urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	// Create a user on each server. Break on first failure.
	for {
		nca1, err := nats.Connect(urlA, createUserCreds(t, sa, akp))
		if err != nil {
			break
		}
		defer nca1.Close()
		ncb1, err := nats.Connect(urlB, createUserCreds(t, sb, akp))
		if err != nil {
			break
		}
		defer ncb1.Close()
	}
	// The limit may be transiently exceeded while updates propagate; verify
	// it settles at or below the configured maximum.
	checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
		total := sa.NumClients() + sb.NumClients()
		if total > int(nac.Limits.Conn) {
			return fmt.Errorf("Expected only %d connections, was allowed to connect %d", nac.Limits.Conn, total)
		}
		return nil
	})
}
// TestBadAccountUpdate verifies that an account claims update whose subject
// does not match the account it is applied to is rejected with
// ErrAccountValidation (guards against claim-swap attacks).
func TestBadAccountUpdate(t *testing.T) {
	sa, _ := runTrustedServer(t)
	defer sa.Shutdown()
	akp1, _ := nkeys.CreateAccount()
	pub, _ := akp1.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	ajwt1, err := nac.Encode(oKp)
	require_NoError(t, err)
	addAccountToMemResolver(sa, pub, ajwt1)
	akp2, _ := nkeys.CreateAccount()
	pub2, _ := akp2.PublicKey()
	nac.Subject = pub2 // maliciously use a different subject but pretend to remain pub
	ajwt2, err := nac.Encode(oKp)
	require_NoError(t, err)
	acc, err := sa.fetchAccount(pub)
	require_NoError(t, err)
	if err := sa.updateAccountWithClaimJWT(acc, ajwt2); err != ErrAccountValidation {
		t.Fatalf("expected %v but got %v", ErrAccountValidation, err)
	}
}
// Make sure connection limits apply to the system account itself.
// TestSystemAccountSystemConnectionLimitsHonored sets Limits.Conn = 10 on
// the system account, alternates connections between the two servers, and
// verifies exactly 10 are admitted cluster-wide.
func TestSystemAccountSystemConnectionLimitsHonored(t *testing.T) {
	sa, optsA, sb, optsB, sakp := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()
	okp, _ := nkeys.FromSeed(oSeed)
	// Update system account to have 10 connections
	pub, _ := sakp.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	nac.Limits.Conn = 10
	ajwt, _ := nac.Encode(okp)
	addAccountToMemResolver(sa, pub, ajwt)
	addAccountToMemResolver(sb, pub, ajwt)
	// Update the accounts on each server with new claims to force update.
	sysAccA := sa.SystemAccount()
	sa.updateAccountWithClaimJWT(sysAccA, ajwt)
	sysAccB := sb.SystemAccount()
	sb.updateAccountWithClaimJWT(sysAccB, ajwt)
	// Check system here first, with no external it should be zero.
	sacc := sa.SystemAccount()
	if nlc := sacc.NumLocalConnections(); nlc != 0 {
		t.Fatalf("Expected no local connections, got %d", nlc)
	}
	urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
	urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	// Create a user on each server. Break on first failure.
	// tc counts successful connections across both servers.
	tc := 0
	for {
		nca1, err := nats.Connect(urlA, createUserCreds(t, sa, sakp))
		if err != nil {
			break
		}
		defer nca1.Close()
		tc++
		ncb1, err := nats.Connect(urlB, createUserCreds(t, sb, sakp))
		if err != nil {
			break
		}
		defer ncb1.Close()
		tc++
		// The account's connection count is exchanged between servers
		// so that the local count on each server reflects the total count.
		// Pause a bit to give a chance to each server to process the update.
		time.Sleep(15 * time.Millisecond)
	}
	if tc != 10 {
		t.Fatalf("Expected to get 10 external connections, got %d", tc)
	}
	checkFor(t, 1*time.Second, 50*time.Millisecond, func() error {
		total := sa.NumClients() + sb.NumClients()
		if total > int(nac.Limits.Conn) {
			return fmt.Errorf("Expected only %d connections, was allowed to connect %d", nac.Limits.Conn, total)
		}
		return nil
	})
}
// Test that the remote accounting works when a server is started some time later.
// The account is filled to its connection limit on server A, then server B
// is (re)started; B must learn the remote counts and reject a new
// connection for the already-full account.
func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) {
	sa, optsA, sb, optsB, _ := runTrustedCluster(t)
	defer sa.Shutdown()
	sb.Shutdown()
	// Let's create a user account.
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	pub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	nac.Limits.Conn = 4 // Limit to 4 connections.
	jwt, _ := nac.Encode(okp)
	addAccountToMemResolver(sa, pub, jwt)
	urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
	// Create max connections on sa.
	for i := 0; i < int(nac.Limits.Conn); i++ {
		nc, err := nats.Connect(urlA, createUserCreds(t, sa, akp))
		if err != nil {
			t.Fatalf("Unexpected error on #%d try: %v", i+1, err)
		}
		defer nc.Close()
	}
	// Restart server B.
	optsB.AccountResolver = sa.AccountResolver()
	optsB.SystemAccount = sa.SystemAccount().Name
	sb = RunServer(optsB)
	defer sb.Shutdown()
	checkClusterFormed(t, sa, sb)
	// Trigger a load of the user account on the new server
	// NOTE: If we do not load the user, the user can be the first
	// to request this account, hence the connection will succeed.
	checkFor(t, time.Second, 15*time.Millisecond, func() error {
		if acc, err := sb.LookupAccount(pub); acc == nil || err != nil {
			return fmt.Errorf("LookupAccount did not return account or failed, err=%v", err)
		}
		return nil
	})
	// Expect this to fail.
	urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	if _, err := nats.Connect(urlB, createUserCreds(t, sb, akp)); err == nil {
		t.Fatalf("Expected connection to fail due to max limit")
	}
}
// Test that the remote accounting works when a server is shutdown.
// With the account at its limit split across A and B, gracefully shutting
// down B must release B's share so that new connections are admitted on A.
func TestSystemAccountConnectionLimitsServerShutdownGraceful(t *testing.T) {
	sa, optsA, sb, optsB, _ := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()
	// Let's create a user account.
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	pub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	nac.Limits.Conn = 10 // Limit to 10 connections.
	jwt, _ := nac.Encode(okp)
	addAccountToMemResolver(sa, pub, jwt)
	addAccountToMemResolver(sb, pub, jwt)
	urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
	urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	// Fill the limit: 5 connections on each server.
	for i := 0; i < 5; i++ {
		nc, err := nats.Connect(urlA, nats.NoReconnect(), createUserCreds(t, sa, akp))
		if err != nil {
			t.Fatalf("Expected to connect, got %v", err)
		}
		defer nc.Close()
		nc, err = nats.Connect(urlB, nats.NoReconnect(), createUserCreds(t, sb, akp))
		if err != nil {
			t.Fatalf("Expected to connect, got %v", err)
		}
		defer nc.Close()
	}
	// We are at capacity so both of these should fail.
	if _, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err == nil {
		t.Fatalf("Expected connection to fail due to max limit")
	}
	if _, err := nats.Connect(urlB, createUserCreds(t, sb, akp)); err == nil {
		t.Fatalf("Expected connection to fail due to max limit")
	}
	// Now shutdown Server B.
	sb.Shutdown()
	// Now we should be able to create more on A now.
	for i := 0; i < 5; i++ {
		nc, err := nats.Connect(urlA, createUserCreds(t, sa, akp))
		if err != nil {
			t.Fatalf("Expected to connect on %d, got %v", i, err)
		}
		defer nc.Close()
	}
}
// Test that the remote accounting works when a server goes away.
// Server B is killed without sending any shutdown events (its sys state is
// nil'ed out first); server A's orphan sweeper must eventually reclaim B's
// connection counts so new connections are admitted again.
func TestSystemAccountConnectionLimitsServerShutdownForced(t *testing.T) {
	sa, optsA, sb, optsB, _ := runTrustedCluster(t)
	defer sa.Shutdown()
	// Let's create a user account.
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	pub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	nac.Limits.Conn = 20 // Limit to 20 connections.
	jwt, _ := nac.Encode(okp)
	addAccountToMemResolver(sa, pub, jwt)
	addAccountToMemResolver(sb, pub, jwt)
	urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
	urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	// Fill the limit: 10 connections on each server.
	for i := 0; i < 10; i++ {
		c, err := nats.Connect(urlA, nats.NoReconnect(), createUserCreds(t, sa, akp))
		if err != nil {
			t.Fatalf("Expected to connect, got %v", err)
		}
		defer c.Close()
		c, err = nats.Connect(urlB, nats.NoReconnect(), createUserCreds(t, sb, akp))
		if err != nil {
			t.Fatalf("Expected to connect, got %v", err)
		}
		defer c.Close()
	}
	// Now shutdown Server B. Do so such that no communications go out.
	sb.mu.Lock()
	sb.sys = nil
	sb.mu.Unlock()
	sb.Shutdown()
	if _, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err == nil {
		t.Fatalf("Expected connection to fail due to max limit")
	}
	// Let's speed up the checking process.
	sa.mu.Lock()
	sa.sys.chkOrph = 10 * time.Millisecond
	sa.sys.orphMax = 30 * time.Millisecond
	sa.sys.sweeper.Reset(sa.sys.chkOrph)
	sa.mu.Unlock()
	// We should eventually be able to connect.
	checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
		if c, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err != nil {
			return err
		} else {
			c.Close()
		}
		return nil
	})
}
// TestSystemAccountFromConfig verifies that the trusted operator key, the
// system account and a URL-based account resolver can all be configured
// through a configuration file.
func TestSystemAccountFromConfig(t *testing.T) {
	okp, _ := nkeys.FromSeed(oSeed)
	opub, _ := okp.PublicKey()
	akp, _ := nkeys.CreateAccount()
	apub, _ := akp.PublicKey()
	claims := jwt.NewAccountClaims(apub)
	ajwt, err := claims.Encode(okp)
	if err != nil {
		t.Fatalf("Error generating account JWT: %v", err)
	}

	// Serve the account JWT over HTTP so the URL resolver can fetch it.
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Write([]byte(ajwt))
	}))
	defer ts.Close()

	confTemplate := `
		listen: -1
		trusted: %s
		system_account: %s
		resolver: URL("%s/jwt/v1/accounts/")
	`
	conf := createConfFile(t, []byte(fmt.Sprintf(confTemplate, opub, apub, ts.URL)))
	defer removeFile(t, conf)

	s, _ := RunServerWithConfig(conf)
	defer s.Shutdown()

	// The configured account must have been resolved and installed.
	if acc := s.SystemAccount(); acc == nil || acc.Name != apub {
		t.Fatalf("System Account not properly set")
	}
}
// TestAccountClaimsUpdates checks that publishing an updated account JWT on
// the system claims-update subject (both the new and the old subject forms)
// causes the server to apply the new account limits.
func TestAccountClaimsUpdates(t *testing.T) {
	test := func(subj string) {
		s, opts := runTrustedServer(t)
		defer s.Shutdown()
		sacc, sakp := createAccount(s)
		s.setSystemAccount(sacc)
		// Let's create a normal account with limits we can update.
		okp, _ := nkeys.FromSeed(oSeed)
		akp, _ := nkeys.CreateAccount()
		pub, _ := akp.PublicKey()
		nac := jwt.NewAccountClaims(pub)
		nac.Limits.Conn = 4
		ajwt, _ := nac.Encode(okp)
		addAccountToMemResolver(s, pub, ajwt)
		acc, _ := s.LookupAccount(pub)
		if acc.MaxActiveConnections() != 4 {
			t.Fatalf("Expected to see a limit of 4 connections")
		}
		// Simulate a systems publisher so we can do an account claims update.
		url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
		nc, err := nats.Connect(url, createUserCreds(t, s, sakp))
		if err != nil {
			t.Fatalf("Error on connect: %v", err)
		}
		defer nc.Close()
		// Update the account: raise the connection limit, with an issue time
		// in the past and a short expiration.
		nac = jwt.NewAccountClaims(pub)
		nac.Limits.Conn = 8
		issAt := time.Now().Add(-30 * time.Second).Unix()
		nac.IssuedAt = issAt
		expires := time.Now().Add(2 * time.Second).Unix()
		nac.Expires = expires
		ajwt, _ = nac.Encode(okp)
		// Publish to the system update subject.
		claimUpdateSubj := fmt.Sprintf(subj, pub)
		nc.Publish(claimUpdateSubj, []byte(ajwt))
		nc.Flush()
		// The server should have picked up the new connection limit.
		acc, _ = s.LookupAccount(pub)
		if acc.MaxActiveConnections() != 8 {
			t.Fatalf("Account was not updated")
		}
	}
	t.Run("new", func(t *testing.T) {
		test(accUpdateEventSubjNew)
	})
	t.Run("old", func(t *testing.T) {
		test(accUpdateEventSubjOld)
	})
}
// TestAccountReqMonitoring exercises the per-account monitoring request
// subjects (SUBSZ, CONNZ, JSZ, STATZ) and the account STATZ ping subject,
// including filtering by account name and the include_unused option.
func TestAccountReqMonitoring(t *testing.T) {
	s, opts := runTrustedServer(t)
	defer s.Shutdown()
	sacc, sakp := createAccount(s)
	s.setSystemAccount(sacc)
	s.EnableJetStream(nil)
	// One account that never gets a client connection, one that does.
	unusedAcc, _ := createAccount(s)
	acc, akp := createAccount(s)
	acc.EnableJetStream(nil)
	// Direct request subjects for the monitored account.
	subsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "SUBSZ")
	connz := fmt.Sprintf(accDirectReqSubj, acc.Name, "CONNZ")
	jsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "JSZ")
	pStatz := fmt.Sprintf(accPingReqSubj, "STATZ")
	statz := func(name string) string { return fmt.Sprintf(accDirectReqSubj, name, "STATZ") }
	// Create system account connection to query
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	ncSys, err := nats.Connect(url, createUserCreds(t, s, sakp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncSys.Close()
	// Create a connection that we can query
	nc, err := nats.Connect(url, createUserCreds(t, s, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()
	// query SUBSZ for account
	resp, err := ncSys.Request(subsz, nil, time.Second)
	require_NoError(t, err)
	require_Contains(t, string(resp.Data), `"num_subscriptions":4,`)
	// create a subscription
	sub, err := nc.Subscribe("foo", func(msg *nats.Msg) {})
	require_NoError(t, err)
	defer sub.Unsubscribe()
	require_NoError(t, nc.Flush())
	// query SUBSZ for account; the new subscription should now show up.
	resp, err = ncSys.Request(subsz, nil, time.Second)
	require_NoError(t, err)
	require_Contains(t, string(resp.Data), `"num_subscriptions":5,`, `"subject":"foo"`)
	// query connections for account
	resp, err = ncSys.Request(connz, nil, time.Second)
	require_NoError(t, err)
	require_Contains(t, string(resp.Data), `"num_connections":1,`, `"total":1,`)
	// query connections for js account
	resp, err = ncSys.Request(jsz, nil, time.Second)
	require_NoError(t, err)
	require_Contains(t, string(resp.Data), `"memory":0,`, `"storage":0,`)
	// query statz/conns for account
	resp, err = ncSys.Request(statz(acc.Name), nil, time.Second)
	require_NoError(t, err)
	respContentAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":0,"bytes":0}`,
		`"received":{"msgs":0,"bytes":0}`, fmt.Sprintf(`"acc":"%s"`, acc.Name)}
	require_Contains(t, string(resp.Data), respContentAcc...)
	// Use an explicit reply inbox so multiple STATZ ping responses can be
	// collected from the same subscription.
	rIb := ncSys.NewRespInbox()
	rSub, err := ncSys.SubscribeSync(rIb)
	require_NoError(t, err)
	require_NoError(t, ncSys.PublishRequest(pStatz, rIb, nil))
	minRespContentForBothAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"acc":"`}
	resp, err = rSub.NextMsg(time.Second)
	require_NoError(t, err)
	require_Contains(t, string(resp.Data), minRespContentForBothAcc...)
	// expect one entry per account
	require_Contains(t, string(resp.Data), fmt.Sprintf(`"acc":"%s"`, acc.Name), fmt.Sprintf(`"acc":"%s"`, sacc.Name))
	// Test ping with filter by account name
	require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, sacc.Name))))
	m, err := rSub.NextMsg(time.Second)
	require_NoError(t, err)
	require_Contains(t, string(m.Data), minRespContentForBothAcc...)
	require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, acc.Name))))
	m, err = rSub.NextMsg(time.Second)
	require_NoError(t, err)
	require_Contains(t, string(m.Data), respContentAcc...)
	// Test include unused for statz and ping of statz
	unusedContent := []string{`"conns":0,`, `"total_conns":0`, `"slow_consumers":0`,
		fmt.Sprintf(`"acc":"%s"`, unusedAcc.Name)}
	resp, err = ncSys.Request(statz(unusedAcc.Name),
		[]byte(fmt.Sprintf(`{"accounts":["%s"], "include_unused":true}`, unusedAcc.Name)),
		time.Second)
	require_NoError(t, err)
	require_Contains(t, string(resp.Data), unusedContent...)
	require_NoError(t, ncSys.PublishRequest(pStatz, rIb,
		[]byte(fmt.Sprintf(`{"accounts":["%s"], "include_unused":true}`, unusedAcc.Name))))
	resp, err = rSub.NextMsg(time.Second)
	require_NoError(t, err)
	require_Contains(t, string(resp.Data), unusedContent...)
	// Without include_unused the unused account should yield no response.
	require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, unusedAcc.Name))))
	_, err = rSub.NextMsg(200 * time.Millisecond)
	require_Error(t, err)
	// Test ping from within account: exactly one response is expected.
	ib := nc.NewRespInbox()
	rSub, err = nc.SubscribeSync(ib)
	require_NoError(t, err)
	require_NoError(t, nc.PublishRequest(pStatz, ib, nil))
	resp, err = rSub.NextMsg(time.Second)
	require_NoError(t, err)
	require_Contains(t, string(resp.Data), respContentAcc...)
	_, err = rSub.NextMsg(200 * time.Millisecond)
	require_Error(t, err)
}
// TestAccountReqInfo queries the account INFO monitoring endpoint for an
// account with a service export and an account with a service import, and
// validates the returned AccountInfo and ServerInfo fields.
func TestAccountReqInfo(t *testing.T) {
	s, opts := runTrustedServer(t)
	defer s.Shutdown()
	sacc, sakp := createAccount(s)
	s.setSystemAccount(sacc)
	// Let's create an account with service export.
	akp, _ := nkeys.CreateAccount()
	pub1, _ := akp.PublicKey()
	nac1 := jwt.NewAccountClaims(pub1)
	nac1.Exports.Add(&jwt.Export{Subject: "req.*", Type: jwt.Service})
	ajwt1, _ := nac1.Encode(oKp)
	addAccountToMemResolver(s, pub1, ajwt1)
	s.LookupAccount(pub1)
	info1 := fmt.Sprintf(accDirectReqSubj, pub1, "INFO")
	// Now add an account with service imports.
	akp2, _ := nkeys.CreateAccount()
	pub2, _ := akp2.PublicKey()
	nac2 := jwt.NewAccountClaims(pub2)
	nac2.Imports.Add(&jwt.Import{Account: pub1, Subject: "req.1", Type: jwt.Service})
	ajwt2, _ := nac2.Encode(oKp)
	addAccountToMemResolver(s, pub2, ajwt2)
	s.LookupAccount(pub2)
	info2 := fmt.Sprintf(accDirectReqSubj, pub2, "INFO")
	// Create system account connection to query
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	ncSys, err := nats.Connect(url, createUserCreds(t, s, sakp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncSys.Close()
	// checkCommon validates the fields that should be identical for both
	// accounts in this test setup.
	checkCommon := func(info *AccountInfo, srv *ServerInfo, pub, jwt string) {
		if info.Complete != true {
			t.Fatalf("Unexpected value: %v", info.Complete)
		} else if info.Expired != false {
			t.Fatalf("Unexpected value: %v", info.Expired)
		} else if info.JetStream != false {
			t.Fatalf("Unexpected value: %v", info.JetStream)
		} else if info.ClientCnt != 0 {
			t.Fatalf("Unexpected value: %v", info.ClientCnt)
		} else if info.AccountName != pub {
			t.Fatalf("Unexpected value: %v", info.AccountName)
		} else if info.LeafCnt != 0 {
			t.Fatalf("Unexpected value: %v", info.LeafCnt)
		} else if info.Jwt != jwt {
			t.Fatalf("Unexpected value: %v", info.Jwt)
		} else if srv.Cluster != "abc" {
			t.Fatalf("Unexpected value: %v", srv.Cluster)
		} else if srv.Name != s.Name() {
			t.Fatalf("Unexpected value: %v", srv.Name)
		} else if srv.Host != opts.Host {
			t.Fatalf("Unexpected value: %v", srv.Host)
		} else if srv.Seq < 1 {
			t.Fatalf("Unexpected value: %v", srv.Seq)
		}
	}
	info := AccountInfo{}
	srv := ServerInfo{}
	// Decode the "data" and "server" parts of the response in one pass.
	msg := struct {
		Data *AccountInfo `json:"data"`
		Srv  *ServerInfo  `json:"server"`
	}{
		&info,
		&srv,
	}
	// Check the exporting account.
	if resp, err := ncSys.Request(info1, nil, time.Second); err != nil {
		t.Fatalf("Error on request: %v", err)
	} else if err := json.Unmarshal(resp.Data, &msg); err != nil {
		t.Fatalf("Unmarshalling failed: %v", err)
	} else if len(info.Exports) != 1 {
		t.Fatalf("Unexpected value: %v", info.Exports)
	} else if len(info.Imports) != 3 {
		t.Fatalf("Unexpected value: %+v", info.Imports)
	} else if info.Exports[0].Subject != "req.*" {
		t.Fatalf("Unexpected value: %v", info.Exports)
	} else if info.Exports[0].Type != jwt.Service {
		t.Fatalf("Unexpected value: %v", info.Exports)
	} else if info.Exports[0].ResponseType != jwt.ResponseTypeSingleton {
		t.Fatalf("Unexpected value: %v", info.Exports)
	} else if info.SubCnt != 3 {
		t.Fatalf("Unexpected value: %v", info.SubCnt)
	} else {
		checkCommon(&info, &srv, pub1, ajwt1)
	}
	// Check the importing account.
	info = AccountInfo{}
	srv = ServerInfo{}
	if resp, err := ncSys.Request(info2, nil, time.Second); err != nil {
		t.Fatalf("Error on request: %v", err)
	} else if err := json.Unmarshal(resp.Data, &msg); err != nil {
		t.Fatalf("Unmarshalling failed: %v", err)
	} else if len(info.Exports) != 0 {
		t.Fatalf("Unexpected value: %v", info.Exports)
	} else if len(info.Imports) != 4 {
		t.Fatalf("Unexpected value: %+v", info.Imports)
	}
	// Here we need to find our import
	var si *ExtImport
	for _, im := range info.Imports {
		if im.Subject == "req.1" {
			si = &im
			break
		}
	}
	if si == nil {
		t.Fatalf("Could not find our import")
	}
	if si.Type != jwt.Service {
		t.Fatalf("Unexpected value: %+v", si)
	} else if si.Account != pub1 {
		t.Fatalf("Unexpected value: %+v", si)
	} else if info.SubCnt != 4 {
		t.Fatalf("Unexpected value: %+v", si)
	} else {
		checkCommon(&info, &srv, pub2, ajwt2)
	}
}
// TestAccountClaimsUpdatesWithServiceImports makes sure that repeatedly
// updating the claims of an account that has service imports does not leak
// internal subscriptions.
func TestAccountClaimsUpdatesWithServiceImports(t *testing.T) {
	s, opts := runTrustedServer(t)
	defer s.Shutdown()
	sacc, sakp := createAccount(s)
	s.setSystemAccount(sacc)
	okp, _ := nkeys.FromSeed(oSeed)
	// Let's create an account with service export.
	akp, _ := nkeys.CreateAccount()
	pub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	nac.Exports.Add(&jwt.Export{Subject: "req.*", Type: jwt.Service})
	ajwt, _ := nac.Encode(okp)
	addAccountToMemResolver(s, pub, ajwt)
	s.LookupAccount(pub)
	// Now add an account with multiple service imports.
	akp2, _ := nkeys.CreateAccount()
	pub2, _ := akp2.PublicKey()
	nac2 := jwt.NewAccountClaims(pub2)
	nac2.Imports.Add(&jwt.Import{Account: pub, Subject: "req.1", Type: jwt.Service})
	ajwt2, _ := nac2.Encode(okp)
	addAccountToMemResolver(s, pub2, ajwt2)
	s.LookupAccount(pub2)
	// Baseline subscription count before the updates.
	startSubs := s.NumSubscriptions()
	// Simulate a systems publisher so we can do an account claims update.
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	nc, err := nats.Connect(url, createUserCreds(t, s, sakp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()
	// Update the account several times
	for i := 1; i <= 10; i++ {
		nac2 = jwt.NewAccountClaims(pub2)
		nac2.Limits.Conn = int64(i)
		nac2.Imports.Add(&jwt.Import{Account: pub, Subject: "req.1", Type: jwt.Service})
		ajwt2, _ = nac2.Encode(okp)
		// Publish to the system update subject.
		claimUpdateSubj := fmt.Sprintf(accUpdateEventSubjNew, pub2)
		nc.Publish(claimUpdateSubj, []byte(ajwt2))
	}
	nc.Flush()
	// The subscription count must not have grown with the updates.
	if startSubs < s.NumSubscriptions() {
		t.Fatalf("Subscriptions leaked: %d vs %d", startSubs, s.NumSubscriptions())
	}
}
// TestAccountConnsLimitExceededAfterUpdate checks that when an account claims
// update lowers the connection limit below the number of currently open
// connections, the server closes the excess connections.
func TestAccountConnsLimitExceededAfterUpdate(t *testing.T) {
	s, opts := runTrustedServer(t)
	defer s.Shutdown()
	sacc, _ := createAccount(s)
	s.setSystemAccount(sacc)
	// Let's create a normal account with limits we can update.
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	pub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	nac.Limits.Conn = 10
	ajwt, _ := nac.Encode(okp)
	addAccountToMemResolver(s, pub, ajwt)
	acc, _ := s.LookupAccount(pub)
	// Now create the max connections.
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	for {
		// Keep connecting until the account limit rejects the connection.
		nc, err := nats.Connect(url, createUserCreds(t, s, akp))
		if err != nil {
			break
		}
		defer nc.Close()
	}
	// We should have max here.
	checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
		if total := s.NumClients(); total != acc.MaxActiveConnections() {
			return fmt.Errorf("Expected %d connections, got %d", acc.MaxActiveConnections(), total)
		}
		return nil
	})
	// Now change limits to make current connections over the limit.
	nac = jwt.NewAccountClaims(pub)
	nac.Limits.Conn = 2
	ajwt, _ = nac.Encode(okp)
	s.updateAccountWithClaimJWT(acc, ajwt)
	if acc.MaxActiveConnections() != 2 {
		t.Fatalf("Expected max connections to be set to 2, got %d", acc.MaxActiveConnections())
	}
	// We should have closed the excess connections.
	checkClientsCount(t, s, acc.MaxActiveConnections())
}
// TestAccountConnsLimitExceededAfterUpdateDisconnectNewOnly checks that when
// an account's connection limit is lowered below the current number of open
// connections, only the newest connections are disconnected to get back
// under the limit.
func TestAccountConnsLimitExceededAfterUpdateDisconnectNewOnly(t *testing.T) {
	s, opts := runTrustedServer(t)
	defer s.Shutdown()
	sacc, _ := createAccount(s)
	s.setSystemAccount(sacc)

	// Let's create a normal account with limits we can update.
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	pub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	nac.Limits.Conn = 10
	ajwt, _ := nac.Encode(okp)
	addAccountToMemResolver(s, pub, ajwt)
	acc, _ := s.LookupAccount(pub)

	// Now create the max connections.
	// We create half then we will wait and then create the rest.
	// Will test that we disconnect the newest ones.
	newConns := make([]*nats.Conn, 0, 5)
	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	for i := 0; i < 5; i++ {
		nc, err := nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp))
		require_NoError(t, err)
		defer nc.Close()
	}
	time.Sleep(500 * time.Millisecond)
	for i := 0; i < 5; i++ {
		nc, err := nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp))
		require_NoError(t, err)
		defer nc.Close()
		newConns = append(newConns, nc)
	}

	// We should have max here.
	checkClientsCount(t, s, acc.MaxActiveConnections())

	// Now change limits to make current connections over the limit.
	nac = jwt.NewAccountClaims(pub)
	nac.Limits.Conn = 5
	ajwt, _ = nac.Encode(okp)
	s.updateAccountWithClaimJWT(acc, ajwt)
	if acc.MaxActiveConnections() != 5 {
		t.Fatalf("Expected max connections to be set to 5, got %d", acc.MaxActiveConnections())
	}
	// We should have closed the excess connections.
	checkClientsCount(t, s, acc.MaxActiveConnections())

	// Now make sure that only the new ones were closed.
	// Since reconnect is disabled, a server-side disconnect leaves the
	// connection in the closed state, so count the closed ones.
	var closed int
	for _, nc := range newConns {
		if nc.IsClosed() {
			closed++
		}
	}
	if closed != 5 {
		t.Fatalf("Expected all new clients to be closed, only got %d of 5", closed)
	}
}
// TestSystemAccountWithBadRemoteLatencyUpdate makes sure that a remote
// latency update referencing an unknown account is tolerated and does not
// crash the server.
func TestSystemAccountWithBadRemoteLatencyUpdate(t *testing.T) {
	s, _ := runTrustedServer(t)
	defer s.Shutdown()

	acc, _ := createAccount(s)
	s.setSystemAccount(acc)

	// Craft an update for an account the server does not know about.
	payload, _ := json.Marshal(&remoteLatency{
		Account: "NONSENSE",
		ReqId:   "_INBOX.22",
	})
	s.remoteLatencyUpdate(nil, nil, nil, "foo", _EMPTY_, payload)
}
// TestSystemAccountWithGateways verifies that system account events (client
// CONNECT and per-server CONNS) originating on one side of a gateway are
// received by a system subscriber on the other side.
func TestSystemAccountWithGateways(t *testing.T) {
	sa, oa, sb, ob, akp := runTrustedGateways(t)
	defer sa.Shutdown()
	defer sb.Shutdown()
	// Create a client on A that will subscribe on $SYS.ACCOUNT.>
	urla := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port)
	nca := natsConnect(t, urla, createUserCreds(t, sa, akp), nats.Name("SYS"))
	defer nca.Close()
	nca.Flush()
	sub, _ := nca.SubscribeSync("$SYS.ACCOUNT.>")
	defer sub.Unsubscribe()
	nca.Flush()
	// If this test fails with a wrong number after 10 seconds we may have
	// added a new initial subscription for the eventing system.
	checkExpectedSubs(t, 45, sa)
	// Create a client on B and see if we receive the event
	urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
	ncb := natsConnect(t, urlb, createUserCreds(t, sb, akp), nats.Name("TEST EVENTS"))
	defer ncb.Close()
	// space for .CONNECT and .CONNS from SYS and $G as well as one extra message
	msgs := [4]*nats.Msg{}
	var err error
	msgs[0], err = sub.NextMsg(time.Second)
	require_NoError(t, err)
	msgs[1], err = sub.NextMsg(time.Second)
	require_NoError(t, err)
	msgs[2], err = sub.NextMsg(time.Second)
	require_NoError(t, err)
	// TODO: There is a race currently that can cause the server to process the
	// system event *after* the subscription on "A" has been registered, and so
	// the "nca" client would receive its own CONNECT message.
	msgs[3], _ = sub.NextMsg(250 * time.Millisecond)
	// findMsgs returns the received messages matching the given subject.
	findMsgs := func(sub string) []*nats.Msg {
		rMsgs := []*nats.Msg{}
		for _, m := range msgs {
			if m == nil {
				continue
			}
			if m.Subject == sub {
				rMsgs = append(rMsgs, m)
			}
		}
		return rMsgs
	}
	// The CONNECT event for ncb (connected on B) must have crossed the gateway.
	msg := findMsgs(fmt.Sprintf("$SYS.ACCOUNT.%s.CONNECT", sa.SystemAccount().Name))
	var bMsg *nats.Msg
	if len(msg) < 1 {
		t.Fatal("Expected at least one message")
	}
	bMsg = msg[len(msg)-1]
	require_Contains(t, string(bMsg.Data), sb.ID())
	require_Contains(t, string(bMsg.Data), `"cluster":"B"`)
	require_Contains(t, string(bMsg.Data), `"name":"TEST EVENTS"`)
	// One CONNS event for the system account and one for the global account.
	connsMsgA := findMsgs(fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", sa.SystemAccount().Name))
	if len(connsMsgA) != 1 {
		t.Fatal("Expected a message")
	}
	connsMsgG := findMsgs("$SYS.ACCOUNT.$G.SERVER.CONNS")
	if len(connsMsgG) != 1 {
		t.Fatal("Expected a message")
	}
}
// TestSystemAccountNoAuthUser checks the authentication outcome and the
// resulting account binding for various credential forms when the config
// defines a $SYS account with an explicit user and no no_auth_user.
func TestSystemAccountNoAuthUser(t *testing.T) {
	conf := createConfFile(t, []byte(`
		listen: "127.0.0.1:-1"
		accounts {
			$SYS {
				users [{user: "admin", password: "pwd"}]
			}
		}
	`))
	defer os.Remove(conf)
	s, o := RunServerWithConfig(conf)
	defer s.Shutdown()
	for _, test := range []struct {
		name    string
		usrInfo string // user-info part of the URL, including trailing "@"
		ok      bool   // whether the connection should succeed
		account string // expected account name of the bound client
	}{
		{"valid user/pwd", "admin:pwd@", true, "$SYS"},
		{"invalid pwd", "admin:wrong@", false, _EMPTY_},
		{"some token", "sometoken@", false, _EMPTY_},
		{"user used without pwd", "admin@", false, _EMPTY_}, // will be treated as a token
		{"user with empty password", "admin:@", false, _EMPTY_},
		{"no user means global account", _EMPTY_, true, globalAccountName},
	} {
		t.Run(test.name, func(t *testing.T) {
			url := fmt.Sprintf("nats://%s127.0.0.1:%d", test.usrInfo, o.Port)
			nc, err := nats.Connect(url)
			if err != nil {
				if test.ok {
					t.Fatalf("Unexpected error: %v", err)
				}
				return
			} else if !test.ok {
				nc.Close()
				t.Fatalf("Should have failed, did not")
			}
			// Grab the account name the (single) client was bound to.
			var accName string
			s.mu.Lock()
			for _, c := range s.clients {
				c.mu.Lock()
				if c.acc != nil {
					accName = c.acc.Name
				}
				c.mu.Unlock()
				break
			}
			s.mu.Unlock()
			nc.Close()
			checkClientsCount(t, s, 0)
			if accName != test.account {
				t.Fatalf("The account should have been %q, got %q", test.account, accName)
			}
		})
	}
}
// TestServerAccountConns checks that the per-account SERVER.CONNS event
// carries the account's sent/received message and byte counters.
func TestServerAccountConns(t *testing.T) {
	// speed up hb
	orgHBInterval := eventsHBInterval
	eventsHBInterval = time.Millisecond * 100
	defer func() { eventsHBInterval = orgHBInterval }()
	conf := createConfFile(t, []byte(`
	host: 127.0.0.1
	port: -1
	system_account: SYS
	accounts: {
		SYS: {users: [{user: s, password: s}]}
		ACC: {users: [{user: a, password: a}]}
	}`))
	defer removeFile(t, conf)
	s, _ := RunServerWithConfig(conf)
	defer s.Shutdown()
	// Client in account ACC publishes two messages and receives one.
	nc := natsConnect(t, s.ClientURL(), nats.UserInfo("a", "a"))
	defer nc.Close()
	subOut, err := nc.SubscribeSync("foo")
	require_NoError(t, err)
	hw := "HELLO WORLD"
	nc.Publish("foo", []byte(hw))
	nc.Publish("bar", []byte(hw)) // will only count towards received
	nc.Flush()
	m, err := subOut.NextMsg(time.Second)
	require_NoError(t, err)
	require_Equal(t, string(m.Data), hw)
	// System account client listens for the account's CONNS event.
	ncs := natsConnect(t, s.ClientURL(), nats.UserInfo("s", "s"))
	defer ncs.Close()
	subs, err := ncs.SubscribeSync("$SYS.ACCOUNT.ACC.SERVER.CONNS")
	require_NoError(t, err)
	m, err = subs.NextMsg(time.Second)
	require_NoError(t, err)
	accConns := &AccountNumConns{}
	err = json.Unmarshal(m.Data, accConns)
	require_NoError(t, err)
	// Two messages were published ("foo" and "bar"), one was delivered ("foo").
	require_True(t, accConns.Received.Msgs == 2)
	require_True(t, accConns.Received.Bytes == 2*int64(len(hw)))
	require_True(t, accConns.Sent.Msgs == 1)
	require_True(t, accConns.Sent.Bytes == int64(len(hw)))
}
// TestServerEventsStatsZ checks the periodic STATSZ events published by a
// server as well as the per-server STATSZ request/reply endpoint, including
// route information reported from both servers of the cluster.
func TestServerEventsStatsZ(t *testing.T) {
	serverStatsReqSubj := "$SYS.REQ.SERVER.%s.STATSZ"
	preStart := time.Now().UTC()
	// Add little bit of delay to make sure that time check
	// between pre-start and actual start does not fail.
	time.Sleep(5 * time.Millisecond)
	sa, optsA, sb, _, akp := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()
	// Same between actual start and post start.
	time.Sleep(5 * time.Millisecond)
	postStart := time.Now().UTC()
	url := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
	ncs, err := nats.Connect(url, createUserCreds(t, sa, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncs.Close()
	subj := fmt.Sprintf(serverStatsSubj, sa.ID())
	sub, _ := ncs.SubscribeSync(subj)
	defer sub.Unsubscribe()
	ncs.Publish("foo", []byte("HELLO WORLD"))
	ncs.Flush()

	// Let's speed up the checking process.
	sa.mu.Lock()
	sa.sys.statsz = 10 * time.Millisecond
	sa.sys.stmr.Reset(sa.sys.statsz)
	sa.mu.Unlock()

	_, err = sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error receiving msg: %v", err)
	}
	// Get it the second time so we can check some stats
	msg, err := sub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Error receiving msg: %v", err)
	}
	m := ServerStatsMsg{}
	if err := json.Unmarshal(msg.Data, &m); err != nil {
		t.Fatalf("Error unmarshalling the statz json: %v", err)
	}
	if m.Server.ID != sa.ID() {
		t.Fatalf("Did not match IDs")
	}
	if m.Server.Cluster != "TEST CLUSTER 22" {
		t.Fatalf("Did not match cluster name")
	}
	if m.Server.Version != VERSION {
		t.Fatalf("Did not match server version")
	}
	// The reported start time must fall between preStart and postStart.
	// (Note: previous form `!a && b` failed to express "not between".)
	if !(m.Stats.Start.After(preStart) && m.Stats.Start.Before(postStart)) {
		t.Fatalf("Got a wrong start time for the server %v", m.Stats.Start)
	}
	if m.Stats.Connections != 1 {
		t.Fatalf("Did not match connections of 1, got %d", m.Stats.Connections)
	}
	if m.Stats.ActiveAccounts != 2 {
		t.Fatalf("Did not match active accounts of 2, got %d", m.Stats.ActiveAccounts)
	}
	if m.Stats.Sent.Msgs < 1 {
		t.Fatalf("Did not match sent msgs of >=1, got %d", m.Stats.Sent.Msgs)
	}
	if m.Stats.Received.Msgs < 1 {
		t.Fatalf("Did not match received msgs of >=1, got %d", m.Stats.Received.Msgs)
	}
	if lr := len(m.Stats.Routes); lr != 1 {
		t.Fatalf("Expected a route, but got %d", lr)
	}

	// Now let's prompt this server to send us the statsz
	subj = fmt.Sprintf(serverStatsReqSubj, sa.ID())
	msg, err = ncs.Request(subj, nil, time.Second)
	if err != nil {
		t.Fatalf("Error trying to request statsz: %v", err)
	}
	m2 := ServerStatsMsg{}
	if err := json.Unmarshal(msg.Data, &m2); err != nil {
		t.Fatalf("Error unmarshalling the statz json: %v", err)
	}
	if m2.Server.ID != sa.ID() {
		t.Fatalf("Did not match IDs")
	}
	if m2.Stats.Connections != 1 {
		t.Fatalf("Did not match connections of 1, got %d", m2.Stats.Connections)
	}
	if m2.Stats.ActiveAccounts != 2 {
		t.Fatalf("Did not match active accounts of 2, got %d", m2.Stats.ActiveAccounts)
	}
	if m2.Stats.Sent.Msgs < 3 {
		t.Fatalf("Did not match sent msgs of >= 3, got %d", m2.Stats.Sent.Msgs)
	}
	if m2.Stats.Received.Msgs < 1 {
		t.Fatalf("Did not match received msgs of >= 1, got %d", m2.Stats.Received.Msgs)
	}
	if lr := len(m2.Stats.Routes); lr != 1 {
		t.Fatalf("Expected a route, but got %d", lr)
	}

	// Request a second time: sent/received counters should have grown.
	msg, err = ncs.Request(subj, nil, time.Second)
	if err != nil {
		t.Fatalf("Error trying to request statsz: %v", err)
	}
	m3 := ServerStatsMsg{}
	if err := json.Unmarshal(msg.Data, &m3); err != nil {
		t.Fatalf("Error unmarshalling the statz json: %v", err)
	}
	if m3.Server.ID != sa.ID() {
		t.Fatalf("Did not match IDs")
	}
	if m3.Stats.Connections != 1 {
		t.Fatalf("Did not match connections of 1, got %d", m3.Stats.Connections)
	}
	if m3.Stats.ActiveAccounts != 2 {
		t.Fatalf("Did not match active accounts of 2, got %d", m3.Stats.ActiveAccounts)
	}
	if m3.Stats.Sent.Msgs < 4 {
		t.Fatalf("Did not match sent msgs of >= 4, got %d", m3.Stats.Sent.Msgs)
	}
	if m3.Stats.Received.Msgs < 2 {
		t.Fatalf("Did not match received msgs of >= 2, got %d", m3.Stats.Received.Msgs)
	}
	if lr := len(m3.Stats.Routes); lr != 1 {
		t.Fatalf("Expected a route, but got %d", lr)
	}
	if sr := m3.Stats.Routes[0]; sr.Name != "B_SRV" {
		t.Fatalf("Expected server A's route to B to have Name set to %q, got %q", "B_SRV", sr.Name)
	}

	// Now query B and check that route's name is "A"
	subj = fmt.Sprintf(serverStatsReqSubj, sb.ID())
	ncs.SubscribeSync(subj)
	msg, err = ncs.Request(subj, nil, time.Second)
	if err != nil {
		t.Fatalf("Error trying to request statsz: %v", err)
	}
	m = ServerStatsMsg{}
	if err := json.Unmarshal(msg.Data, &m); err != nil {
		t.Fatalf("Error unmarshalling the statz json: %v", err)
	}
	if lr := len(m.Stats.Routes); lr != 1 {
		t.Fatalf("Expected a route, but got %d", lr)
	}
	if sr := m.Stats.Routes[0]; sr.Name != "A_SRV" {
		t.Fatalf("Expected server B's route to A to have Name set to %q, got %q", "A_SRV", sr.Name)
	}
}
// TestServerEventsPingStatsZ sends STATSZ ping requests with filter payloads
// (both raw JSON and marshaled StatszEventOptions) that match both servers
// of the cluster, expecting a reply from each server.
func TestServerEventsPingStatsZ(t *testing.T) {
	sa, _, sb, optsB, akp := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()
	url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()
	// test publishes the ping and expects a STATSZ reply from both servers.
	test := func(req []byte) {
		reply := nc.NewRespInbox()
		sub, _ := nc.SubscribeSync(reply)
		nc.PublishRequest(serverStatsPingReqSubj, reply, req)
		// Make sure its a statsz
		m := ServerStatsMsg{}
		// Receive both manually.
		msg, err := sub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Error receiving msg: %v", err)
		}
		if err := json.Unmarshal(msg.Data, &m); err != nil {
			t.Fatalf("Error unmarshalling the statz json: %v", err)
		}
		msg, err = sub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("Error receiving msg: %v", err)
		}
		if err := json.Unmarshal(msg.Data, &m); err != nil {
			t.Fatalf("Error unmarshalling the statz json: %v", err)
		}
	}
	// Filters expressed as raw JSON payloads.
	strRequestTbl := []string{
		`{"cluster":"TEST"}`,
		`{"cluster":"CLUSTER"}`,
		`{"server_name":"SRV"}`,
		`{"server_name":"_"}`,
		fmt.Sprintf(`{"host":"%s"}`, optsB.Host),
		fmt.Sprintf(`{"host":"%s", "cluster":"CLUSTER", "name":"SRV"}`, optsB.Host),
	}
	for i, opt := range strRequestTbl {
		t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
			test([]byte(opt))
		})
	}
	// Same filters expressed as typed StatszEventOptions.
	requestTbl := []StatszEventOptions{
		{EventFilterOptions: EventFilterOptions{Cluster: "TEST"}},
		{EventFilterOptions: EventFilterOptions{Cluster: "CLUSTER"}},
		{EventFilterOptions: EventFilterOptions{Name: "SRV"}},
		{EventFilterOptions: EventFilterOptions{Name: "_"}},
		{EventFilterOptions: EventFilterOptions{Host: optsB.Host}},
		{EventFilterOptions: EventFilterOptions{Host: optsB.Host, Cluster: "CLUSTER", Name: "SRV"}},
	}
	for i, opt := range requestTbl {
		t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
			msg, _ := json.MarshalIndent(&opt, "", " ")
			test(msg)
		})
	}
}
// TestServerEventsPingStatsZFilter verifies that STATSZ ping requests with
// filters that match no server in the cluster receive no response and the
// requester times out.
func TestServerEventsPingStatsZFilter(t *testing.T) {
	sa, _, sb, optsB, akp := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()

	url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	// No server should answer a ping carrying a non-matching filter.
	expectTimeout := func(t *testing.T, payload []byte) {
		t.Helper()
		if _, err := nc.Request(serverStatsPingReqSubj, payload, time.Second/4); err != nats.ErrTimeout {
			t.Fatalf("Error, expected timeout: %v", err)
		}
	}

	// Filters expressed as raw JSON payloads.
	for i, req := range []string{
		`{"cluster":"DOESNOTEXIST"}`,
		`{"host":"DOESNOTEXIST"}`,
		`{"server_name":"DOESNOTEXIST"}`,
	} {
		t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
			expectTimeout(t, []byte(req))
		})
	}

	// Same filters expressed as typed options.
	for i, opt := range []EventFilterOptions{
		{Cluster: "DOESNOTEXIST"},
		{Host: "DOESNOTEXIST"},
		{Name: "DOESNOTEXIST"},
	} {
		t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
			payload, _ := json.MarshalIndent(&opt, "", " ")
			expectTimeout(t, payload)
		})
	}
}
// TestServerEventsPingStatsZFailFilter verifies that a STATSZ ping carrying
// a malformed JSON filter gets an error response with a BadRequest code
// rather than being silently dropped.
func TestServerEventsPingStatsZFailFilter(t *testing.T) {
	sa, _, sb, optsB, akp := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()

	url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()

	// Receive both manually.
	msg, err := nc.Request(serverStatsPingReqSubj, []byte(`{MALFORMEDJSON`), time.Second/4)
	if err != nil {
		t.Fatalf("Error: %v", err)
	}
	resp := make(map[string]map[string]interface{})
	if err := json.Unmarshal(msg.Data, &resp); err != nil {
		t.Fatalf("Error unmarshalling the response json: %v", err)
	}
	if code := resp["error"]["code"].(float64); code != http.StatusBadRequest {
		t.Fatal("bad error code")
	}
}
// TestServerEventsPingMonitorz pings the monitoring endpoints via
// $SYS.REQ.SERVER.PING.<endpoint> with various option payloads and checks
// that both servers of the cluster answer with the expected top-level
// fields in the "data" section.
func TestServerEventsPingMonitorz(t *testing.T) {
	sa, _, sb, optsB, akp := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()
	sysAcc, _ := akp.PublicKey()
	url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()
	nc.Flush()
	// Each entry pings one endpoint, optionally with options (either as a
	// typed struct or raw JSON), and lists fields that must be present in
	// the response data.
	tests := []struct {
		endpoint  string
		opt       interface{}
		resp      interface{}
		respField []string
	}{
		{"VARZ", nil, &Varz{},
			[]string{"now", "cpu", "system_account"}},
		{"SUBSZ", nil, &Subsz{},
			[]string{"num_subscriptions", "num_cache"}},
		{"CONNZ", nil, &Connz{},
			[]string{"now", "connections"}},
		{"ROUTEZ", nil, &Routez{},
			[]string{"now", "routes"}},
		{"GATEWAYZ", nil, &Gatewayz{},
			[]string{"now", "outbound_gateways", "inbound_gateways"}},
		{"LEAFZ", nil, &Leafz{},
			[]string{"now", "leafs"}},
		{"SUBSZ", &SubszOptions{}, &Subsz{},
			[]string{"num_subscriptions", "num_cache"}},
		{"CONNZ", &ConnzOptions{}, &Connz{},
			[]string{"now", "connections"}},
		{"ROUTEZ", &RoutezOptions{}, &Routez{},
			[]string{"now", "routes"}},
		{"GATEWAYZ", &GatewayzOptions{}, &Gatewayz{},
			[]string{"now", "outbound_gateways", "inbound_gateways"}},
		{"LEAFZ", &LeafzOptions{}, &Leafz{},
			[]string{"now", "leafs"}},
		{"ACCOUNTZ", &AccountzOptions{}, &Accountz{},
			[]string{"now", "accounts"}},
		{"SUBSZ", &SubszOptions{Limit: 5}, &Subsz{},
			[]string{"num_subscriptions", "num_cache"}},
		{"CONNZ", &ConnzOptions{Limit: 5}, &Connz{},
			[]string{"now", "connections"}},
		{"ROUTEZ", &RoutezOptions{SubscriptionsDetail: true}, &Routez{},
			[]string{"now", "routes"}},
		{"GATEWAYZ", &GatewayzOptions{Accounts: true}, &Gatewayz{},
			[]string{"now", "outbound_gateways", "inbound_gateways"}},
		{"LEAFZ", &LeafzOptions{Subscriptions: true}, &Leafz{},
			[]string{"now", "leafs"}},
		{"ACCOUNTZ", &AccountzOptions{Account: sysAcc}, &Accountz{},
			[]string{"now", "account_detail"}},
		{"LEAFZ", &LeafzOptions{Account: sysAcc}, &Leafz{},
			[]string{"now", "leafs"}},
		{"ROUTEZ", json.RawMessage(`{"cluster":""}`), &Routez{},
			[]string{"now", "routes"}},
		{"ROUTEZ", json.RawMessage(`{"name":""}`), &Routez{},
			[]string{"now", "routes"}},
		{"ROUTEZ", json.RawMessage(`{"cluster":"TEST CLUSTER 22"}`), &Routez{},
			[]string{"now", "routes"}},
		{"ROUTEZ", json.RawMessage(`{"cluster":"CLUSTER"}`), &Routez{},
			[]string{"now", "routes"}},
		{"ROUTEZ", json.RawMessage(`{"cluster":"TEST CLUSTER 22", "subscriptions":true}`), &Routez{},
			[]string{"now", "routes"}},
		{"JSZ", nil, &JSzOptions{}, []string{"now", "disabled"}},
		{"HEALTHZ", nil, &JSzOptions{}, []string{"status"}},
		{"HEALTHZ", &HealthzOptions{JSEnabled: true}, &JSzOptions{}, []string{"status"}},
		{"HEALTHZ", &HealthzOptions{JSServerOnly: true}, &JSzOptions{}, []string{"status"}},
	}
	for i, test := range tests {
		t.Run(fmt.Sprintf("%s-%d", test.endpoint, i), func(t *testing.T) {
			var opt []byte
			if test.opt != nil {
				opt, err = json.Marshal(test.opt)
				if err != nil {
					t.Fatalf("Error marshaling opts: %v", err)
				}
			}
			reply := nc.NewRespInbox()
			replySubj, _ := nc.SubscribeSync(reply)
			// set a header to make sure request parsing knows to ignore them
			nc.PublishMsg(&nats.Msg{
				Subject: fmt.Sprintf("%s.%s", serverStatsPingReqSubj, test.endpoint),
				Reply:   reply,
				Header:  nats.Header{"header": []string{"for header sake"}},
				Data:    opt,
			})
			// Receive both manually.
			msg, err := replySubj.NextMsg(time.Second)
			if err != nil {
				t.Fatalf("Error receiving msg: %v", err)
			}
			response1 := make(map[string]map[string]interface{})
			if err := json.Unmarshal(msg.Data, &response1); err != nil {
				t.Fatalf("Error unmarshalling response1 json: %v", err)
			}
			// The first response can come from either server; the second
			// response must then come from the other one.
			serverName := ""
			if response1["server"]["name"] == "A_SRV" {
				serverName = "B_SRV"
			} else if response1["server"]["name"] == "B_SRV" {
				serverName = "A_SRV"
			} else {
				t.Fatalf("Error finding server in %s", string(msg.Data))
			}
			if resp, ok := response1["data"]; !ok {
				t.Fatalf("Error finding: %s in %s",
					strings.ToLower(test.endpoint), string(msg.Data))
			} else {
				for _, respField := range test.respField {
					if _, ok := resp[respField]; !ok {
						t.Fatalf("Error finding: %s in %s", respField, resp)
					}
				}
			}
			msg, err = replySubj.NextMsg(time.Second)
			if err != nil {
				t.Fatalf("Error receiving msg: %v", err)
			}
			response2 := make(map[string]map[string]interface{})
			if err := json.Unmarshal(msg.Data, &response2); err != nil {
				t.Fatalf("Error unmarshalling the response2 json: %v", err)
			}
			if response2["server"]["name"] != serverName {
				t.Fatalf("Error finding server %s in %s", serverName, string(msg.Data))
			}
			if resp, ok := response2["data"]; !ok {
				t.Fatalf("Error finding: %s in %s",
					strings.ToLower(test.endpoint), string(msg.Data))
			} else {
				for _, respField := range test.respField {
					if _, ok := resp[respField]; !ok {
						t.Fatalf("Error finding: %s in %s", respField, resp)
					}
				}
			}
		})
	}
}
// TestGatewayNameClientInfo verifies that the INFO protocol line sent to a
// client connecting to a gateway-enabled server carries the cluster name.
func TestGatewayNameClientInfo(t *testing.T) {
	sa, _, sb, _, _ := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()

	c, _, l := newClientForServer(sa)
	defer c.close()

	// The protocol line is "INFO {...}"; skip the 5-byte prefix and
	// decode the JSON payload.
	var info Info
	if err := json.Unmarshal([]byte(l[5:]), &info); err != nil {
		t.Fatalf("Could not parse INFO json: %v\n", err)
	}
	if got := info.Cluster; got != "TEST CLUSTER 22" {
		t.Fatalf("Expected a cluster name of 'TEST CLUSTER 22', got %q", got)
	}
}
// slowAccResolver wraps an AccountResolver and artificially delays the
// Fetch of one specific account, to exercise slow-lookup code paths in tests.
type slowAccResolver struct {
	sync.Mutex
	AccountResolver
	acc string // name of the account whose Fetch is delayed; guarded by the mutex
}
// Fetch sleeps 200ms when asked for the configured slow account, then
// delegates the actual lookup to the embedded resolver.
func (sr *slowAccResolver) Fetch(name string) (string, error) {
	sr.Lock()
	shouldDelay := name == sr.acc
	sr.Unlock()
	if shouldDelay {
		time.Sleep(200 * time.Millisecond)
	}
	return sr.AccountResolver.Fetch(name)
}
// TestConnectionUpdatesTimerProperlySet verifies that the per-account
// connection-events heartbeat timer runs while an account has local clients
// and is stopped once the last client disconnects.
func TestConnectionUpdatesTimerProperlySet(t *testing.T) {
	// Shorten the heartbeat interval so the test observes several updates
	// quickly; restore the original on exit.
	origEventsHBInterval := eventsHBInterval
	eventsHBInterval = 50 * time.Millisecond
	defer func() { eventsHBInterval = origEventsHBInterval }()

	sa, _, sb, optsB, _ := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()

	// Normal Account
	okp, _ := nkeys.FromSeed(oSeed)
	akp, _ := nkeys.CreateAccount()
	pub, _ := akp.PublicKey()
	nac := jwt.NewAccountClaims(pub)
	nac.Limits.Conn = 10 // set any limit...
	jwt, _ := nac.Encode(okp)
	addAccountToMemResolver(sa, pub, jwt)

	// Listen for HB updates...
	count := int32(0)
	cb := func(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) {
		atomic.AddInt32(&count, 1)
	}
	subj := fmt.Sprintf(accConnsEventSubjNew, pub)
	sub, err := sa.sysSubscribe(subj, cb)
	if sub == nil || err != nil {
		t.Fatalf("Expected to subscribe, got %v", err)
	}
	defer sa.sysUnsubscribe(sub)

	// Connect a client on server B so the account becomes active there.
	url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	nc := natsConnect(t, url, createUserCreds(t, sb, akp))
	defer nc.Close()
	time.Sleep(500 * time.Millisecond)
	// After waiting 500ms with HB interval of 50ms, we should get
	// about 10 updates, no much more
	if n := atomic.LoadInt32(&count); n > 15 {
		t.Fatalf("Expected about 10 updates, got %v", n)
	}

	// Now lookup the account doing the events on sb.
	acc, _ := sb.LookupAccount(pub)
	// Make sure we have the timer running.
	acc.mu.RLock()
	ctmr := acc.ctmr
	acc.mu.RUnlock()
	if ctmr == nil {
		t.Fatalf("Expected event timer for acc conns to be running")
	}

	// Dropping the last client should eventually stop the timer.
	nc.Close()
	checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
		// Make sure we have the timer is NOT running.
		acc.mu.RLock()
		ctmr = acc.ctmr
		acc.mu.RUnlock()
		if ctmr != nil {
			return fmt.Errorf("Expected event timer for acc conns to NOT be running after reaching zero local clients")
		}
		return nil
	})
}
// TestServerEventsReceivedByQSubs verifies that system auth-error events are
// delivered to queue subscribers on the system account.
func TestServerEventsReceivedByQSubs(t *testing.T) {
	s, opts := runTrustedServer(t)
	defer s.Shutdown()

	acc, akp := createAccount(s)
	s.setSystemAccount(acc)

	url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
	ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncs.Close()

	// Listen for auth error events. Check the subscribe error instead of
	// silently discarding it, so a failure here is reported at the source.
	qsub, err := ncs.QueueSubscribeSync("$SYS.SERVER.*.CLIENT.AUTH.ERR", "queue")
	if err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	defer qsub.Unsubscribe()
	ncs.Flush()

	// This connect is expected to fail authentication, which triggers the
	// auth error event. If it unexpectedly succeeds, close the connection
	// so the test does not leak it.
	if nc, err := nats.Connect(url, nats.Name("TEST BAD LOGIN")); err == nil {
		nc.Close()
	}

	m, err := qsub.NextMsg(time.Second)
	if err != nil {
		t.Fatalf("Should have heard an auth error event")
	}
	dem := DisconnectEventMsg{}
	if err := json.Unmarshal(m.Data, &dem); err != nil {
		t.Fatalf("Error unmarshalling disconnect event message: %v", err)
	}
	if dem.Reason != "Authentication Failure" {
		t.Fatalf("Expected auth error, got %q", dem.Reason)
	}
}
// TestServerEventsFilteredByTag verifies that VARZ server ping requests can be
// filtered by server tags, matching only servers that carry all requested tags.
func TestServerEventsFilteredByTag(t *testing.T) {
	confA := createConfFile(t, []byte(`
		listen: -1
		server_name: srv-A
		server_tags: ["foo", "bar"]
		cluster {
			name: clust
			listen: -1
			no_advertise: true
		}
		system_account: SYS
		accounts: {
			SYS: {
				users: [
					{user: b, password: b}
				]
			}
		}
		no_auth_user: b
    `))
	defer removeFile(t, confA)
	sA, _ := RunServerWithConfig(confA)
	defer sA.Shutdown()

	confB := createConfFile(t, []byte(fmt.Sprintf(`
		listen: -1
		server_name: srv-B
		server_tags: ["bar", "baz"]
		cluster {
			name: clust
			listen: -1
			no_advertise: true
			routes [
				nats-route://127.0.0.1:%d
			]
		}
		system_account: SYS
		accounts: {
			SYS: {
				users: [
					{user: b, password: b}
				]
			}
		}
		no_auth_user: b
    `, sA.opts.Cluster.Port)))
	defer removeFile(t, confB)
	sB, _ := RunServerWithConfig(confB)
	defer sB.Shutdown()
	checkClusterFormed(t, sA, sB)

	nc := natsConnect(t, sA.ClientURL())
	defer nc.Close()

	ib := nats.NewInbox()
	req := func(tags ...string) {
		t.Helper()
		r, err := json.Marshal(VarzEventOptions{EventFilterOptions: EventFilterOptions{Tags: tags}})
		require_NoError(t, err)
		err = nc.PublishRequest(fmt.Sprintf(serverPingReqSubj, "VARZ"), ib, r)
		require_NoError(t, err)
	}

	msgs := make(chan *nats.Msg, 10)
	sub, err := nc.ChanSubscribe(ib, msgs)
	require_NoError(t, err)
	// Defers run LIFO: unsubscribe must execute before close(msgs) so that
	// no in-flight delivery can send into the closed channel, and nc.Close()
	// (registered earlier) still runs last.
	defer close(msgs)
	defer sub.Unsubscribe()

	// No server has the tag "none": expect no reply.
	req("none")
	select {
	case <-msgs:
		t.Fatalf("no message expected")
	case <-time.After(200 * time.Millisecond):
	}

	// Only srv-A has "foo".
	req("foo")
	m := <-msgs
	require_Contains(t, string(m.Data), "srv-A", "foo", "bar")

	req("foo", "bar")
	m = <-msgs
	require_Contains(t, string(m.Data), "srv-A", "foo", "bar")

	// Only srv-B has "baz".
	req("baz")
	m = <-msgs
	require_Contains(t, string(m.Data), "srv-B", "bar", "baz")

	// Both servers carry "bar": expect one reply from each.
	req("bar")
	m1 := <-msgs
	m2 := <-msgs
	require_Contains(t, string(m1.Data)+string(m2.Data), "srv-A", "srv-B", "foo", "bar", "baz")
	require_Len(t, len(msgs), 0)
}
// https://github.com/nats-io/nats-server/issues/3177
func TestServerEventsAndDQSubscribers(t *testing.T) {
	c := createJetStreamClusterWithTemplate(t, jsClusterAccountsTempl, "DDQ", 3)
	defer c.shutdown()

	nc, err := nats.Connect(c.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
	require_NoError(t, err)
	defer nc.Close()

	sub, err := nc.QueueSubscribeSync("$SYS.ACCOUNT.*.DISCONNECT", "qq")
	require_NoError(t, err)
	nc.Flush()

	// Churn 10 connections; each close should produce one DISCONNECT event.
	for i := 0; i < 10; i++ {
		cnc, err := nats.Connect(c.randomServer().ClientURL())
		require_NoError(t, err)
		cnc.Close()
	}

	checkSubsPending(t, sub, 10)
}