mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
2533 lines
73 KiB
Go
2533 lines
73 KiB
Go
// Copyright 2018-2020 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/sha256"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/rand"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/nats-io/jwt/v2"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/nats-io/nkeys"
|
|
"github.com/nats-io/nuid"
|
|
)
|
|
|
|
func createAccount(s *Server) (*Account, nkeys.KeyPair) {
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
jwt, _ := nac.Encode(okp)
|
|
addAccountToMemResolver(s, pub, jwt)
|
|
acc, err := s.LookupAccount(pub)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return acc, akp
|
|
}
|
|
|
|
func createUserCredsEx(t *testing.T, nuc *jwt.UserClaims, akp nkeys.KeyPair) nats.Option {
|
|
t.Helper()
|
|
kp, _ := nkeys.CreateUser()
|
|
nuc.Subject, _ = kp.PublicKey()
|
|
ujwt, err := nuc.Encode(akp)
|
|
if err != nil {
|
|
t.Fatalf("Error generating user JWT: %v", err)
|
|
}
|
|
userCB := func() (string, error) {
|
|
return ujwt, nil
|
|
}
|
|
sigCB := func(nonce []byte) ([]byte, error) {
|
|
sig, _ := kp.Sign(nonce)
|
|
return sig, nil
|
|
}
|
|
return nats.UserJWT(userCB, sigCB)
|
|
}
|
|
|
|
func createUserCreds(t *testing.T, s *Server, akp nkeys.KeyPair) nats.Option {
|
|
return createUserCredsEx(t, jwt.NewUserClaims("test"), akp)
|
|
}
|
|
|
|
func runTrustedServer(t *testing.T) (*Server, *Options) {
|
|
t.Helper()
|
|
opts := DefaultOptions()
|
|
kp, _ := nkeys.FromSeed(oSeed)
|
|
pub, _ := kp.PublicKey()
|
|
opts.TrustedKeys = []string{pub}
|
|
opts.AccountResolver = &MemAccResolver{}
|
|
s := RunServer(opts)
|
|
return s, opts
|
|
}
|
|
|
|
func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
|
|
t.Helper()
|
|
|
|
kp, _ := nkeys.FromSeed(oSeed)
|
|
pub, _ := kp.PublicKey()
|
|
|
|
mr := &MemAccResolver{}
|
|
|
|
// Now create a system account.
|
|
// NOTE: This can NOT be shared directly between servers.
|
|
// Set via server options.
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
apub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(apub)
|
|
jwt, _ := nac.Encode(okp)
|
|
|
|
mr.Store(apub, jwt)
|
|
|
|
optsA := DefaultOptions()
|
|
optsA.Cluster.Name = "TEST CLUSTER 22"
|
|
optsA.Cluster.Host = "127.0.0.1"
|
|
optsA.TrustedKeys = []string{pub}
|
|
optsA.AccountResolver = mr
|
|
optsA.SystemAccount = apub
|
|
optsA.ServerName = "A_SRV"
|
|
// Add in dummy gateway
|
|
optsA.Gateway.Name = "TEST CLUSTER 22"
|
|
optsA.Gateway.Host = "127.0.0.1"
|
|
optsA.Gateway.Port = -1
|
|
optsA.gatewaysSolicitDelay = 30 * time.Second
|
|
|
|
sa := RunServer(optsA)
|
|
|
|
optsB := nextServerOpts(optsA)
|
|
optsB.ServerName = "B_SRV"
|
|
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
|
|
sb := RunServer(optsB)
|
|
|
|
checkClusterFormed(t, sa, sb)
|
|
|
|
return sa, optsA, sb, optsB, akp
|
|
}
|
|
|
|
func runTrustedGateways(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
|
|
t.Helper()
|
|
|
|
kp, _ := nkeys.FromSeed(oSeed)
|
|
pub, _ := kp.PublicKey()
|
|
|
|
mr := &MemAccResolver{}
|
|
|
|
// Now create a system account.
|
|
// NOTE: This can NOT be shared directly between servers.
|
|
// Set via server options.
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
apub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(apub)
|
|
jwt, _ := nac.Encode(okp)
|
|
|
|
mr.Store(apub, jwt)
|
|
|
|
optsA := testDefaultOptionsForGateway("A")
|
|
optsA.Cluster.Name = "A"
|
|
optsA.Cluster.Host = "127.0.0.1"
|
|
optsA.TrustedKeys = []string{pub}
|
|
optsA.AccountResolver = mr
|
|
optsA.SystemAccount = apub
|
|
|
|
sa := RunServer(optsA)
|
|
|
|
optsB := testGatewayOptionsFromToWithServers(t, "B", "A", sa)
|
|
optsB.Cluster.Name = "B"
|
|
optsB.TrustedKeys = []string{pub}
|
|
optsB.AccountResolver = mr
|
|
optsB.SystemAccount = apub
|
|
|
|
sb := RunServer(optsB)
|
|
|
|
waitForInboundGateways(t, sa, 1, time.Second)
|
|
waitForOutboundGateways(t, sa, 1, time.Second)
|
|
waitForInboundGateways(t, sb, 1, time.Second)
|
|
waitForOutboundGateways(t, sb, 1, time.Second)
|
|
|
|
return sa, optsA, sb, optsB, akp
|
|
}
|
|
|
|
func TestSystemAccount(t *testing.T) {
|
|
s, _ := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
|
|
acc, _ := createAccount(s)
|
|
s.setSystemAccount(acc)
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.sys == nil || s.sys.account == nil {
|
|
t.Fatalf("Expected sys.account to be non-nil")
|
|
}
|
|
if s.sys.client == nil {
|
|
t.Fatalf("Expected sys.client to be non-nil")
|
|
}
|
|
|
|
s.sys.client.mu.Lock()
|
|
defer s.sys.client.mu.Unlock()
|
|
if s.sys.client.echo {
|
|
t.Fatalf("Internal clients should always have echo false")
|
|
}
|
|
}
|
|
|
|
func TestSystemAccountNewConnection(t *testing.T) {
|
|
s, opts := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
|
|
acc, akp := createAccount(s)
|
|
s.setSystemAccount(acc)
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer ncs.Close()
|
|
|
|
// We may not be able to hear ourselves (if the event is processed
|
|
// before we create the sub), so we need to create a second client to
|
|
// trigger the connect/disconnect events.
|
|
acc2, akp2 := createAccount(s)
|
|
|
|
// Be explicit to only receive the event for acc2.
|
|
sub, _ := ncs.SubscribeSync(fmt.Sprintf("$SYS.ACCOUNT.%s.>", acc2.Name))
|
|
defer sub.Unsubscribe()
|
|
ncs.Flush()
|
|
|
|
nc, err := nats.Connect(url, createUserCreds(t, s, akp2), nats.Name("TEST EVENTS"))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
msg, err := sub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg: %v", err)
|
|
}
|
|
connsMsg, err := sub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg: %v", err)
|
|
}
|
|
if strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
|
|
msg, connsMsg = connsMsg, msg
|
|
}
|
|
if !strings.HasPrefix(connsMsg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
|
|
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.CONNECT", msg.Subject)
|
|
}
|
|
conns := AccountNumConns{}
|
|
if err := json.Unmarshal(connsMsg.Data, &conns); err != nil {
|
|
t.Fatalf("Error unmarshalling conns event message: %v", err)
|
|
} else if conns.Account != acc2.Name {
|
|
t.Fatalf("Wrong account in conns message: %v", conns)
|
|
} else if conns.Conns != 1 || conns.TotalConns != 1 || conns.LeafNodes != 0 {
|
|
t.Fatalf("Wrong counts in conns message: %v", conns)
|
|
}
|
|
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.CONNECT", acc2.Name)) {
|
|
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.CONNECT", msg.Subject)
|
|
}
|
|
tokens := strings.Split(msg.Subject, ".")
|
|
if len(tokens) < 4 {
|
|
t.Fatalf("Expected 4 tokens, got %d", len(tokens))
|
|
}
|
|
account := tokens[2]
|
|
if account != acc2.Name {
|
|
t.Fatalf("Expected %q for account, got %q", acc2.Name, account)
|
|
}
|
|
|
|
cem := ConnectEventMsg{}
|
|
if err := json.Unmarshal(msg.Data, &cem); err != nil {
|
|
t.Fatalf("Error unmarshalling connect event message: %v", err)
|
|
}
|
|
if cem.Type != ConnectEventMsgType {
|
|
t.Fatalf("Incorrect schema in connect event: %s", cem.Type)
|
|
}
|
|
if cem.Time.IsZero() {
|
|
t.Fatalf("Event time is not set")
|
|
}
|
|
if len(cem.ID) != 22 {
|
|
t.Fatalf("Event ID is incorrectly set to len %d", len(cem.ID))
|
|
}
|
|
if cem.Server.ID != s.ID() {
|
|
t.Fatalf("Expected server to be %q, got %q", s.ID(), cem.Server.ID)
|
|
}
|
|
if cem.Server.Seq == 0 {
|
|
t.Fatalf("Expected sequence to be non-zero")
|
|
}
|
|
if cem.Client.Name != "TEST EVENTS" {
|
|
t.Fatalf("Expected client name to be %q, got %q", "TEST EVENTS", cem.Client.Name)
|
|
}
|
|
if cem.Client.Lang != "go" {
|
|
t.Fatalf("Expected client lang to be \"go\", got %q", cem.Client.Lang)
|
|
}
|
|
|
|
// Now close the other client. Should fire a disconnect event.
|
|
// First send and receive some messages.
|
|
sub2, _ := nc.SubscribeSync("foo")
|
|
defer sub2.Unsubscribe()
|
|
sub3, _ := nc.SubscribeSync("*")
|
|
defer sub3.Unsubscribe()
|
|
|
|
for i := 0; i < 10; i++ {
|
|
nc.Publish("foo", []byte("HELLO WORLD"))
|
|
}
|
|
nc.Flush()
|
|
nc.Close()
|
|
|
|
msg, err = sub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg: %v", err)
|
|
}
|
|
connsMsg, err = sub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg: %v", err)
|
|
}
|
|
if strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
|
|
msg, connsMsg = connsMsg, msg
|
|
}
|
|
if !strings.HasPrefix(connsMsg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", acc2.Name)) {
|
|
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.CONNECT", msg.Subject)
|
|
} else if !strings.Contains(string(connsMsg.Data), `"total_conns":0`) {
|
|
t.Fatalf("Expected event to reflect created connection, got: %s", string(connsMsg.Data))
|
|
}
|
|
conns = AccountNumConns{}
|
|
if err := json.Unmarshal(connsMsg.Data, &conns); err != nil {
|
|
t.Fatalf("Error unmarshalling conns event message: %v", err)
|
|
} else if conns.Account != acc2.Name {
|
|
t.Fatalf("Wrong account in conns message: %v", conns)
|
|
} else if conns.Conns != 0 || conns.TotalConns != 0 || conns.LeafNodes != 0 {
|
|
t.Fatalf("Wrong counts in conns message: %v", conns)
|
|
}
|
|
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.DISCONNECT", acc2.Name)) {
|
|
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.DISCONNECT", msg.Subject)
|
|
}
|
|
tokens = strings.Split(msg.Subject, ".")
|
|
if len(tokens) < 4 {
|
|
t.Fatalf("Expected 4 tokens, got %d", len(tokens))
|
|
}
|
|
account = tokens[2]
|
|
if account != acc2.Name {
|
|
t.Fatalf("Expected %q for account, got %q", acc2.Name, account)
|
|
}
|
|
|
|
dem := DisconnectEventMsg{}
|
|
if err := json.Unmarshal(msg.Data, &dem); err != nil {
|
|
t.Fatalf("Error unmarshalling disconnect event message: %v", err)
|
|
}
|
|
if dem.Type != DisconnectEventMsgType {
|
|
t.Fatalf("Incorrect schema in connect event: %s", cem.Type)
|
|
}
|
|
if dem.Time.IsZero() {
|
|
t.Fatalf("Event time is not set")
|
|
}
|
|
if len(dem.ID) != 22 {
|
|
t.Fatalf("Event ID is incorrectly set to len %d", len(cem.ID))
|
|
}
|
|
if dem.Server.ID != s.ID() {
|
|
t.Fatalf("Expected server to be %q, got %q", s.ID(), dem.Server.ID)
|
|
}
|
|
if dem.Server.Seq == 0 {
|
|
t.Fatalf("Expected sequence to be non-zero")
|
|
}
|
|
if dem.Server.Seq <= cem.Server.Seq {
|
|
t.Fatalf("Expected sequence to be increasing")
|
|
}
|
|
|
|
if cem.Client.Name != "TEST EVENTS" {
|
|
t.Fatalf("Expected client name to be %q, got %q", "TEST EVENTS", dem.Client.Name)
|
|
}
|
|
if dem.Client.Lang != "go" {
|
|
t.Fatalf("Expected client lang to be \"go\", got %q", dem.Client.Lang)
|
|
}
|
|
|
|
if dem.Sent.Msgs != 10 {
|
|
t.Fatalf("Expected 10 msgs sent, got %d", dem.Sent.Msgs)
|
|
}
|
|
if dem.Sent.Bytes != 110 {
|
|
t.Fatalf("Expected 110 bytes sent, got %d", dem.Sent.Bytes)
|
|
}
|
|
if dem.Received.Msgs != 20 {
|
|
t.Fatalf("Expected 20 msgs received, got %d", dem.Sent.Msgs)
|
|
}
|
|
if dem.Received.Bytes != 220 {
|
|
t.Fatalf("Expected 220 bytes sent, got %d", dem.Sent.Bytes)
|
|
}
|
|
}
|
|
|
|
func runTrustedLeafServer(t *testing.T) (*Server, *Options) {
|
|
t.Helper()
|
|
opts := DefaultOptions()
|
|
kp, _ := nkeys.FromSeed(oSeed)
|
|
pub, _ := kp.PublicKey()
|
|
opts.TrustedKeys = []string{pub}
|
|
opts.AccountResolver = &MemAccResolver{}
|
|
opts.LeafNode.Port = -1
|
|
s := RunServer(opts)
|
|
return s, opts
|
|
}
|
|
|
|
func genCredsFile(t *testing.T, jwt string, seed []byte) string {
|
|
creds := `
|
|
-----BEGIN NATS USER JWT-----
|
|
%s
|
|
------END NATS USER JWT------
|
|
|
|
************************* IMPORTANT *************************
|
|
NKEY Seed printed below can be used to sign and prove identity.
|
|
NKEYs are sensitive and should be treated as secrets.
|
|
|
|
-----BEGIN USER NKEY SEED-----
|
|
%s
|
|
------END USER NKEY SEED------
|
|
|
|
*************************************************************
|
|
`
|
|
return createConfFile(t, []byte(strings.Replace(fmt.Sprintf(creds, jwt, seed), "\t\t", "", -1)))
|
|
}
|
|
|
|
func runSolicitWithCredentials(t *testing.T, opts *Options, creds string) (*Server, *Options, string) {
|
|
content := `
|
|
port: -1
|
|
leafnodes {
|
|
remotes = [
|
|
{
|
|
url: nats-leaf://127.0.0.1:%d
|
|
credentials: '%s'
|
|
}
|
|
]
|
|
}
|
|
`
|
|
config := fmt.Sprintf(content, opts.LeafNode.Port, creds)
|
|
conf := createConfFile(t, []byte(config))
|
|
s, opts := RunServerWithConfig(conf)
|
|
return s, opts, conf
|
|
}
|
|
|
|
// Helper function to check that a leaf node has connected to our server.
|
|
func checkLeafNodeConnected(t testing.TB, s *Server) {
|
|
t.Helper()
|
|
checkLeafNodeConnectedCount(t, s, 1)
|
|
}
|
|
|
|
// Helper function to check that a leaf node has connected to n server.
|
|
func checkLeafNodeConnectedCount(t testing.TB, s *Server, lnCons int) {
|
|
t.Helper()
|
|
checkFor(t, 5*time.Second, 15*time.Millisecond, func() error {
|
|
if nln := s.NumLeafNodes(); nln != lnCons {
|
|
return fmt.Errorf("Expected %d connected leafnode(s) for server %q, got %d",
|
|
lnCons, s.ID(), nln)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestSystemAccountingWithLeafNodes(t *testing.T) {
|
|
s, opts := runTrustedLeafServer(t)
|
|
defer s.Shutdown()
|
|
|
|
acc, akp := createAccount(s)
|
|
s.setSystemAccount(acc)
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer ncs.Close()
|
|
|
|
acc2, akp2 := createAccount(s)
|
|
|
|
// Be explicit to only receive the event for acc2 account.
|
|
sub, _ := ncs.SubscribeSync(fmt.Sprintf("$SYS.ACCOUNT.%s.DISCONNECT", acc2.Name))
|
|
defer sub.Unsubscribe()
|
|
ncs.Flush()
|
|
|
|
kp, _ := nkeys.CreateUser()
|
|
pub, _ := kp.PublicKey()
|
|
nuc := jwt.NewUserClaims(pub)
|
|
ujwt, err := nuc.Encode(akp2)
|
|
if err != nil {
|
|
t.Fatalf("Error generating user JWT: %v", err)
|
|
}
|
|
seed, _ := kp.Seed()
|
|
mycreds := genCredsFile(t, ujwt, seed)
|
|
|
|
// Create a server that solicits a leafnode connection.
|
|
sl, slopts, _ := runSolicitWithCredentials(t, opts, mycreds)
|
|
defer sl.Shutdown()
|
|
|
|
checkLeafNodeConnected(t, s)
|
|
|
|
// Compute the expected number of subs on "sl" based on number
|
|
// of existing subs before creating the sub on "s".
|
|
expected := int(sl.NumSubscriptions() + 1)
|
|
|
|
nc, err := nats.Connect(url, createUserCreds(t, s, akp2), nats.Name("TEST LEAFNODE EVENTS"))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
fooSub := natsSubSync(t, nc, "foo")
|
|
natsFlush(t, nc)
|
|
|
|
checkExpectedSubs(t, expected, sl)
|
|
|
|
surl := fmt.Sprintf("nats://%s:%d", slopts.Host, slopts.Port)
|
|
nc2, err := nats.Connect(surl, nats.Name("TEST LEAFNODE EVENTS"))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
// Compute the expected number of subs on "s" based on number
|
|
// of existing subs before creating the sub on "sl".
|
|
expected = int(s.NumSubscriptions() + 1)
|
|
|
|
m := []byte("HELLO WORLD")
|
|
|
|
// Now generate some traffic
|
|
starSub := natsSubSync(t, nc2, "*")
|
|
for i := 0; i < 10; i++ {
|
|
nc2.Publish("foo", m)
|
|
nc2.Publish("bar", m)
|
|
}
|
|
natsFlush(t, nc2)
|
|
|
|
checkExpectedSubs(t, expected, s)
|
|
|
|
// Now send some from the cluster side too.
|
|
for i := 0; i < 10; i++ {
|
|
nc.Publish("foo", m)
|
|
nc.Publish("bar", m)
|
|
}
|
|
nc.Flush()
|
|
|
|
// Make sure all messages are received
|
|
for i := 0; i < 20; i++ {
|
|
if _, err := fooSub.NextMsg(time.Second); err != nil {
|
|
t.Fatalf("Did not get message: %v", err)
|
|
}
|
|
}
|
|
for i := 0; i < 40; i++ {
|
|
if _, err := starSub.NextMsg(time.Second); err != nil {
|
|
t.Fatalf("Did not get message: %v", err)
|
|
}
|
|
}
|
|
|
|
// Now shutdown the leafnode server since this is where the event tracking should
|
|
// happen. Right now we do not track local clients to the leafnode server that
|
|
// solicited to the cluster, but we should track usage once the leafnode connection stops.
|
|
sl.Shutdown()
|
|
|
|
// Make sure we get disconnect event and that tracking is correct.
|
|
msg, err := sub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg: %v", err)
|
|
}
|
|
|
|
dem := DisconnectEventMsg{}
|
|
if err := json.Unmarshal(msg.Data, &dem); err != nil {
|
|
t.Fatalf("Error unmarshalling disconnect event message: %v", err)
|
|
}
|
|
if dem.Sent.Msgs != 10 {
|
|
t.Fatalf("Expected 10 msgs sent, got %d", dem.Sent.Msgs)
|
|
}
|
|
if dem.Sent.Bytes != 110 {
|
|
t.Fatalf("Expected 110 bytes sent, got %d", dem.Sent.Bytes)
|
|
}
|
|
if dem.Received.Msgs != 20 {
|
|
t.Fatalf("Expected 20 msgs received, got %d", dem.Received.Msgs)
|
|
}
|
|
if dem.Received.Bytes != 220 {
|
|
t.Fatalf("Expected 220 bytes sent, got %d", dem.Received.Bytes)
|
|
}
|
|
}
|
|
|
|
func TestSystemAccountDisconnectBadLogin(t *testing.T) {
|
|
s, opts := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
|
|
acc, akp := createAccount(s)
|
|
s.setSystemAccount(acc)
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer ncs.Close()
|
|
|
|
// We should never hear $G account events for bad logins.
|
|
sub, _ := ncs.SubscribeSync("$SYS.ACCOUNT.$G.*")
|
|
defer sub.Unsubscribe()
|
|
|
|
// Listen for auth error events though.
|
|
asub, _ := ncs.SubscribeSync("$SYS.SERVER.*.CLIENT.AUTH.ERR")
|
|
defer asub.Unsubscribe()
|
|
|
|
ncs.Flush()
|
|
|
|
nats.Connect(url, nats.Name("TEST BAD LOGIN"))
|
|
|
|
// Should not hear these.
|
|
if _, err := sub.NextMsg(100 * time.Millisecond); err == nil {
|
|
t.Fatalf("Received a disconnect message from bad login, expected none")
|
|
}
|
|
|
|
m, err := asub.NextMsg(100 * time.Millisecond)
|
|
if err != nil {
|
|
t.Fatalf("Should have heard an auth error event")
|
|
}
|
|
dem := DisconnectEventMsg{}
|
|
if err := json.Unmarshal(m.Data, &dem); err != nil {
|
|
t.Fatalf("Error unmarshalling disconnect event message: %v", err)
|
|
}
|
|
if dem.Reason != "Authentication Failure" {
|
|
t.Fatalf("Expected auth error, got %q", dem.Reason)
|
|
}
|
|
}
|
|
|
|
func TestSysSubscribeRace(t *testing.T) {
|
|
s, opts := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
|
|
acc, akp := createAccount(s)
|
|
s.setSystemAccount(acc)
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
|
|
nc, err := nats.Connect(url, createUserCreds(t, s, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
done := make(chan struct{})
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for {
|
|
nc.Publish("foo", []byte("hello"))
|
|
select {
|
|
case <-done:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
received := make(chan struct{})
|
|
// Create message callback handler.
|
|
cb := func(sub *subscription, producer *client, _ *Account, subject, reply string, msg []byte) {
|
|
select {
|
|
case received <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
// Now create an internal subscription
|
|
sub, err := s.sysSubscribe("foo", cb)
|
|
if sub == nil || err != nil {
|
|
t.Fatalf("Expected to subscribe, got %v", err)
|
|
}
|
|
select {
|
|
case <-received:
|
|
close(done)
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("Did not receive the message")
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestSystemAccountInternalSubscriptions(t *testing.T) {
|
|
s, opts := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
|
|
sub, err := s.sysSubscribe("foo", nil)
|
|
if sub != nil || err != ErrNoSysAccount {
|
|
t.Fatalf("Expected to get proper error, got %v", err)
|
|
}
|
|
|
|
acc, akp := createAccount(s)
|
|
s.setSystemAccount(acc)
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
|
|
nc, err := nats.Connect(url, createUserCreds(t, s, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
sub, err = s.sysSubscribe("foo", nil)
|
|
if sub != nil || err == nil {
|
|
t.Fatalf("Expected to get error for no handler, got %v", err)
|
|
}
|
|
|
|
received := make(chan *nats.Msg)
|
|
// Create message callback handler.
|
|
cb := func(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) {
|
|
copy := append([]byte(nil), msg...)
|
|
received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy}
|
|
}
|
|
|
|
// Now create an internal subscription
|
|
sub, err = s.sysSubscribe("foo", cb)
|
|
if sub == nil || err != nil {
|
|
t.Fatalf("Expected to subscribe, got %v", err)
|
|
}
|
|
// Now send out a message from our normal client.
|
|
nc.Publish("foo", []byte("HELLO WORLD"))
|
|
|
|
var msg *nats.Msg
|
|
|
|
select {
|
|
case msg = <-received:
|
|
if msg.Subject != "foo" {
|
|
t.Fatalf("Expected \"foo\" as subject, got %q", msg.Subject)
|
|
}
|
|
if msg.Reply != "" {
|
|
t.Fatalf("Expected no reply, got %q", msg.Reply)
|
|
}
|
|
if !bytes.Equal(msg.Data, []byte("HELLO WORLD")) {
|
|
t.Fatalf("Got the wrong msg payload: %q", msg.Data)
|
|
}
|
|
break
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("Did not receive the message")
|
|
}
|
|
s.sysUnsubscribe(sub)
|
|
|
|
// Now send out a message from our normal client.
|
|
// We should not see this one.
|
|
nc.Publish("foo", []byte("You There?"))
|
|
|
|
select {
|
|
case <-received:
|
|
t.Fatalf("Received a message when we should not have")
|
|
case <-time.After(100 * time.Millisecond):
|
|
break
|
|
}
|
|
|
|
// Now make sure we do not hear ourselves. We optimize this for internally
|
|
// generated messages.
|
|
s.mu.Lock()
|
|
s.sendInternalMsg("foo", "", nil, msg.Data)
|
|
s.mu.Unlock()
|
|
|
|
select {
|
|
case <-received:
|
|
t.Fatalf("Received a message when we should not have")
|
|
case <-time.After(100 * time.Millisecond):
|
|
break
|
|
}
|
|
}
|
|
|
|
func TestSystemAccountConnectionUpdatesStopAfterNoLocal(t *testing.T) {
|
|
sa, _, sb, optsB, _ := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
|
|
// Normal Account
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 4 // Limit to 4 connections.
|
|
jwt, _ := nac.Encode(okp)
|
|
|
|
addAccountToMemResolver(sa, pub, jwt)
|
|
|
|
// Listen for updates to the new account connection activity.
|
|
received := make(chan *nats.Msg, 10)
|
|
cb := func(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) {
|
|
copy := append([]byte(nil), msg...)
|
|
received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy}
|
|
}
|
|
subj := fmt.Sprintf(accConnsEventSubjNew, pub)
|
|
sub, err := sa.sysSubscribe(subj, cb)
|
|
if sub == nil || err != nil {
|
|
t.Fatalf("Expected to subscribe, got %v", err)
|
|
}
|
|
defer sa.sysUnsubscribe(sub)
|
|
|
|
// Create a few users on the new account.
|
|
clients := []*nats.Conn{}
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
|
for i := 0; i < 4; i++ {
|
|
nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
clients = append(clients, nc)
|
|
}
|
|
|
|
// Wait for all 4 notifications.
|
|
checkFor(t, time.Second, 50*time.Millisecond, func() error {
|
|
if len(received) == 4 {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("Not enough messages, %d vs 4", len(received))
|
|
})
|
|
|
|
// Now lookup the account doing the events on sb.
|
|
acc, _ := sb.LookupAccount(pub)
|
|
// Make sure we have the timer running.
|
|
acc.mu.RLock()
|
|
ctmr := acc.ctmr
|
|
acc.mu.RUnlock()
|
|
if ctmr == nil {
|
|
t.Fatalf("Expected event timer for acc conns to be running")
|
|
}
|
|
|
|
// Now close all of the connections.
|
|
for _, nc := range clients {
|
|
nc.Close()
|
|
}
|
|
|
|
// Wait for the 4 new notifications, 8 total (4 for connect, 4 for disconnect)
|
|
checkFor(t, time.Second, 50*time.Millisecond, func() error {
|
|
if len(received) == 8 {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("Not enough messages, %d vs 4", len(received))
|
|
})
|
|
// Drain the messages.
|
|
for i := 0; i < 7; i++ {
|
|
<-received
|
|
}
|
|
// Check last one.
|
|
msg := <-received
|
|
m := AccountNumConns{}
|
|
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
|
t.Fatalf("Error unmarshalling account connections request message: %v", err)
|
|
}
|
|
if m.Conns != 0 {
|
|
t.Fatalf("Expected Conns to be 0, got %d", m.Conns)
|
|
}
|
|
|
|
// Should not receive any more messages..
|
|
select {
|
|
case <-received:
|
|
t.Fatalf("Did not expect a message here")
|
|
case <-time.After(50 * time.Millisecond):
|
|
break
|
|
}
|
|
|
|
// Make sure we have the timer is NOT running.
|
|
acc.mu.RLock()
|
|
ctmr = acc.ctmr
|
|
acc.mu.RUnlock()
|
|
if ctmr != nil {
|
|
t.Fatalf("Expected event timer for acc conns to NOT be running after reaching zero local clients")
|
|
}
|
|
}
|
|
|
|
func TestSystemAccountConnectionLimits(t *testing.T) {
|
|
sa, optsA, sb, optsB, _ := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
|
|
// We want to test that we are limited to a certain number of active connections
|
|
// across multiple servers.
|
|
|
|
// Let's create a user account.
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 4 // Limit to 4 connections.
|
|
jwt, _ := nac.Encode(okp)
|
|
|
|
addAccountToMemResolver(sa, pub, jwt)
|
|
|
|
urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
|
|
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
|
|
|
// Create a user on each server. Break on first failure.
|
|
for {
|
|
nca1, err := nats.Connect(urlA, createUserCreds(t, sa, akp))
|
|
if err != nil {
|
|
break
|
|
}
|
|
defer nca1.Close()
|
|
ncb1, err := nats.Connect(urlB, createUserCreds(t, sb, akp))
|
|
if err != nil {
|
|
break
|
|
}
|
|
defer ncb1.Close()
|
|
}
|
|
|
|
checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
|
|
total := sa.NumClients() + sb.NumClients()
|
|
if total > int(nac.Limits.Conn) {
|
|
return fmt.Errorf("Expected only %d connections, was allowed to connect %d", nac.Limits.Conn, total)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestBadAccountUpdate(t *testing.T) {
|
|
sa, _ := runTrustedServer(t)
|
|
defer sa.Shutdown()
|
|
akp1, _ := nkeys.CreateAccount()
|
|
pub, _ := akp1.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
ajwt1, err := nac.Encode(oKp)
|
|
require_NoError(t, err)
|
|
addAccountToMemResolver(sa, pub, ajwt1)
|
|
akp2, _ := nkeys.CreateAccount()
|
|
pub2, _ := akp2.PublicKey()
|
|
nac.Subject = pub2 // maliciously use a different subject but pretend to remain pub
|
|
ajwt2, err := nac.Encode(oKp)
|
|
require_NoError(t, err)
|
|
acc, err := sa.fetchAccount(pub)
|
|
require_NoError(t, err)
|
|
if err := sa.updateAccountWithClaimJWT(acc, ajwt2); err != ErrAccountValidation {
|
|
t.Fatalf("expected %v but got %v", ErrAccountValidation, err)
|
|
}
|
|
}
|
|
|
|
// Make sure connection limits apply to the system account itself.
|
|
func TestSystemAccountSystemConnectionLimitsHonored(t *testing.T) {
|
|
sa, optsA, sb, optsB, sakp := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
// Update system account to have 10 connections
|
|
pub, _ := sakp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 10
|
|
ajwt, _ := nac.Encode(okp)
|
|
|
|
addAccountToMemResolver(sa, pub, ajwt)
|
|
addAccountToMemResolver(sb, pub, ajwt)
|
|
|
|
// Update the accounts on each server with new claims to force update.
|
|
sysAccA := sa.SystemAccount()
|
|
sa.updateAccountWithClaimJWT(sysAccA, ajwt)
|
|
sysAccB := sb.SystemAccount()
|
|
sb.updateAccountWithClaimJWT(sysAccB, ajwt)
|
|
|
|
// Check system here first, with no external it should be zero.
|
|
sacc := sa.SystemAccount()
|
|
if nlc := sacc.NumLocalConnections(); nlc != 0 {
|
|
t.Fatalf("Expected no local connections, got %d", nlc)
|
|
}
|
|
|
|
urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
|
|
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
|
|
|
// Create a user on each server. Break on first failure.
|
|
tc := 0
|
|
for {
|
|
nca1, err := nats.Connect(urlA, createUserCreds(t, sa, sakp))
|
|
if err != nil {
|
|
break
|
|
}
|
|
defer nca1.Close()
|
|
tc++
|
|
|
|
ncb1, err := nats.Connect(urlB, createUserCreds(t, sb, sakp))
|
|
if err != nil {
|
|
break
|
|
}
|
|
defer ncb1.Close()
|
|
tc++
|
|
|
|
// The account's connection count is exchanged between servers
|
|
// so that the local count on each server reflects the total count.
|
|
// Pause a bit to give a chance to each server to process the update.
|
|
time.Sleep(15 * time.Millisecond)
|
|
}
|
|
if tc != 10 {
|
|
t.Fatalf("Expected to get 10 external connections, got %d", tc)
|
|
}
|
|
|
|
checkFor(t, 1*time.Second, 50*time.Millisecond, func() error {
|
|
total := sa.NumClients() + sb.NumClients()
|
|
if total > int(nac.Limits.Conn) {
|
|
return fmt.Errorf("Expected only %d connections, was allowed to connect %d", nac.Limits.Conn, total)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Test that the remote accounting works when a server is started some time later.
|
|
func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) {
|
|
sa, optsA, sb, optsB, _ := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
sb.Shutdown()
|
|
|
|
// Let's create a user account.
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 4 // Limit to 4 connections.
|
|
jwt, _ := nac.Encode(okp)
|
|
|
|
addAccountToMemResolver(sa, pub, jwt)
|
|
|
|
urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
|
|
// Create max connections on sa.
|
|
for i := 0; i < int(nac.Limits.Conn); i++ {
|
|
nc, err := nats.Connect(urlA, createUserCreds(t, sa, akp))
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on #%d try: %v", i+1, err)
|
|
}
|
|
defer nc.Close()
|
|
}
|
|
|
|
// Restart server B.
|
|
optsB.AccountResolver = sa.AccountResolver()
|
|
optsB.SystemAccount = sa.SystemAccount().Name
|
|
sb = RunServer(optsB)
|
|
defer sb.Shutdown()
|
|
checkClusterFormed(t, sa, sb)
|
|
|
|
// Trigger a load of the user account on the new server
|
|
// NOTE: If we do not load the user, the user can be the first
|
|
// to request this account, hence the connection will succeed.
|
|
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
|
if acc, err := sb.LookupAccount(pub); acc == nil || err != nil {
|
|
return fmt.Errorf("LookupAccount did not return account or failed, err=%v", err)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Expect this to fail.
|
|
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
|
if _, err := nats.Connect(urlB, createUserCreds(t, sb, akp)); err == nil {
|
|
t.Fatalf("Expected connection to fail due to max limit")
|
|
}
|
|
}
|
|
|
|
// Test that the remote accounting works when a server is shutdown.
|
|
func TestSystemAccountConnectionLimitsServerShutdownGraceful(t *testing.T) {
|
|
sa, optsA, sb, optsB, _ := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
|
|
// Let's create a user account.
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 10 // Limit to 10 connections.
|
|
jwt, _ := nac.Encode(okp)
|
|
|
|
addAccountToMemResolver(sa, pub, jwt)
|
|
addAccountToMemResolver(sb, pub, jwt)
|
|
|
|
urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
|
|
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
|
|
|
for i := 0; i < 5; i++ {
|
|
nc, err := nats.Connect(urlA, nats.NoReconnect(), createUserCreds(t, sa, akp))
|
|
if err != nil {
|
|
t.Fatalf("Expected to connect, got %v", err)
|
|
}
|
|
defer nc.Close()
|
|
nc, err = nats.Connect(urlB, nats.NoReconnect(), createUserCreds(t, sb, akp))
|
|
if err != nil {
|
|
t.Fatalf("Expected to connect, got %v", err)
|
|
}
|
|
defer nc.Close()
|
|
}
|
|
|
|
// We are at capacity so both of these should fail.
|
|
if _, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err == nil {
|
|
t.Fatalf("Expected connection to fail due to max limit")
|
|
}
|
|
if _, err := nats.Connect(urlB, createUserCreds(t, sb, akp)); err == nil {
|
|
t.Fatalf("Expected connection to fail due to max limit")
|
|
}
|
|
|
|
// Now shutdown Server B.
|
|
sb.Shutdown()
|
|
|
|
// Now we should be able to create more on A now.
|
|
for i := 0; i < 5; i++ {
|
|
nc, err := nats.Connect(urlA, createUserCreds(t, sa, akp))
|
|
if err != nil {
|
|
t.Fatalf("Expected to connect on %d, got %v", i, err)
|
|
}
|
|
defer nc.Close()
|
|
}
|
|
}
|
|
|
|
// Test that the remote accounting works when a server goes away.
|
|
func TestSystemAccountConnectionLimitsServerShutdownForced(t *testing.T) {
|
|
sa, optsA, sb, optsB, _ := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
|
|
// Let's create a user account.
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 20 // Limit to 20 connections.
|
|
jwt, _ := nac.Encode(okp)
|
|
|
|
addAccountToMemResolver(sa, pub, jwt)
|
|
addAccountToMemResolver(sb, pub, jwt)
|
|
|
|
urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
|
|
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
c, err := nats.Connect(urlA, nats.NoReconnect(), createUserCreds(t, sa, akp))
|
|
if err != nil {
|
|
t.Fatalf("Expected to connect, got %v", err)
|
|
}
|
|
defer c.Close()
|
|
c, err = nats.Connect(urlB, nats.NoReconnect(), createUserCreds(t, sb, akp))
|
|
if err != nil {
|
|
t.Fatalf("Expected to connect, got %v", err)
|
|
}
|
|
defer c.Close()
|
|
}
|
|
|
|
// Now shutdown Server B. Do so such that no communications go out.
|
|
sb.mu.Lock()
|
|
sb.sys = nil
|
|
sb.mu.Unlock()
|
|
sb.Shutdown()
|
|
|
|
if _, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err == nil {
|
|
t.Fatalf("Expected connection to fail due to max limit")
|
|
}
|
|
|
|
// Let's speed up the checking process.
|
|
sa.mu.Lock()
|
|
sa.sys.chkOrph = 10 * time.Millisecond
|
|
sa.sys.orphMax = 30 * time.Millisecond
|
|
sa.sys.sweeper.Reset(sa.sys.chkOrph)
|
|
sa.mu.Unlock()
|
|
|
|
// We should eventually be able to connect.
|
|
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
|
|
if c, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err != nil {
|
|
return err
|
|
} else {
|
|
c.Close()
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestSystemAccountFromConfig(t *testing.T) {
|
|
kp, _ := nkeys.FromSeed(oSeed)
|
|
opub, _ := kp.PublicKey()
|
|
akp, _ := nkeys.CreateAccount()
|
|
apub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(apub)
|
|
ajwt, err := nac.Encode(kp)
|
|
if err != nil {
|
|
t.Fatalf("Error generating account JWT: %v", err)
|
|
}
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.Write([]byte(ajwt))
|
|
}))
|
|
defer ts.Close()
|
|
|
|
confTemplate := `
|
|
listen: -1
|
|
trusted: %s
|
|
system_account: %s
|
|
resolver: URL("%s/jwt/v1/accounts/")
|
|
`
|
|
|
|
conf := createConfFile(t, []byte(fmt.Sprintf(confTemplate, opub, apub, ts.URL)))
|
|
|
|
s, _ := RunServerWithConfig(conf)
|
|
defer s.Shutdown()
|
|
|
|
if acc := s.SystemAccount(); acc == nil || acc.Name != apub {
|
|
t.Fatalf("System Account not properly set")
|
|
}
|
|
}
|
|
|
|
func TestAccountClaimsUpdates(t *testing.T) {
|
|
test := func(subj string) {
|
|
s, opts := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
|
|
sacc, sakp := createAccount(s)
|
|
s.setSystemAccount(sacc)
|
|
|
|
// Let's create a normal account with limits we can update.
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 4
|
|
ajwt, _ := nac.Encode(okp)
|
|
|
|
addAccountToMemResolver(s, pub, ajwt)
|
|
|
|
acc, _ := s.LookupAccount(pub)
|
|
if acc.MaxActiveConnections() != 4 {
|
|
t.Fatalf("Expected to see a limit of 4 connections")
|
|
}
|
|
|
|
// Simulate a systems publisher so we can do an account claims update.
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
nc, err := nats.Connect(url, createUserCreds(t, s, sakp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// Update the account
|
|
nac = jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 8
|
|
issAt := time.Now().Add(-30 * time.Second).Unix()
|
|
nac.IssuedAt = issAt
|
|
expires := time.Now().Add(2 * time.Second).Unix()
|
|
nac.Expires = expires
|
|
ajwt, _ = nac.Encode(okp)
|
|
|
|
// Publish to the system update subject.
|
|
claimUpdateSubj := fmt.Sprintf(subj, pub)
|
|
nc.Publish(claimUpdateSubj, []byte(ajwt))
|
|
nc.Flush()
|
|
|
|
acc, _ = s.LookupAccount(pub)
|
|
if acc.MaxActiveConnections() != 8 {
|
|
t.Fatalf("Account was not updated")
|
|
}
|
|
}
|
|
t.Run("new", func(t *testing.T) {
|
|
test(accUpdateEventSubjNew)
|
|
})
|
|
t.Run("old", func(t *testing.T) {
|
|
test(accUpdateEventSubjOld)
|
|
})
|
|
}
|
|
|
|
func TestAccountReqMonitoring(t *testing.T) {
|
|
s, opts := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
sacc, sakp := createAccount(s)
|
|
s.setSystemAccount(sacc)
|
|
s.EnableJetStream(nil)
|
|
unusedAcc, _ := createAccount(s)
|
|
acc, akp := createAccount(s)
|
|
acc.EnableJetStream(nil)
|
|
subsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "SUBSZ")
|
|
connz := fmt.Sprintf(accDirectReqSubj, acc.Name, "CONNZ")
|
|
jsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "JSZ")
|
|
|
|
pStatz := fmt.Sprintf(accPingReqSubj, "STATZ")
|
|
statz := func(name string) string { return fmt.Sprintf(accDirectReqSubj, name, "STATZ") }
|
|
// Create system account connection to query
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
ncSys, err := nats.Connect(url, createUserCreds(t, s, sakp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer ncSys.Close()
|
|
// Create a connection that we can query
|
|
nc, err := nats.Connect(url, createUserCreds(t, s, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
// query SUBSZ for account
|
|
resp, err := ncSys.Request(subsz, nil, time.Second)
|
|
require_NoError(t, err)
|
|
require_Contains(t, string(resp.Data), `"num_subscriptions":5,`)
|
|
// create a subscription
|
|
sub, err := nc.Subscribe("foo", func(msg *nats.Msg) {})
|
|
require_NoError(t, err)
|
|
defer sub.Unsubscribe()
|
|
|
|
require_NoError(t, nc.Flush())
|
|
// query SUBSZ for account
|
|
resp, err = ncSys.Request(subsz, nil, time.Second)
|
|
require_NoError(t, err)
|
|
require_Contains(t, string(resp.Data), `"num_subscriptions":6,`, `"subject":"foo"`)
|
|
// query connections for account
|
|
resp, err = ncSys.Request(connz, nil, time.Second)
|
|
require_NoError(t, err)
|
|
require_Contains(t, string(resp.Data), `"num_connections":1,`, `"total":1,`)
|
|
// query connections for js account
|
|
resp, err = ncSys.Request(jsz, nil, time.Second)
|
|
require_NoError(t, err)
|
|
require_Contains(t, string(resp.Data), `"memory":0,`, `"storage":0,`)
|
|
// query statz/conns for account
|
|
resp, err = ncSys.Request(statz(acc.Name), nil, time.Second)
|
|
require_NoError(t, err)
|
|
respContentAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":0,"bytes":0}`,
|
|
`"received":{"msgs":0,"bytes":0}`, `"num_subscriptions":`, fmt.Sprintf(`"acc":"%s"`, acc.Name)}
|
|
require_Contains(t, string(resp.Data), respContentAcc...)
|
|
|
|
rIb := ncSys.NewRespInbox()
|
|
rSub, err := ncSys.SubscribeSync(rIb)
|
|
require_NoError(t, err)
|
|
require_NoError(t, ncSys.PublishRequest(pStatz, rIb, nil))
|
|
minRespContentForBothAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"acc":"`, `"num_subscriptions":`}
|
|
resp, err = rSub.NextMsg(time.Second)
|
|
require_NoError(t, err)
|
|
require_Contains(t, string(resp.Data), minRespContentForBothAcc...)
|
|
// expect one entry per account
|
|
require_Contains(t, string(resp.Data), fmt.Sprintf(`"acc":"%s"`, acc.Name), fmt.Sprintf(`"acc":"%s"`, sacc.Name))
|
|
|
|
// Test ping with filter by account name
|
|
require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, sacc.Name))))
|
|
m, err := rSub.NextMsg(time.Second)
|
|
require_NoError(t, err)
|
|
require_Contains(t, string(m.Data), minRespContentForBothAcc...)
|
|
|
|
require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, acc.Name))))
|
|
m, err = rSub.NextMsg(time.Second)
|
|
require_NoError(t, err)
|
|
require_Contains(t, string(m.Data), respContentAcc...)
|
|
|
|
// Test include unused for statz and ping of statz
|
|
unusedContent := []string{`"conns":0,`, `"total_conns":0`, `"slow_consumers":0`,
|
|
fmt.Sprintf(`"acc":"%s"`, unusedAcc.Name)}
|
|
|
|
resp, err = ncSys.Request(statz(unusedAcc.Name),
|
|
[]byte(fmt.Sprintf(`{"accounts":["%s"], "include_unused":true}`, unusedAcc.Name)),
|
|
time.Second)
|
|
require_NoError(t, err)
|
|
require_Contains(t, string(resp.Data), unusedContent...)
|
|
|
|
require_NoError(t, ncSys.PublishRequest(pStatz, rIb,
|
|
[]byte(fmt.Sprintf(`{"accounts":["%s"], "include_unused":true}`, unusedAcc.Name))))
|
|
resp, err = rSub.NextMsg(time.Second)
|
|
require_NoError(t, err)
|
|
require_Contains(t, string(resp.Data), unusedContent...)
|
|
|
|
require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, unusedAcc.Name))))
|
|
_, err = rSub.NextMsg(200 * time.Millisecond)
|
|
require_Error(t, err)
|
|
|
|
// Test ping from within account
|
|
ib := nc.NewRespInbox()
|
|
rSub, err = nc.SubscribeSync(ib)
|
|
require_NoError(t, err)
|
|
require_NoError(t, nc.PublishRequest(pStatz, ib, nil))
|
|
resp, err = rSub.NextMsg(time.Second)
|
|
require_NoError(t, err)
|
|
require_Contains(t, string(resp.Data), respContentAcc...)
|
|
_, err = rSub.NextMsg(200 * time.Millisecond)
|
|
require_Error(t, err)
|
|
}
|
|
|
|
func TestAccountReqInfo(t *testing.T) {
|
|
s, opts := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
sacc, sakp := createAccount(s)
|
|
s.setSystemAccount(sacc)
|
|
// Let's create an account with service export.
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub1, _ := akp.PublicKey()
|
|
nac1 := jwt.NewAccountClaims(pub1)
|
|
nac1.Exports.Add(&jwt.Export{Subject: "req.*", Type: jwt.Service})
|
|
ajwt1, _ := nac1.Encode(oKp)
|
|
addAccountToMemResolver(s, pub1, ajwt1)
|
|
s.LookupAccount(pub1)
|
|
info1 := fmt.Sprintf(accDirectReqSubj, pub1, "INFO")
|
|
// Now add an account with service imports.
|
|
akp2, _ := nkeys.CreateAccount()
|
|
pub2, _ := akp2.PublicKey()
|
|
nac2 := jwt.NewAccountClaims(pub2)
|
|
nac2.Imports.Add(&jwt.Import{Account: pub1, Subject: "req.1", Type: jwt.Service})
|
|
ajwt2, _ := nac2.Encode(oKp)
|
|
addAccountToMemResolver(s, pub2, ajwt2)
|
|
s.LookupAccount(pub2)
|
|
info2 := fmt.Sprintf(accDirectReqSubj, pub2, "INFO")
|
|
// Create system account connection to query
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
ncSys, err := nats.Connect(url, createUserCreds(t, s, sakp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer ncSys.Close()
|
|
checkCommon := func(info *AccountInfo, srv *ServerInfo, pub, jwt string) {
|
|
if info.Complete != true {
|
|
t.Fatalf("Unexpected value: %v", info.Complete)
|
|
} else if info.Expired != false {
|
|
t.Fatalf("Unexpected value: %v", info.Expired)
|
|
} else if info.JetStream != false {
|
|
t.Fatalf("Unexpected value: %v", info.JetStream)
|
|
} else if info.ClientCnt != 0 {
|
|
t.Fatalf("Unexpected value: %v", info.ClientCnt)
|
|
} else if info.AccountName != pub {
|
|
t.Fatalf("Unexpected value: %v", info.AccountName)
|
|
} else if info.LeafCnt != 0 {
|
|
t.Fatalf("Unexpected value: %v", info.LeafCnt)
|
|
} else if info.Jwt != jwt {
|
|
t.Fatalf("Unexpected value: %v", info.Jwt)
|
|
} else if srv.Cluster != "abc" {
|
|
t.Fatalf("Unexpected value: %v", srv.Cluster)
|
|
} else if srv.Name != s.Name() {
|
|
t.Fatalf("Unexpected value: %v", srv.Name)
|
|
} else if srv.Host != opts.Host {
|
|
t.Fatalf("Unexpected value: %v", srv.Host)
|
|
} else if srv.Seq < 1 {
|
|
t.Fatalf("Unexpected value: %v", srv.Seq)
|
|
}
|
|
}
|
|
info := AccountInfo{}
|
|
srv := ServerInfo{}
|
|
msg := struct {
|
|
Data *AccountInfo `json:"data"`
|
|
Srv *ServerInfo `json:"server"`
|
|
}{
|
|
&info,
|
|
&srv,
|
|
}
|
|
if resp, err := ncSys.Request(info1, nil, time.Second); err != nil {
|
|
t.Fatalf("Error on request: %v", err)
|
|
} else if err := json.Unmarshal(resp.Data, &msg); err != nil {
|
|
t.Fatalf("Unmarshalling failed: %v", err)
|
|
} else if len(info.Exports) != 1 {
|
|
t.Fatalf("Unexpected value: %v", info.Exports)
|
|
} else if len(info.Imports) != 4 {
|
|
t.Fatalf("Unexpected value: %+v", info.Imports)
|
|
} else if info.Exports[0].Subject != "req.*" {
|
|
t.Fatalf("Unexpected value: %v", info.Exports)
|
|
} else if info.Exports[0].Type != jwt.Service {
|
|
t.Fatalf("Unexpected value: %v", info.Exports)
|
|
} else if info.Exports[0].ResponseType != jwt.ResponseTypeSingleton {
|
|
t.Fatalf("Unexpected value: %v", info.Exports)
|
|
} else if info.SubCnt != 4 {
|
|
t.Fatalf("Unexpected value: %v", info.SubCnt)
|
|
} else {
|
|
checkCommon(&info, &srv, pub1, ajwt1)
|
|
}
|
|
info = AccountInfo{}
|
|
srv = ServerInfo{}
|
|
if resp, err := ncSys.Request(info2, nil, time.Second); err != nil {
|
|
t.Fatalf("Error on request: %v", err)
|
|
} else if err := json.Unmarshal(resp.Data, &msg); err != nil {
|
|
t.Fatalf("Unmarshalling failed: %v", err)
|
|
} else if len(info.Exports) != 0 {
|
|
t.Fatalf("Unexpected value: %v", info.Exports)
|
|
} else if len(info.Imports) != 5 {
|
|
t.Fatalf("Unexpected value: %+v", info.Imports)
|
|
}
|
|
// Here we need to find our import
|
|
var si *ExtImport
|
|
for _, im := range info.Imports {
|
|
if im.Subject == "req.1" {
|
|
si = &im
|
|
break
|
|
}
|
|
}
|
|
if si == nil {
|
|
t.Fatalf("Could not find our import")
|
|
}
|
|
if si.Type != jwt.Service {
|
|
t.Fatalf("Unexpected value: %+v", si)
|
|
} else if si.Account != pub1 {
|
|
t.Fatalf("Unexpected value: %+v", si)
|
|
} else if info.SubCnt != 5 {
|
|
t.Fatalf("Unexpected value: %+v", si)
|
|
} else {
|
|
checkCommon(&info, &srv, pub2, ajwt2)
|
|
}
|
|
}
|
|
|
|
func TestAccountClaimsUpdatesWithServiceImports(t *testing.T) {
|
|
s, opts := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
|
|
sacc, sakp := createAccount(s)
|
|
s.setSystemAccount(sacc)
|
|
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
|
|
// Let's create an account with service export.
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
nac.Exports.Add(&jwt.Export{Subject: "req.*", Type: jwt.Service})
|
|
ajwt, _ := nac.Encode(okp)
|
|
addAccountToMemResolver(s, pub, ajwt)
|
|
s.LookupAccount(pub)
|
|
|
|
// Now add an account with multiple service imports.
|
|
akp2, _ := nkeys.CreateAccount()
|
|
pub2, _ := akp2.PublicKey()
|
|
nac2 := jwt.NewAccountClaims(pub2)
|
|
nac2.Imports.Add(&jwt.Import{Account: pub, Subject: "req.1", Type: jwt.Service})
|
|
ajwt2, _ := nac2.Encode(okp)
|
|
|
|
addAccountToMemResolver(s, pub2, ajwt2)
|
|
s.LookupAccount(pub2)
|
|
|
|
startSubs := s.NumSubscriptions()
|
|
|
|
// Simulate a systems publisher so we can do an account claims update.
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
nc, err := nats.Connect(url, createUserCreds(t, s, sakp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// Update the account several times
|
|
for i := 1; i <= 10; i++ {
|
|
nac2 = jwt.NewAccountClaims(pub2)
|
|
nac2.Limits.Conn = int64(i)
|
|
nac2.Imports.Add(&jwt.Import{Account: pub, Subject: "req.1", Type: jwt.Service})
|
|
ajwt2, _ = nac2.Encode(okp)
|
|
|
|
// Publish to the system update subject.
|
|
claimUpdateSubj := fmt.Sprintf(accUpdateEventSubjNew, pub2)
|
|
nc.Publish(claimUpdateSubj, []byte(ajwt2))
|
|
}
|
|
nc.Flush()
|
|
|
|
if startSubs < s.NumSubscriptions() {
|
|
t.Fatalf("Subscriptions leaked: %d vs %d", startSubs, s.NumSubscriptions())
|
|
}
|
|
}
|
|
|
|
func TestAccountConnsLimitExceededAfterUpdate(t *testing.T) {
|
|
s, opts := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
|
|
sacc, _ := createAccount(s)
|
|
s.setSystemAccount(sacc)
|
|
|
|
// Let's create a normal account with limits we can update.
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 10
|
|
ajwt, _ := nac.Encode(okp)
|
|
|
|
addAccountToMemResolver(s, pub, ajwt)
|
|
acc, _ := s.LookupAccount(pub)
|
|
|
|
// Now create the max connections.
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
for {
|
|
nc, err := nats.Connect(url, createUserCreds(t, s, akp))
|
|
if err != nil {
|
|
break
|
|
}
|
|
defer nc.Close()
|
|
}
|
|
|
|
// We should have max here.
|
|
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
|
|
if total := s.NumClients(); total != acc.MaxActiveConnections() {
|
|
return fmt.Errorf("Expected %d connections, got %d", acc.MaxActiveConnections(), total)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Now change limits to make current connections over the limit.
|
|
nac = jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 2
|
|
ajwt, _ = nac.Encode(okp)
|
|
|
|
s.updateAccountWithClaimJWT(acc, ajwt)
|
|
if acc.MaxActiveConnections() != 2 {
|
|
t.Fatalf("Expected max connections to be set to 2, got %d", acc.MaxActiveConnections())
|
|
}
|
|
// We should have closed the excess connections.
|
|
checkClientsCount(t, s, acc.MaxActiveConnections())
|
|
}
|
|
|
|
func TestAccountConnsLimitExceededAfterUpdateDisconnectNewOnly(t *testing.T) {
|
|
s, opts := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
|
|
sacc, _ := createAccount(s)
|
|
s.setSystemAccount(sacc)
|
|
|
|
// Let's create a normal account with limits we can update.
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 10
|
|
ajwt, _ := nac.Encode(okp)
|
|
|
|
addAccountToMemResolver(s, pub, ajwt)
|
|
acc, _ := s.LookupAccount(pub)
|
|
|
|
// Now create the max connections.
|
|
// We create half then we will wait and then create the rest.
|
|
// Will test that we disconnect the newest ones.
|
|
newConns := make([]*nats.Conn, 0, 5)
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
for i := 0; i < 5; i++ {
|
|
nc, err := nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp))
|
|
require_NoError(t, err)
|
|
defer nc.Close()
|
|
}
|
|
time.Sleep(500 * time.Millisecond)
|
|
for i := 0; i < 5; i++ {
|
|
nc, err := nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp))
|
|
require_NoError(t, err)
|
|
defer nc.Close()
|
|
newConns = append(newConns, nc)
|
|
}
|
|
|
|
// We should have max here.
|
|
checkClientsCount(t, s, acc.MaxActiveConnections())
|
|
|
|
// Now change limits to make current connections over the limit.
|
|
nac = jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 5
|
|
ajwt, _ = nac.Encode(okp)
|
|
|
|
s.updateAccountWithClaimJWT(acc, ajwt)
|
|
if acc.MaxActiveConnections() != 5 {
|
|
t.Fatalf("Expected max connections to be set to 2, got %d", acc.MaxActiveConnections())
|
|
}
|
|
// We should have closed the excess connections.
|
|
checkClientsCount(t, s, acc.MaxActiveConnections())
|
|
|
|
// Now make sure that only the new ones were closed.
|
|
var closed int
|
|
for _, nc := range newConns {
|
|
if !nc.IsClosed() {
|
|
closed++
|
|
}
|
|
}
|
|
if closed != 5 {
|
|
t.Fatalf("Expected all new clients to be closed, only got %d of 5", closed)
|
|
}
|
|
}
|
|
|
|
func TestSystemAccountWithBadRemoteLatencyUpdate(t *testing.T) {
|
|
s, _ := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
|
|
acc, _ := createAccount(s)
|
|
s.setSystemAccount(acc)
|
|
|
|
rl := remoteLatency{
|
|
Account: "NONSENSE",
|
|
ReqId: "_INBOX.22",
|
|
}
|
|
b, _ := json.Marshal(&rl)
|
|
s.remoteLatencyUpdate(nil, nil, nil, "foo", _EMPTY_, b)
|
|
}
|
|
|
|
func TestSystemAccountWithGateways(t *testing.T) {
|
|
sa, oa, sb, ob, akp := runTrustedGateways(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
|
|
// Create a client on A that will subscribe on $SYS.ACCOUNT.>
|
|
urla := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port)
|
|
nca := natsConnect(t, urla, createUserCreds(t, sa, akp), nats.Name("SYS"))
|
|
defer nca.Close()
|
|
nca.Flush()
|
|
|
|
sub, _ := nca.SubscribeSync("$SYS.ACCOUNT.>")
|
|
defer sub.Unsubscribe()
|
|
nca.Flush()
|
|
|
|
// If this tests fails with wrong number after 10 seconds we may have
|
|
// added a new inititial subscription for the eventing system.
|
|
checkExpectedSubs(t, 52, sa)
|
|
|
|
// Create a client on B and see if we receive the event
|
|
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
|
|
ncb := natsConnect(t, urlb, createUserCreds(t, sb, akp), nats.Name("TEST EVENTS"))
|
|
defer ncb.Close()
|
|
|
|
// space for .CONNECT and .CONNS from SYS and $G as well as one extra message
|
|
msgs := [4]*nats.Msg{}
|
|
var err error
|
|
msgs[0], err = sub.NextMsg(time.Second)
|
|
require_NoError(t, err)
|
|
msgs[1], err = sub.NextMsg(time.Second)
|
|
require_NoError(t, err)
|
|
// TODO: There is a race currently that can cause the server to process the
|
|
// system event *after* the subscription on "A" has been registered, and so
|
|
// the "nca" client would receive its own CONNECT message.
|
|
msgs[2], _ = sub.NextMsg(250 * time.Millisecond)
|
|
|
|
findMsgs := func(sub string) []*nats.Msg {
|
|
rMsgs := []*nats.Msg{}
|
|
for _, m := range msgs {
|
|
if m == nil {
|
|
continue
|
|
}
|
|
if m.Subject == sub {
|
|
rMsgs = append(rMsgs, m)
|
|
}
|
|
}
|
|
return rMsgs
|
|
}
|
|
|
|
msg := findMsgs(fmt.Sprintf("$SYS.ACCOUNT.%s.CONNECT", sa.SystemAccount().Name))
|
|
var bMsg *nats.Msg
|
|
if len(msg) < 1 {
|
|
t.Fatal("Expected at least one message")
|
|
}
|
|
bMsg = msg[len(msg)-1]
|
|
|
|
require_Contains(t, string(bMsg.Data), sb.ID())
|
|
require_Contains(t, string(bMsg.Data), `"cluster":"B"`)
|
|
require_Contains(t, string(bMsg.Data), `"name":"TEST EVENTS"`)
|
|
|
|
connsMsgA := findMsgs(fmt.Sprintf("$SYS.ACCOUNT.%s.SERVER.CONNS", sa.SystemAccount().Name))
|
|
if len(connsMsgA) != 1 {
|
|
t.Fatal("Expected a message")
|
|
}
|
|
}
|
|
|
|
func TestSystemAccountNoAuthUser(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
listen: "127.0.0.1:-1"
|
|
accounts {
|
|
$SYS {
|
|
users [{user: "admin", password: "pwd"}]
|
|
}
|
|
}
|
|
`))
|
|
defer os.Remove(conf)
|
|
s, o := RunServerWithConfig(conf)
|
|
defer s.Shutdown()
|
|
|
|
for _, test := range []struct {
|
|
name string
|
|
usrInfo string
|
|
ok bool
|
|
account string
|
|
}{
|
|
{"valid user/pwd", "admin:pwd@", true, "$SYS"},
|
|
{"invalid pwd", "admin:wrong@", false, _EMPTY_},
|
|
{"some token", "sometoken@", false, _EMPTY_},
|
|
{"user used without pwd", "admin@", false, _EMPTY_}, // will be treated as a token
|
|
{"user with empty password", "admin:@", false, _EMPTY_},
|
|
{"no user means global account", _EMPTY_, true, globalAccountName},
|
|
} {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
url := fmt.Sprintf("nats://%s127.0.0.1:%d", test.usrInfo, o.Port)
|
|
nc, err := nats.Connect(url)
|
|
if err != nil {
|
|
if test.ok {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
return
|
|
} else if !test.ok {
|
|
nc.Close()
|
|
t.Fatalf("Should have failed, did not")
|
|
}
|
|
var accName string
|
|
s.mu.Lock()
|
|
for _, c := range s.clients {
|
|
c.mu.Lock()
|
|
if c.acc != nil {
|
|
accName = c.acc.Name
|
|
}
|
|
c.mu.Unlock()
|
|
break
|
|
}
|
|
s.mu.Unlock()
|
|
nc.Close()
|
|
checkClientsCount(t, s, 0)
|
|
if accName != test.account {
|
|
t.Fatalf("The account should have been %q, got %q", test.account, accName)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestServerAccountConns(t *testing.T) {
|
|
// speed up hb
|
|
orgHBInterval := eventsHBInterval
|
|
eventsHBInterval = time.Millisecond * 100
|
|
defer func() { eventsHBInterval = orgHBInterval }()
|
|
conf := createConfFile(t, []byte(`
|
|
host: 127.0.0.1
|
|
port: -1
|
|
system_account: SYS
|
|
accounts: {
|
|
SYS: {users: [{user: s, password: s}]}
|
|
ACC: {users: [{user: a, password: a}]}
|
|
}`))
|
|
s, _ := RunServerWithConfig(conf)
|
|
defer s.Shutdown()
|
|
|
|
nc := natsConnect(t, s.ClientURL(), nats.UserInfo("a", "a"))
|
|
defer nc.Close()
|
|
|
|
subOut, err := nc.SubscribeSync("foo")
|
|
require_NoError(t, err)
|
|
hw := "HELLO WORLD"
|
|
nc.Publish("foo", []byte(hw))
|
|
nc.Publish("bar", []byte(hw)) // will only count towards received
|
|
nc.Flush()
|
|
m, err := subOut.NextMsg(time.Second)
|
|
require_NoError(t, err)
|
|
require_Equal(t, string(m.Data), hw)
|
|
|
|
ncs := natsConnect(t, s.ClientURL(), nats.UserInfo("s", "s"))
|
|
defer ncs.Close()
|
|
subs, err := ncs.SubscribeSync("$SYS.ACCOUNT.ACC.SERVER.CONNS")
|
|
require_NoError(t, err)
|
|
|
|
m, err = subs.NextMsg(time.Second)
|
|
require_NoError(t, err)
|
|
accConns := &AccountNumConns{}
|
|
err = json.Unmarshal(m.Data, accConns)
|
|
require_NoError(t, err)
|
|
|
|
require_True(t, accConns.Received.Msgs == 2)
|
|
require_True(t, accConns.Received.Bytes == 2*int64(len(hw)))
|
|
require_True(t, accConns.Sent.Msgs == 1)
|
|
require_True(t, accConns.Sent.Bytes == int64(len(hw)))
|
|
}
|
|
|
|
func TestServerEventsStatsZ(t *testing.T) {
|
|
serverStatsReqSubj := "$SYS.REQ.SERVER.%s.STATSZ"
|
|
preStart := time.Now().UTC()
|
|
// Add little bit of delay to make sure that time check
|
|
// between pre-start and actual start does not fail.
|
|
time.Sleep(5 * time.Millisecond)
|
|
sa, optsA, sb, _, akp := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
// Same between actual start and post start.
|
|
time.Sleep(5 * time.Millisecond)
|
|
postStart := time.Now().UTC()
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
|
|
ncs, err := nats.Connect(url, createUserCreds(t, sa, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer ncs.Close()
|
|
|
|
subj := fmt.Sprintf(serverStatsSubj, sa.ID())
|
|
sub, _ := ncs.SubscribeSync(subj)
|
|
defer sub.Unsubscribe()
|
|
ncs.Publish("foo", []byte("HELLO WORLD"))
|
|
ncs.Flush()
|
|
|
|
// Let's speed up the checking process.
|
|
sa.mu.Lock()
|
|
sa.sys.statsz = 10 * time.Millisecond
|
|
sa.sys.stmr.Reset(sa.sys.statsz)
|
|
sa.mu.Unlock()
|
|
|
|
_, err = sub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg: %v", err)
|
|
}
|
|
// Get it the second time so we can check some stats
|
|
msg, err := sub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg: %v", err)
|
|
}
|
|
m := ServerStatsMsg{}
|
|
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
|
t.Fatalf("Error unmarshalling the statz json: %v", err)
|
|
}
|
|
if m.Server.ID != sa.ID() {
|
|
t.Fatalf("Did not match IDs")
|
|
}
|
|
if m.Server.Cluster != "TEST CLUSTER 22" {
|
|
t.Fatalf("Did not match cluster name")
|
|
}
|
|
if m.Server.Version != VERSION {
|
|
t.Fatalf("Did not match server version")
|
|
}
|
|
if !m.Stats.Start.After(preStart) && m.Stats.Start.Before(postStart) {
|
|
t.Fatalf("Got a wrong start time for the server %v", m.Stats.Start)
|
|
}
|
|
if m.Stats.Connections != 1 {
|
|
t.Fatalf("Did not match connections of 1, got %d", m.Stats.Connections)
|
|
}
|
|
if m.Stats.ActiveAccounts != 2 {
|
|
t.Fatalf("Did not match active accounts of 2, got %d", m.Stats.ActiveAccounts)
|
|
}
|
|
if m.Stats.Sent.Msgs < 1 {
|
|
t.Fatalf("Did not match sent msgs of >=1, got %d", m.Stats.Sent.Msgs)
|
|
}
|
|
if m.Stats.Received.Msgs < 1 {
|
|
t.Fatalf("Did not match received msgs of >=1, got %d", m.Stats.Received.Msgs)
|
|
}
|
|
if lr := len(m.Stats.Routes); lr != 1 {
|
|
t.Fatalf("Expected a route, but got %d", lr)
|
|
}
|
|
|
|
// Now let's prompt this server to send us the statsz
|
|
subj = fmt.Sprintf(serverStatsReqSubj, sa.ID())
|
|
msg, err = ncs.Request(subj, nil, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error trying to request statsz: %v", err)
|
|
}
|
|
m2 := ServerStatsMsg{}
|
|
if err := json.Unmarshal(msg.Data, &m2); err != nil {
|
|
t.Fatalf("Error unmarshalling the statz json: %v", err)
|
|
}
|
|
if m2.Server.ID != sa.ID() {
|
|
t.Fatalf("Did not match IDs")
|
|
}
|
|
if m2.Stats.Connections != 1 {
|
|
t.Fatalf("Did not match connections of 1, got %d", m2.Stats.Connections)
|
|
}
|
|
if m2.Stats.ActiveAccounts != 2 {
|
|
t.Fatalf("Did not match active accounts of 2, got %d", m2.Stats.ActiveAccounts)
|
|
}
|
|
if m2.Stats.Sent.Msgs < 3 {
|
|
t.Fatalf("Did not match sent msgs of >= 3, got %d", m2.Stats.Sent.Msgs)
|
|
}
|
|
if m2.Stats.Received.Msgs < 1 {
|
|
t.Fatalf("Did not match received msgs of >= 1, got %d", m2.Stats.Received.Msgs)
|
|
}
|
|
if lr := len(m2.Stats.Routes); lr != 1 {
|
|
t.Fatalf("Expected a route, but got %d", lr)
|
|
}
|
|
|
|
msg, err = ncs.Request(subj, nil, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error trying to request statsz: %v", err)
|
|
}
|
|
m3 := ServerStatsMsg{}
|
|
if err := json.Unmarshal(msg.Data, &m3); err != nil {
|
|
t.Fatalf("Error unmarshalling the statz json: %v", err)
|
|
}
|
|
if m3.Server.ID != sa.ID() {
|
|
t.Fatalf("Did not match IDs")
|
|
}
|
|
if m3.Stats.Connections != 1 {
|
|
t.Fatalf("Did not match connections of 1, got %d", m3.Stats.Connections)
|
|
}
|
|
if m3.Stats.ActiveAccounts != 2 {
|
|
t.Fatalf("Did not match active accounts of 2, got %d", m3.Stats.ActiveAccounts)
|
|
}
|
|
if m3.Stats.Sent.Msgs < 4 {
|
|
t.Fatalf("Did not match sent msgs of >= 4, got %d", m3.Stats.Sent.Msgs)
|
|
}
|
|
if m3.Stats.Received.Msgs < 2 {
|
|
t.Fatalf("Did not match received msgs of >= 2, got %d", m3.Stats.Received.Msgs)
|
|
}
|
|
if lr := len(m3.Stats.Routes); lr != 1 {
|
|
t.Fatalf("Expected a route, but got %d", lr)
|
|
}
|
|
if sr := m3.Stats.Routes[0]; sr.Name != "B_SRV" {
|
|
t.Fatalf("Expected server A's route to B to have Name set to %q, got %q", "B", sr.Name)
|
|
}
|
|
|
|
// Now query B and check that route's name is "A"
|
|
subj = fmt.Sprintf(serverStatsReqSubj, sb.ID())
|
|
ncs.SubscribeSync(subj)
|
|
msg, err = ncs.Request(subj, nil, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error trying to request statsz: %v", err)
|
|
}
|
|
m = ServerStatsMsg{}
|
|
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
|
t.Fatalf("Error unmarshalling the statz json: %v", err)
|
|
}
|
|
if lr := len(m.Stats.Routes); lr != 1 {
|
|
t.Fatalf("Expected a route, but got %d", lr)
|
|
}
|
|
if sr := m.Stats.Routes[0]; sr.Name != "A_SRV" {
|
|
t.Fatalf("Expected server B's route to A to have Name set to %q, got %q", "A_SRV", sr.Name)
|
|
}
|
|
}
|
|
|
|
func TestServerEventsPingStatsZ(t *testing.T) {
|
|
sa, _, sb, optsB, akp := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
|
nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
test := func(req []byte) {
|
|
reply := nc.NewRespInbox()
|
|
sub, _ := nc.SubscribeSync(reply)
|
|
nc.PublishRequest(serverStatsPingReqSubj, reply, req)
|
|
// Make sure its a statsz
|
|
m := ServerStatsMsg{}
|
|
// Receive both manually.
|
|
msg, err := sub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg: %v", err)
|
|
}
|
|
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
|
t.Fatalf("Error unmarshalling the statz json: %v", err)
|
|
}
|
|
msg, err = sub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg: %v", err)
|
|
}
|
|
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
|
t.Fatalf("Error unmarshalling the statz json: %v", err)
|
|
}
|
|
}
|
|
strRequestTbl := []string{
|
|
`{"cluster":"TEST"}`,
|
|
`{"cluster":"CLUSTER"}`,
|
|
`{"server_name":"SRV"}`,
|
|
`{"server_name":"_"}`,
|
|
fmt.Sprintf(`{"host":"%s"}`, optsB.Host),
|
|
fmt.Sprintf(`{"host":"%s", "cluster":"CLUSTER", "name":"SRV"}`, optsB.Host),
|
|
}
|
|
for i, opt := range strRequestTbl {
|
|
t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
|
|
test([]byte(opt))
|
|
})
|
|
}
|
|
requestTbl := []StatszEventOptions{
|
|
{EventFilterOptions: EventFilterOptions{Cluster: "TEST"}},
|
|
{EventFilterOptions: EventFilterOptions{Cluster: "CLUSTER"}},
|
|
{EventFilterOptions: EventFilterOptions{Name: "SRV"}},
|
|
{EventFilterOptions: EventFilterOptions{Name: "_"}},
|
|
{EventFilterOptions: EventFilterOptions{Host: optsB.Host}},
|
|
{EventFilterOptions: EventFilterOptions{Host: optsB.Host, Cluster: "CLUSTER", Name: "SRV"}},
|
|
}
|
|
for i, opt := range requestTbl {
|
|
t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
|
|
msg, _ := json.MarshalIndent(&opt, "", " ")
|
|
test(msg)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestServerEventsPingStatsZFilter(t *testing.T) {
|
|
sa, _, sb, optsB, akp := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
|
nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
requestTbl := []string{
|
|
`{"cluster":"DOESNOTEXIST"}`,
|
|
`{"host":"DOESNOTEXIST"}`,
|
|
`{"server_name":"DOESNOTEXIST"}`,
|
|
}
|
|
for i, msg := range requestTbl {
|
|
t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
|
|
// Receive both manually.
|
|
if _, err := nc.Request(serverStatsPingReqSubj, []byte(msg), time.Second/4); err != nats.ErrTimeout {
|
|
t.Fatalf("Error, expected timeout: %v", err)
|
|
}
|
|
})
|
|
}
|
|
requestObjTbl := []EventFilterOptions{
|
|
{Cluster: "DOESNOTEXIST"},
|
|
{Host: "DOESNOTEXIST"},
|
|
{Name: "DOESNOTEXIST"},
|
|
}
|
|
for i, opt := range requestObjTbl {
|
|
t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
|
|
msg, _ := json.MarshalIndent(&opt, "", " ")
|
|
// Receive both manually.
|
|
if _, err := nc.Request(serverStatsPingReqSubj, []byte(msg), time.Second/4); err != nats.ErrTimeout {
|
|
t.Fatalf("Error, expected timeout: %v", err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestServerEventsPingStatsZFailFilter(t *testing.T) {
|
|
sa, _, sb, optsB, akp := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
|
nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// Receive both manually.
|
|
if msg, err := nc.Request(serverStatsPingReqSubj, []byte(`{MALFORMEDJSON`), time.Second/4); err != nil {
|
|
t.Fatalf("Error: %v", err)
|
|
} else {
|
|
resp := make(map[string]map[string]interface{})
|
|
if err := json.Unmarshal(msg.Data, &resp); err != nil {
|
|
t.Fatalf("Error unmarshalling the response json: %v", err)
|
|
}
|
|
if resp["error"]["code"].(float64) != http.StatusBadRequest {
|
|
t.Fatal("bad error code")
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestServerEventsPingMonitorz(t *testing.T) {
|
|
sa, _, sb, optsB, akp := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
sysAcc, _ := akp.PublicKey()
|
|
url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
|
nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
nc.Flush()
|
|
|
|
tests := []struct {
|
|
endpoint string
|
|
opt interface{}
|
|
resp interface{}
|
|
respField []string
|
|
}{
|
|
{"VARZ", nil, &Varz{},
|
|
[]string{"now", "cpu", "system_account"}},
|
|
{"SUBSZ", nil, &Subsz{},
|
|
[]string{"num_subscriptions", "num_cache"}},
|
|
{"CONNZ", nil, &Connz{},
|
|
[]string{"now", "connections"}},
|
|
{"ROUTEZ", nil, &Routez{},
|
|
[]string{"now", "routes"}},
|
|
{"GATEWAYZ", nil, &Gatewayz{},
|
|
[]string{"now", "outbound_gateways", "inbound_gateways"}},
|
|
{"LEAFZ", nil, &Leafz{},
|
|
[]string{"now", "leafs"}},
|
|
|
|
{"SUBSZ", &SubszOptions{}, &Subsz{},
|
|
[]string{"num_subscriptions", "num_cache"}},
|
|
{"CONNZ", &ConnzOptions{}, &Connz{},
|
|
[]string{"now", "connections"}},
|
|
{"ROUTEZ", &RoutezOptions{}, &Routez{},
|
|
[]string{"now", "routes"}},
|
|
{"GATEWAYZ", &GatewayzOptions{}, &Gatewayz{},
|
|
[]string{"now", "outbound_gateways", "inbound_gateways"}},
|
|
{"LEAFZ", &LeafzOptions{}, &Leafz{},
|
|
[]string{"now", "leafs"}},
|
|
{"ACCOUNTZ", &AccountzOptions{}, &Accountz{},
|
|
[]string{"now", "accounts"}},
|
|
|
|
{"SUBSZ", &SubszOptions{Limit: 5}, &Subsz{},
|
|
[]string{"num_subscriptions", "num_cache"}},
|
|
{"CONNZ", &ConnzOptions{Limit: 5}, &Connz{},
|
|
[]string{"now", "connections"}},
|
|
{"ROUTEZ", &RoutezOptions{SubscriptionsDetail: true}, &Routez{},
|
|
[]string{"now", "routes"}},
|
|
{"GATEWAYZ", &GatewayzOptions{Accounts: true}, &Gatewayz{},
|
|
[]string{"now", "outbound_gateways", "inbound_gateways"}},
|
|
{"LEAFZ", &LeafzOptions{Subscriptions: true}, &Leafz{},
|
|
[]string{"now", "leafs"}},
|
|
{"ACCOUNTZ", &AccountzOptions{Account: sysAcc}, &Accountz{},
|
|
[]string{"now", "account_detail"}},
|
|
{"LEAFZ", &LeafzOptions{Account: sysAcc}, &Leafz{},
|
|
[]string{"now", "leafs"}},
|
|
|
|
{"ROUTEZ", json.RawMessage(`{"cluster":""}`), &Routez{},
|
|
[]string{"now", "routes"}},
|
|
{"ROUTEZ", json.RawMessage(`{"name":""}`), &Routez{},
|
|
[]string{"now", "routes"}},
|
|
{"ROUTEZ", json.RawMessage(`{"cluster":"TEST CLUSTER 22"}`), &Routez{},
|
|
[]string{"now", "routes"}},
|
|
{"ROUTEZ", json.RawMessage(`{"cluster":"CLUSTER"}`), &Routez{},
|
|
[]string{"now", "routes"}},
|
|
{"ROUTEZ", json.RawMessage(`{"cluster":"TEST CLUSTER 22", "subscriptions":true}`), &Routez{},
|
|
[]string{"now", "routes"}},
|
|
|
|
{"JSZ", nil, &JSzOptions{}, []string{"now", "disabled"}},
|
|
|
|
{"HEALTHZ", nil, &JSzOptions{}, []string{"status"}},
|
|
{"HEALTHZ", &HealthzOptions{JSEnabledOnly: true}, &JSzOptions{}, []string{"status"}},
|
|
{"HEALTHZ", &HealthzOptions{JSServerOnly: true}, &JSzOptions{}, []string{"status"}},
|
|
}
|
|
|
|
for i, test := range tests {
|
|
t.Run(fmt.Sprintf("%s-%d", test.endpoint, i), func(t *testing.T) {
|
|
var opt []byte
|
|
if test.opt != nil {
|
|
opt, err = json.Marshal(test.opt)
|
|
if err != nil {
|
|
t.Fatalf("Error marshaling opts: %v", err)
|
|
}
|
|
}
|
|
reply := nc.NewRespInbox()
|
|
replySubj, _ := nc.SubscribeSync(reply)
|
|
|
|
// set a header to make sure request parsing knows to ignore them
|
|
nc.PublishMsg(&nats.Msg{
|
|
Subject: fmt.Sprintf("%s.%s", serverStatsPingReqSubj, test.endpoint),
|
|
Reply: reply,
|
|
Header: nats.Header{"header": []string{"for header sake"}},
|
|
Data: opt,
|
|
})
|
|
|
|
// Receive both manually.
|
|
msg, err := replySubj.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg: %v", err)
|
|
}
|
|
response1 := make(map[string]map[string]interface{})
|
|
|
|
if err := json.Unmarshal(msg.Data, &response1); err != nil {
|
|
t.Fatalf("Error unmarshalling response1 json: %v", err)
|
|
}
|
|
|
|
serverName := ""
|
|
if response1["server"]["name"] == "A_SRV" {
|
|
serverName = "B_SRV"
|
|
} else if response1["server"]["name"] == "B_SRV" {
|
|
serverName = "A_SRV"
|
|
} else {
|
|
t.Fatalf("Error finding server in %s", string(msg.Data))
|
|
}
|
|
if resp, ok := response1["data"]; !ok {
|
|
t.Fatalf("Error finding: %s in %s",
|
|
strings.ToLower(test.endpoint), string(msg.Data))
|
|
} else {
|
|
for _, respField := range test.respField {
|
|
if _, ok := resp[respField]; !ok {
|
|
t.Fatalf("Error finding: %s in %s", respField, resp)
|
|
}
|
|
}
|
|
}
|
|
|
|
msg, err = replySubj.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Error receiving msg: %v", err)
|
|
}
|
|
response2 := make(map[string]map[string]interface{})
|
|
if err := json.Unmarshal(msg.Data, &response2); err != nil {
|
|
t.Fatalf("Error unmarshalling the response2 json: %v", err)
|
|
}
|
|
if response2["server"]["name"] != serverName {
|
|
t.Fatalf("Error finding server %s in %s", serverName, string(msg.Data))
|
|
}
|
|
if resp, ok := response2["data"]; !ok {
|
|
t.Fatalf("Error finding: %s in %s",
|
|
strings.ToLower(test.endpoint), string(msg.Data))
|
|
} else {
|
|
for _, respField := range test.respField {
|
|
if _, ok := resp[respField]; !ok {
|
|
t.Fatalf("Error finding: %s in %s", respField, resp)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestGatewayNameClientInfo(t *testing.T) {
|
|
sa, _, sb, _, _ := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
|
|
c, _, l := newClientForServer(sa)
|
|
defer c.close()
|
|
|
|
var info Info
|
|
err := json.Unmarshal([]byte(l[5:]), &info)
|
|
if err != nil {
|
|
t.Fatalf("Could not parse INFO json: %v\n", err)
|
|
}
|
|
if info.Cluster != "TEST CLUSTER 22" {
|
|
t.Fatalf("Expected a cluster name of 'TEST CLUSTER 22', got %q", info.Cluster)
|
|
}
|
|
}
|
|
|
|
type slowAccResolver struct {
|
|
sync.Mutex
|
|
AccountResolver
|
|
acc string
|
|
}
|
|
|
|
func (sr *slowAccResolver) Fetch(name string) (string, error) {
|
|
sr.Lock()
|
|
delay := sr.acc == name
|
|
sr.Unlock()
|
|
if delay {
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
return sr.AccountResolver.Fetch(name)
|
|
}
|
|
|
|
func TestConnectionUpdatesTimerProperlySet(t *testing.T) {
|
|
origEventsHBInterval := eventsHBInterval
|
|
eventsHBInterval = 50 * time.Millisecond
|
|
defer func() { eventsHBInterval = origEventsHBInterval }()
|
|
|
|
sa, _, sb, optsB, _ := runTrustedCluster(t)
|
|
defer sa.Shutdown()
|
|
defer sb.Shutdown()
|
|
|
|
// Normal Account
|
|
okp, _ := nkeys.FromSeed(oSeed)
|
|
akp, _ := nkeys.CreateAccount()
|
|
pub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(pub)
|
|
nac.Limits.Conn = 10 // set any limit...
|
|
jwt, _ := nac.Encode(okp)
|
|
|
|
addAccountToMemResolver(sa, pub, jwt)
|
|
|
|
// Listen for HB updates...
|
|
count := int32(0)
|
|
cb := func(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) {
|
|
atomic.AddInt32(&count, 1)
|
|
}
|
|
subj := fmt.Sprintf(accConnsEventSubjNew, pub)
|
|
sub, err := sa.sysSubscribe(subj, cb)
|
|
if sub == nil || err != nil {
|
|
t.Fatalf("Expected to subscribe, got %v", err)
|
|
}
|
|
defer sa.sysUnsubscribe(sub)
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
|
nc := natsConnect(t, url, createUserCreds(t, sb, akp))
|
|
defer nc.Close()
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
|
// After waiting 500ms with HB interval of 50ms, we should get
|
|
// about 10 updates, no much more
|
|
if n := atomic.LoadInt32(&count); n > 15 {
|
|
t.Fatalf("Expected about 10 updates, got %v", n)
|
|
}
|
|
|
|
// Now lookup the account doing the events on sb.
|
|
acc, _ := sb.LookupAccount(pub)
|
|
// Make sure we have the timer running.
|
|
acc.mu.RLock()
|
|
ctmr := acc.ctmr
|
|
acc.mu.RUnlock()
|
|
if ctmr == nil {
|
|
t.Fatalf("Expected event timer for acc conns to be running")
|
|
}
|
|
|
|
nc.Close()
|
|
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
// Make sure we have the timer is NOT running.
|
|
acc.mu.RLock()
|
|
ctmr = acc.ctmr
|
|
acc.mu.RUnlock()
|
|
if ctmr != nil {
|
|
return fmt.Errorf("Expected event timer for acc conns to NOT be running after reaching zero local clients")
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestServerEventsReceivedByQSubs(t *testing.T) {
|
|
s, opts := runTrustedServer(t)
|
|
defer s.Shutdown()
|
|
|
|
acc, akp := createAccount(s)
|
|
s.setSystemAccount(acc)
|
|
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer ncs.Close()
|
|
|
|
// Listen for auth error events.
|
|
qsub, _ := ncs.QueueSubscribeSync("$SYS.SERVER.*.CLIENT.AUTH.ERR", "queue")
|
|
defer qsub.Unsubscribe()
|
|
|
|
ncs.Flush()
|
|
|
|
nats.Connect(url, nats.Name("TEST BAD LOGIN"))
|
|
|
|
m, err := qsub.NextMsg(time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Should have heard an auth error event")
|
|
}
|
|
dem := DisconnectEventMsg{}
|
|
if err := json.Unmarshal(m.Data, &dem); err != nil {
|
|
t.Fatalf("Error unmarshalling disconnect event message: %v", err)
|
|
}
|
|
if dem.Reason != "Authentication Failure" {
|
|
t.Fatalf("Expected auth error, got %q", dem.Reason)
|
|
}
|
|
}
|
|
|
|
func TestServerEventsFilteredByTag(t *testing.T) {
|
|
confA := createConfFile(t, []byte(`
|
|
listen: -1
|
|
server_name: srv-A
|
|
server_tags: ["foo", "bar"]
|
|
cluster {
|
|
name: clust
|
|
listen: -1
|
|
no_advertise: true
|
|
}
|
|
system_account: SYS
|
|
accounts: {
|
|
SYS: {
|
|
users: [
|
|
{user: b, password: b}
|
|
]
|
|
}
|
|
}
|
|
no_auth_user: b
|
|
`))
|
|
sA, _ := RunServerWithConfig(confA)
|
|
defer sA.Shutdown()
|
|
confB := createConfFile(t, []byte(fmt.Sprintf(`
|
|
listen: -1
|
|
server_name: srv-B
|
|
server_tags: ["bar", "baz"]
|
|
cluster {
|
|
name: clust
|
|
listen: -1
|
|
no_advertise: true
|
|
routes [
|
|
nats-route://127.0.0.1:%d
|
|
]
|
|
}
|
|
system_account: SYS
|
|
accounts: {
|
|
SYS: {
|
|
users: [
|
|
{user: b, password: b}
|
|
]
|
|
}
|
|
}
|
|
no_auth_user: b
|
|
`, sA.opts.Cluster.Port)))
|
|
sB, _ := RunServerWithConfig(confB)
|
|
defer sB.Shutdown()
|
|
checkClusterFormed(t, sA, sB)
|
|
nc := natsConnect(t, sA.ClientURL())
|
|
defer nc.Close()
|
|
|
|
ib := nats.NewInbox()
|
|
req := func(tags ...string) {
|
|
t.Helper()
|
|
r, err := json.Marshal(VarzEventOptions{EventFilterOptions: EventFilterOptions{Tags: tags}})
|
|
require_NoError(t, err)
|
|
err = nc.PublishRequest(fmt.Sprintf(serverPingReqSubj, "VARZ"), ib, r)
|
|
require_NoError(t, err)
|
|
}
|
|
|
|
msgs := make(chan *nats.Msg, 10)
|
|
defer close(msgs)
|
|
_, err := nc.ChanSubscribe(ib, msgs)
|
|
require_NoError(t, err)
|
|
req("none")
|
|
select {
|
|
case <-msgs:
|
|
t.Fatalf("no message expected")
|
|
case <-time.After(200 * time.Millisecond):
|
|
}
|
|
req("foo")
|
|
m := <-msgs
|
|
require_Contains(t, string(m.Data), "srv-A", "foo", "bar")
|
|
req("foo", "bar")
|
|
m = <-msgs
|
|
require_Contains(t, string(m.Data), "srv-A", "foo", "bar")
|
|
req("baz")
|
|
m = <-msgs
|
|
require_Contains(t, string(m.Data), "srv-B", "bar", "baz")
|
|
req("bar")
|
|
m1 := <-msgs
|
|
m2 := <-msgs
|
|
require_Contains(t, string(m1.Data)+string(m2.Data), "srv-A", "srv-B", "foo", "bar", "baz")
|
|
require_Len(t, len(msgs), 0)
|
|
}
|
|
|
|
// https://github.com/nats-io/nats-server/issues/3177
|
|
func TestServerEventsAndDQSubscribers(t *testing.T) {
|
|
c := createJetStreamClusterWithTemplate(t, jsClusterAccountsTempl, "DDQ", 3)
|
|
defer c.shutdown()
|
|
|
|
nc, err := nats.Connect(c.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
|
|
require_NoError(t, err)
|
|
defer nc.Close()
|
|
|
|
sub, err := nc.QueueSubscribeSync("$SYS.ACCOUNT.*.DISCONNECT", "qq")
|
|
require_NoError(t, err)
|
|
nc.Flush()
|
|
|
|
// Create and disconnect 10 random connections.
|
|
for i := 0; i < 10; i++ {
|
|
nc, err := nats.Connect(c.randomServer().ClientURL())
|
|
require_NoError(t, err)
|
|
nc.Close()
|
|
}
|
|
|
|
checkSubsPending(t, sub, 10)
|
|
}
|
|
|
|
func Benchmark_GetHash(b *testing.B) {
|
|
b.StopTimer()
|
|
// Get 100 random names
|
|
names := make([]string, 0, 100)
|
|
for i := 0; i < 100; i++ {
|
|
names = append(names, nuid.Next())
|
|
}
|
|
hashes := make([]string, 0, 100)
|
|
for j := 0; j < 100; j++ {
|
|
sha := sha256.New()
|
|
sha.Write([]byte(names[j]))
|
|
b := sha.Sum(nil)
|
|
for i := 0; i < 8; i++ {
|
|
b[i] = digits[int(b[i]%base)]
|
|
}
|
|
hashes = append(hashes, string(b[:8]))
|
|
}
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(8)
|
|
errCh := make(chan error, 8)
|
|
b.StartTimer()
|
|
for i := 0; i < 8; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for i := 0; i < b.N; i++ {
|
|
idx := rand.Intn(100)
|
|
if h := getHash(names[idx]); h != hashes[idx] {
|
|
errCh <- fmt.Errorf("Hash for name %q was %q, but should be %q", names[idx], h, hashes[idx])
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
select {
|
|
case err := <-errCh:
|
|
b.Fatal(err.Error())
|
|
default:
|
|
}
|
|
}
|