mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
* [added] leaf deny exports/imports to varz monitoring Signed-off-by: Matthias Hanel <mh@synadia.com>
4130 lines
121 KiB
Go
4130 lines
121 KiB
Go
// Copyright 2013-2020 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"reflect"
|
|
"runtime"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
"unicode"
|
|
|
|
"github.com/nats-io/jwt/v2"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/nats-io/nkeys"
|
|
)
|
|
|
|
const CLIENT_PORT = -1
|
|
const MONITOR_PORT = -1
|
|
const CLUSTER_PORT = -1
|
|
|
|
func DefaultMonitorOptions() *Options {
|
|
return &Options{
|
|
Host: "127.0.0.1",
|
|
Port: CLIENT_PORT,
|
|
HTTPHost: "127.0.0.1",
|
|
HTTPPort: MONITOR_PORT,
|
|
HTTPBasePath: "/",
|
|
ServerName: "monitor_server",
|
|
NoLog: true,
|
|
NoSigs: true,
|
|
Tags: []string{"tag"},
|
|
}
|
|
}
|
|
|
|
func runMonitorServer() *Server {
|
|
resetPreviousHTTPConnections()
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
return RunServer(opts)
|
|
}
|
|
|
|
func runMonitorServerWithAccounts() *Server {
|
|
resetPreviousHTTPConnections()
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
aA := NewAccount("A")
|
|
aB := NewAccount("B")
|
|
opts.Accounts = append(opts.Accounts, aA, aB)
|
|
opts.Users = append(opts.Users,
|
|
&User{Username: "a", Password: "a", Account: aA},
|
|
&User{Username: "b", Password: "b", Account: aB})
|
|
return RunServer(opts)
|
|
}
|
|
|
|
func runMonitorServerNoHTTPPort() *Server {
|
|
resetPreviousHTTPConnections()
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
opts.HTTPPort = 0
|
|
return RunServer(opts)
|
|
}
|
|
|
|
func resetPreviousHTTPConnections() {
|
|
http.DefaultTransport.(*http.Transport).CloseIdleConnections()
|
|
}
|
|
|
|
func TestMyUptime(t *testing.T) {
|
|
// Make sure we print this stuff right.
|
|
var d time.Duration
|
|
var s string
|
|
|
|
d = 22 * time.Second
|
|
s = myUptime(d)
|
|
if s != "22s" {
|
|
t.Fatalf("Expected `22s`, go ``%s`", s)
|
|
}
|
|
d = 4*time.Minute + d
|
|
s = myUptime(d)
|
|
if s != "4m22s" {
|
|
t.Fatalf("Expected `4m22s`, go ``%s`", s)
|
|
}
|
|
d = 4*time.Hour + d
|
|
s = myUptime(d)
|
|
if s != "4h4m22s" {
|
|
t.Fatalf("Expected `4h4m22s`, go ``%s`", s)
|
|
}
|
|
d = 32*24*time.Hour + d
|
|
s = myUptime(d)
|
|
if s != "32d4h4m22s" {
|
|
t.Fatalf("Expected `32d4h4m22s`, go ``%s`", s)
|
|
}
|
|
d = 22*365*24*time.Hour + d
|
|
s = myUptime(d)
|
|
if s != "22y32d4h4m22s" {
|
|
t.Fatalf("Expected `22y32d4h4m22s`, go ``%s`", s)
|
|
}
|
|
}
|
|
|
|
// Make sure that we do not run the http server for monitoring unless asked.
|
|
func TestNoMonitorPort(t *testing.T) {
|
|
s := runMonitorServerNoHTTPPort()
|
|
defer s.Shutdown()
|
|
|
|
// this test might be meaningless now that we're testing with random ports?
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", 11245)
|
|
if resp, err := http.Get(url + "varz"); err == nil {
|
|
t.Fatalf("Expected error: Got %+v\n", resp)
|
|
}
|
|
if resp, err := http.Get(url + "healthz"); err == nil {
|
|
t.Fatalf("Expected error: Got %+v\n", resp)
|
|
}
|
|
if resp, err := http.Get(url + "connz"); err == nil {
|
|
t.Fatalf("Expected error: Got %+v\n", resp)
|
|
}
|
|
}
|
|
|
|
var (
|
|
appJSONContent = "application/json"
|
|
appJSContent = "application/javascript"
|
|
textPlain = "text/plain; charset=utf-8"
|
|
textHTML = "text/html; charset=utf-8"
|
|
)
|
|
|
|
func readBodyEx(t *testing.T, url string, status int, content string) []byte {
|
|
resp, err := http.Get(url)
|
|
if err != nil {
|
|
stackFatalf(t, "Expected no error: Got %v\n", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != status {
|
|
stackFatalf(t, "Expected a %d response, got %d\n", status, resp.StatusCode)
|
|
}
|
|
ct := resp.Header.Get("Content-Type")
|
|
if ct != content {
|
|
stackFatalf(t, "Expected %s content-type, got %s\n", content, ct)
|
|
}
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
stackFatalf(t, "Got an error reading the body: %v\n", err)
|
|
}
|
|
return body
|
|
}
|
|
|
|
func TestHTTPBasePath(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
opts.HTTPBasePath = "/nats"
|
|
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/nats", s.MonitorAddr().Port)
|
|
readBodyEx(t, url, http.StatusOK, textHTML)
|
|
}
|
|
|
|
func readBody(t *testing.T, url string) []byte {
|
|
return readBodyEx(t, url, http.StatusOK, appJSONContent)
|
|
}
|
|
|
|
func pollVarz(t *testing.T, s *Server, mode int, url string, opts *VarzOptions) *Varz {
|
|
t.Helper()
|
|
if mode == 0 {
|
|
v := &Varz{}
|
|
body := readBody(t, url)
|
|
if err := json.Unmarshal(body, v); err != nil {
|
|
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
|
|
}
|
|
return v
|
|
}
|
|
v, err := s.Varz(opts)
|
|
if err != nil {
|
|
t.Fatalf("Error on Varz: %v", err)
|
|
}
|
|
return v
|
|
}
|
|
|
|
func TestHandleVarz(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
v := pollVarz(t, s, mode, url+"varz", nil)
|
|
|
|
// Do some sanity checks on values
|
|
if time.Since(v.Start) > 10*time.Second {
|
|
t.Fatal("Expected start time to be within 10 seconds.")
|
|
}
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
defer nc.Close()
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
v := pollVarz(t, s, mode, url+"varz", nil)
|
|
|
|
if v.Connections != 1 {
|
|
t.Fatalf("Expected Connections of 1, got %v\n", v.Connections)
|
|
}
|
|
if v.TotalConnections < 1 {
|
|
t.Fatalf("Expected Total Connections of at least 1, got %v\n", v.TotalConnections)
|
|
}
|
|
if v.InMsgs != 1 {
|
|
t.Fatalf("Expected InMsgs of 1, got %v\n", v.InMsgs)
|
|
}
|
|
if v.OutMsgs != 1 {
|
|
t.Fatalf("Expected OutMsgs of 1, got %v\n", v.OutMsgs)
|
|
}
|
|
if v.InBytes != 5 {
|
|
t.Fatalf("Expected InBytes of 5, got %v\n", v.InBytes)
|
|
}
|
|
if v.OutBytes != 5 {
|
|
t.Fatalf("Expected OutBytes of 5, got %v\n", v.OutBytes)
|
|
}
|
|
if v.Subscriptions != 0 {
|
|
t.Fatalf("Expected Subscriptions of 0, got %v\n", v.Subscriptions)
|
|
}
|
|
if v.Name != "monitor_server" {
|
|
t.Fatal("Expected ServerName to be 'monitor_server'")
|
|
}
|
|
if !v.Tags.Contains("tag") {
|
|
t.Fatal("Expected tags to be 'tag'")
|
|
}
|
|
}
|
|
|
|
// Test JSONP
|
|
readBodyEx(t, url+"varz?callback=callback", http.StatusOK, appJSContent)
|
|
}
|
|
|
|
func pollConz(t *testing.T, s *Server, mode int, url string, opts *ConnzOptions) *Connz {
|
|
t.Helper()
|
|
if mode == 0 {
|
|
body := readBody(t, url)
|
|
c := &Connz{}
|
|
if err := json.Unmarshal(body, &c); err != nil {
|
|
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
|
|
}
|
|
return c
|
|
}
|
|
c, err := s.Connz(opts)
|
|
if err != nil {
|
|
t.Fatalf("Error on Connz(): %v", err)
|
|
}
|
|
return c
|
|
}
|
|
|
|
func TestConnz(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
|
|
testConnz := func(mode int) {
|
|
c := pollConz(t, s, mode, url+"connz", nil)
|
|
|
|
// Test contents..
|
|
if c.NumConns != 0 {
|
|
t.Fatalf("Expected 0 connections, got %d\n", c.NumConns)
|
|
}
|
|
if c.Total != 0 {
|
|
t.Fatalf("Expected 0 live connections, got %d\n", c.Total)
|
|
}
|
|
if c.Conns == nil || len(c.Conns) != 0 {
|
|
t.Fatalf("Expected 0 connections in array, got %p\n", c.Conns)
|
|
}
|
|
|
|
// Test with connections.
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
defer nc.Close()
|
|
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
c = pollConz(t, s, mode, url+"connz", nil)
|
|
|
|
if c.NumConns != 1 {
|
|
t.Fatalf("Expected 1 connection, got %d\n", c.NumConns)
|
|
}
|
|
if c.Total != 1 {
|
|
t.Fatalf("Expected 1 live connection, got %d\n", c.Total)
|
|
}
|
|
if c.Conns == nil || len(c.Conns) != 1 {
|
|
t.Fatalf("Expected 1 connection in array, got %d\n", len(c.Conns))
|
|
}
|
|
|
|
if c.Limit != DefaultConnListSize {
|
|
t.Fatalf("Expected limit of %d, got %v\n", DefaultConnListSize, c.Limit)
|
|
}
|
|
|
|
if c.Offset != 0 {
|
|
t.Fatalf("Expected offset of 0, got %v\n", c.Offset)
|
|
}
|
|
|
|
// Test inside details of each connection
|
|
ci := c.Conns[0]
|
|
|
|
if ci.Cid == 0 {
|
|
t.Fatalf("Expected non-zero cid, got %v\n", ci.Cid)
|
|
}
|
|
if ci.IP != "127.0.0.1" {
|
|
t.Fatalf("Expected \"127.0.0.1\" for IP, got %v\n", ci.IP)
|
|
}
|
|
if ci.Port == 0 {
|
|
t.Fatalf("Expected non-zero port, got %v\n", ci.Port)
|
|
}
|
|
if ci.NumSubs != 0 {
|
|
t.Fatalf("Expected num_subs of 0, got %v\n", ci.NumSubs)
|
|
}
|
|
if len(ci.Subs) != 0 {
|
|
t.Fatalf("Expected subs of 0, got %v\n", ci.Subs)
|
|
}
|
|
if len(ci.SubsDetail) != 0 {
|
|
t.Fatalf("Expected subsdetail of 0, got %v\n", ci.SubsDetail)
|
|
}
|
|
if ci.InMsgs != 1 {
|
|
t.Fatalf("Expected InMsgs of 1, got %v\n", ci.InMsgs)
|
|
}
|
|
if ci.OutMsgs != 1 {
|
|
t.Fatalf("Expected OutMsgs of 1, got %v\n", ci.OutMsgs)
|
|
}
|
|
if ci.InBytes != 5 {
|
|
t.Fatalf("Expected InBytes of 1, got %v\n", ci.InBytes)
|
|
}
|
|
if ci.OutBytes != 5 {
|
|
t.Fatalf("Expected OutBytes of 1, got %v\n", ci.OutBytes)
|
|
}
|
|
if ci.Start.IsZero() {
|
|
t.Fatal("Expected Start to be valid\n")
|
|
}
|
|
if ci.Uptime == "" {
|
|
t.Fatal("Expected Uptime to be valid\n")
|
|
}
|
|
if ci.LastActivity.IsZero() {
|
|
t.Fatal("Expected LastActivity to be valid\n")
|
|
}
|
|
if ci.LastActivity.UnixNano() < ci.Start.UnixNano() {
|
|
t.Fatalf("Expected LastActivity [%v] to be > Start [%v]\n", ci.LastActivity, ci.Start)
|
|
}
|
|
if ci.Idle == "" {
|
|
t.Fatal("Expected Idle to be valid\n")
|
|
}
|
|
// This is a change, we now expect them to be set for connections when the
|
|
// client sends a connect.
|
|
if ci.RTT == "" {
|
|
t.Fatal("Expected RTT to be set for new connection\n")
|
|
}
|
|
}
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
testConnz(mode)
|
|
checkClientsCount(t, s, 0)
|
|
}
|
|
|
|
// Test JSONP
|
|
readBodyEx(t, url+"connz?callback=callback", http.StatusOK, appJSContent)
|
|
}
|
|
|
|
func TestConnzBadParams(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/connz?", s.MonitorAddr().Port)
|
|
readBodyEx(t, url+"auth=xxx", http.StatusBadRequest, textPlain)
|
|
readBodyEx(t, url+"subs=xxx", http.StatusBadRequest, textPlain)
|
|
readBodyEx(t, url+"offset=xxx", http.StatusBadRequest, textPlain)
|
|
readBodyEx(t, url+"limit=xxx", http.StatusBadRequest, textPlain)
|
|
readBodyEx(t, url+"state=xxx", http.StatusBadRequest, textPlain)
|
|
}
|
|
|
|
func TestConnzWithSubs(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
defer nc.Close()
|
|
|
|
nc.Subscribe("hello.foo", func(m *nats.Msg) {})
|
|
ensureServerActivityRecorded(t, nc)
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?subs=1", &ConnzOptions{Subscriptions: true})
|
|
// Test inside details of each connection
|
|
ci := c.Conns[0]
|
|
if len(ci.Subs) != 1 || ci.Subs[0] != "hello.foo" {
|
|
t.Fatalf("Expected subs of 1, got %v\n", ci.Subs)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzWithSubsDetail(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
defer nc.Close()
|
|
|
|
nc.Subscribe("hello.foo", func(m *nats.Msg) {})
|
|
ensureServerActivityRecorded(t, nc)
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?subs=detail", &ConnzOptions{SubscriptionsDetail: true})
|
|
// Test inside details of each connection
|
|
ci := c.Conns[0]
|
|
if len(ci.SubsDetail) != 1 || ci.SubsDetail[0].Subject != "hello.foo" {
|
|
t.Fatalf("Expected subsdetail of 1, got %v\n", ci.Subs)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestClosedConnzWithSubsDetail(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
|
|
nc.Subscribe("hello.foo", func(m *nats.Msg) {})
|
|
ensureServerActivityRecorded(t, nc)
|
|
nc.Close()
|
|
|
|
s.mu.Lock()
|
|
for len(s.clients) != 0 {
|
|
s.mu.Unlock()
|
|
<-time.After(100 * time.Millisecond)
|
|
s.mu.Lock()
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?state=closed&subs=detail", &ConnzOptions{State: ConnClosed,
|
|
SubscriptionsDetail: true})
|
|
// Test inside details of each connection
|
|
ci := c.Conns[0]
|
|
if len(ci.SubsDetail) != 1 || ci.SubsDetail[0].Subject != "hello.foo" {
|
|
t.Fatalf("Expected subsdetail of 1, got %v\n", ci.Subs)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzWithCID(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
// The one we will request
|
|
cid := 5
|
|
total := 10
|
|
|
|
// Create 10
|
|
for i := 1; i <= total; i++ {
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
defer nc.Close()
|
|
if i == cid {
|
|
nc.Subscribe("hello.foo", func(m *nats.Msg) {})
|
|
nc.Subscribe("hello.bar", func(m *nats.Msg) {})
|
|
ensureServerActivityRecorded(t, nc)
|
|
}
|
|
}
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/connz?cid=%d", s.MonitorAddr().Port, cid)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url, &ConnzOptions{CID: uint64(cid)})
|
|
// Test inside details of each connection
|
|
if len(c.Conns) != 1 {
|
|
t.Fatalf("Expected only one connection, but got %d\n", len(c.Conns))
|
|
}
|
|
if c.NumConns != 1 {
|
|
t.Fatalf("Expected NumConns to be 1, but got %d\n", c.NumConns)
|
|
}
|
|
ci := c.Conns[0]
|
|
if ci.Cid != uint64(cid) {
|
|
t.Fatalf("Expected to receive connection %v, but received %v\n", cid, ci.Cid)
|
|
}
|
|
if ci.NumSubs != 2 {
|
|
t.Fatalf("Expected to receive connection with %d subs, but received %d\n", 2, ci.NumSubs)
|
|
}
|
|
// Now test a miss
|
|
badUrl := fmt.Sprintf("http://127.0.0.1:%d/connz?cid=%d", s.MonitorAddr().Port, 100)
|
|
c = pollConz(t, s, mode, badUrl, &ConnzOptions{CID: uint64(100)})
|
|
if len(c.Conns) != 0 {
|
|
t.Fatalf("Expected no connections, got %d\n", len(c.Conns))
|
|
}
|
|
if c.NumConns != 0 {
|
|
t.Fatalf("Expected NumConns of 0, got %d\n", c.NumConns)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Helper to map to connection name
|
|
func createConnMap(t *testing.T, cz *Connz) map[string]*ConnInfo {
|
|
cm := make(map[string]*ConnInfo)
|
|
for _, c := range cz.Conns {
|
|
cm[c.Name] = c
|
|
}
|
|
return cm
|
|
}
|
|
|
|
func getFooAndBar(t *testing.T, cm map[string]*ConnInfo) (*ConnInfo, *ConnInfo) {
|
|
return cm["foo"], cm["bar"]
|
|
}
|
|
|
|
func ensureServerActivityRecorded(t *testing.T, nc *nats.Conn) {
|
|
nc.Flush()
|
|
err := nc.Flush()
|
|
if err != nil {
|
|
t.Fatalf("Error flushing: %v\n", err)
|
|
}
|
|
}
|
|
|
|
func TestConnzRTT(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
|
|
testRTT := func(mode int) {
|
|
// Test with connections.
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
defer nc.Close()
|
|
|
|
c := pollConz(t, s, mode, url+"connz", nil)
|
|
|
|
if c.NumConns != 1 {
|
|
t.Fatalf("Expected 1 connection, got %d\n", c.NumConns)
|
|
}
|
|
|
|
// Send a server side PING to record RTT
|
|
s.mu.Lock()
|
|
ci := c.Conns[0]
|
|
sc := s.clients[ci.Cid]
|
|
if sc == nil {
|
|
t.Fatalf("Error looking up client %v\n", ci.Cid)
|
|
}
|
|
s.mu.Unlock()
|
|
sc.mu.Lock()
|
|
sc.sendPing()
|
|
sc.mu.Unlock()
|
|
|
|
// Wait for client to respond with PONG
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
// Repoll for updated information.
|
|
c = pollConz(t, s, mode, url+"connz", nil)
|
|
ci = c.Conns[0]
|
|
|
|
rtt, err := time.ParseDuration(ci.RTT)
|
|
if err != nil {
|
|
t.Fatalf("Could not parse RTT properly, %v (ci.RTT=%v)", err, ci.RTT)
|
|
}
|
|
if rtt <= 0 {
|
|
t.Fatal("Expected RTT to be valid and non-zero\n")
|
|
}
|
|
if (runtime.GOOS == "windows" && rtt > 20*time.Millisecond) ||
|
|
rtt > 20*time.Millisecond || rtt < 100*time.Nanosecond {
|
|
t.Fatalf("Invalid RTT of %s\n", ci.RTT)
|
|
}
|
|
}
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
testRTT(mode)
|
|
checkClientsCount(t, s, 0)
|
|
}
|
|
}
|
|
|
|
func TestConnzLastActivity(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
url += "connz?subs=1"
|
|
opts := &ConnzOptions{Subscriptions: true}
|
|
|
|
var sleepTime time.Duration
|
|
if runtime.GOOS == "windows" {
|
|
sleepTime = 10 * time.Millisecond
|
|
}
|
|
|
|
testActivity := func(mode int) {
|
|
ncFoo := createClientConnWithName(t, "foo", s)
|
|
defer ncFoo.Close()
|
|
|
|
ncBar := createClientConnWithName(t, "bar", s)
|
|
defer ncBar.Close()
|
|
|
|
// Test inside details of each connection
|
|
ciFoo, ciBar := getFooAndBar(t, createConnMap(t, pollConz(t, s, mode, url, opts)))
|
|
|
|
// Test that LastActivity is non-zero
|
|
if ciFoo.LastActivity.IsZero() {
|
|
t.Fatalf("Expected LastActivity for connection '%s'to be valid\n", ciFoo.Name)
|
|
}
|
|
if ciBar.LastActivity.IsZero() {
|
|
t.Fatalf("Expected LastActivity for connection '%s'to be valid\n", ciBar.Name)
|
|
}
|
|
// Foo should be older than Bar
|
|
if ciFoo.LastActivity.After(ciBar.LastActivity) {
|
|
t.Fatal("Expected connection 'foo' to be older than 'bar'\n")
|
|
}
|
|
|
|
fooLA := ciFoo.LastActivity
|
|
barLA := ciBar.LastActivity
|
|
|
|
ensureServerActivityRecorded(t, ncFoo)
|
|
ensureServerActivityRecorded(t, ncBar)
|
|
|
|
time.Sleep(sleepTime)
|
|
|
|
// Sub should trigger update.
|
|
sub, _ := ncFoo.Subscribe("hello.world", func(m *nats.Msg) {})
|
|
ensureServerActivityRecorded(t, ncFoo)
|
|
|
|
ciFoo, _ = getFooAndBar(t, createConnMap(t, pollConz(t, s, mode, url, opts)))
|
|
nextLA := ciFoo.LastActivity
|
|
if fooLA.Equal(nextLA) {
|
|
t.Fatalf("Subscribe should have triggered update to LastActivity %+v\n", ciFoo)
|
|
}
|
|
fooLA = nextLA
|
|
|
|
time.Sleep(sleepTime)
|
|
|
|
// Publish and Message Delivery should trigger as well. So both connections
|
|
// should have updates.
|
|
ncBar.Publish("hello.world", []byte("Hello"))
|
|
|
|
ensureServerActivityRecorded(t, ncFoo)
|
|
ensureServerActivityRecorded(t, ncBar)
|
|
|
|
ciFoo, ciBar = getFooAndBar(t, createConnMap(t, pollConz(t, s, mode, url, opts)))
|
|
nextLA = ciBar.LastActivity
|
|
if barLA.Equal(nextLA) {
|
|
t.Fatalf("Publish should have triggered update to LastActivity\n")
|
|
}
|
|
barLA = nextLA
|
|
|
|
// Message delivery on ncFoo should have triggered as well.
|
|
nextLA = ciFoo.LastActivity
|
|
if fooLA.Equal(nextLA) {
|
|
t.Fatalf("Message delivery should have triggered update to LastActivity\n")
|
|
}
|
|
fooLA = nextLA
|
|
|
|
time.Sleep(sleepTime)
|
|
|
|
// Unsub should trigger as well
|
|
sub.Unsubscribe()
|
|
ensureServerActivityRecorded(t, ncFoo)
|
|
|
|
ciFoo, _ = getFooAndBar(t, createConnMap(t, pollConz(t, s, mode, url, opts)))
|
|
nextLA = ciFoo.LastActivity
|
|
if fooLA.Equal(nextLA) {
|
|
t.Fatalf("Message delivery should have triggered update to LastActivity\n")
|
|
}
|
|
}
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
testActivity(mode)
|
|
}
|
|
}
|
|
|
|
func TestConnzWithOffsetAndLimit(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?offset=1&limit=1", &ConnzOptions{Offset: 1, Limit: 1})
|
|
if c.Conns == nil || len(c.Conns) != 0 {
|
|
t.Fatalf("Expected 0 connections in array, got %p\n", c.Conns)
|
|
}
|
|
|
|
// Test that when given negative values, 0 or default is used
|
|
c = pollConz(t, s, mode, url+"connz?offset=-1&limit=-1", &ConnzOptions{Offset: -11, Limit: -11})
|
|
if c.Conns == nil || len(c.Conns) != 0 {
|
|
t.Fatalf("Expected 0 connections in array, got %p\n", c.Conns)
|
|
}
|
|
if c.Offset != 0 {
|
|
t.Fatalf("Expected offset to be 0, and limit to be %v, got %v and %v",
|
|
DefaultConnListSize, c.Offset, c.Limit)
|
|
}
|
|
}
|
|
|
|
cl1 := createClientConnSubscribeAndPublish(t, s)
|
|
defer cl1.Close()
|
|
|
|
cl2 := createClientConnSubscribeAndPublish(t, s)
|
|
defer cl2.Close()
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?offset=1&limit=1", &ConnzOptions{Offset: 1, Limit: 1})
|
|
if c.Limit != 1 {
|
|
t.Fatalf("Expected limit of 1, got %v\n", c.Limit)
|
|
}
|
|
|
|
if c.Offset != 1 {
|
|
t.Fatalf("Expected offset of 1, got %v\n", c.Offset)
|
|
}
|
|
|
|
if len(c.Conns) != 1 {
|
|
t.Fatalf("Expected conns of 1, got %v\n", len(c.Conns))
|
|
}
|
|
|
|
if c.NumConns != 1 {
|
|
t.Fatalf("Expected NumConns to be 1, got %v\n", c.NumConns)
|
|
}
|
|
|
|
if c.Total != 2 {
|
|
t.Fatalf("Expected Total to be at least 2, got %v", c.Total)
|
|
}
|
|
|
|
c = pollConz(t, s, mode, url+"connz?offset=2&limit=1", &ConnzOptions{Offset: 2, Limit: 1})
|
|
if c.Limit != 1 {
|
|
t.Fatalf("Expected limit of 1, got %v\n", c.Limit)
|
|
}
|
|
|
|
if c.Offset != 2 {
|
|
t.Fatalf("Expected offset of 2, got %v\n", c.Offset)
|
|
}
|
|
|
|
if len(c.Conns) != 0 {
|
|
t.Fatalf("Expected conns of 0, got %v\n", len(c.Conns))
|
|
}
|
|
|
|
if c.NumConns != 0 {
|
|
t.Fatalf("Expected NumConns to be 0, got %v\n", c.NumConns)
|
|
}
|
|
|
|
if c.Total != 2 {
|
|
t.Fatalf("Expected Total to be 2, got %v", c.Total)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzDefaultSorted(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
clients := make([]*nats.Conn, 4)
|
|
for i := range clients {
|
|
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
|
defer clients[i].Close()
|
|
}
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz", nil)
|
|
if c.Conns[0].Cid > c.Conns[1].Cid ||
|
|
c.Conns[1].Cid > c.Conns[2].Cid ||
|
|
c.Conns[2].Cid > c.Conns[3].Cid {
|
|
t.Fatalf("Expected conns sorted in ascending order by cid, got %v < %v\n", c.Conns[0].Cid, c.Conns[3].Cid)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByCid(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
clients := make([]*nats.Conn, 4)
|
|
for i := range clients {
|
|
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
|
defer clients[i].Close()
|
|
}
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?sort=cid", &ConnzOptions{Sort: ByCid})
|
|
if c.Conns[0].Cid > c.Conns[1].Cid ||
|
|
c.Conns[1].Cid > c.Conns[2].Cid ||
|
|
c.Conns[2].Cid > c.Conns[3].Cid {
|
|
t.Fatalf("Expected conns sorted in ascending order by cid, got [%v, %v, %v, %v]\n",
|
|
c.Conns[0].Cid, c.Conns[1].Cid, c.Conns[2].Cid, c.Conns[3].Cid)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByStart(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
clients := make([]*nats.Conn, 4)
|
|
for i := range clients {
|
|
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
|
defer clients[i].Close()
|
|
}
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?sort=start", &ConnzOptions{Sort: ByStart})
|
|
if c.Conns[0].Start.After(c.Conns[1].Start) ||
|
|
c.Conns[1].Start.After(c.Conns[2].Start) ||
|
|
c.Conns[2].Start.After(c.Conns[3].Start) {
|
|
t.Fatalf("Expected conns sorted in ascending order by startime, got [%v, %v, %v, %v]\n",
|
|
c.Conns[0].Start, c.Conns[1].Start, c.Conns[2].Start, c.Conns[3].Start)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByBytesAndMsgs(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
// Create a connection and make it send more messages than others
|
|
firstClient := createClientConnSubscribeAndPublish(t, s)
|
|
for i := 0; i < 100; i++ {
|
|
firstClient.Publish("foo", []byte("Hello World"))
|
|
}
|
|
defer firstClient.Close()
|
|
firstClient.Flush()
|
|
|
|
clients := make([]*nats.Conn, 3)
|
|
for i := range clients {
|
|
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
|
defer clients[i].Close()
|
|
}
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?sort=bytes_to", &ConnzOptions{Sort: ByOutBytes})
|
|
if c.Conns[0].OutBytes < c.Conns[1].OutBytes ||
|
|
c.Conns[0].OutBytes < c.Conns[2].OutBytes ||
|
|
c.Conns[0].OutBytes < c.Conns[3].OutBytes {
|
|
t.Fatalf("Expected conns sorted in descending order by bytes to, got %v < one of [%v, %v, %v]\n",
|
|
c.Conns[0].OutBytes, c.Conns[1].OutBytes, c.Conns[2].OutBytes, c.Conns[3].OutBytes)
|
|
}
|
|
|
|
c = pollConz(t, s, mode, url+"connz?sort=msgs_to", &ConnzOptions{Sort: ByOutMsgs})
|
|
if c.Conns[0].OutMsgs < c.Conns[1].OutMsgs ||
|
|
c.Conns[0].OutMsgs < c.Conns[2].OutMsgs ||
|
|
c.Conns[0].OutMsgs < c.Conns[3].OutMsgs {
|
|
t.Fatalf("Expected conns sorted in descending order by msgs from, got %v < one of [%v, %v, %v]\n",
|
|
c.Conns[0].OutMsgs, c.Conns[1].OutMsgs, c.Conns[2].OutMsgs, c.Conns[3].OutMsgs)
|
|
}
|
|
|
|
c = pollConz(t, s, mode, url+"connz?sort=bytes_from", &ConnzOptions{Sort: ByInBytes})
|
|
if c.Conns[0].InBytes < c.Conns[1].InBytes ||
|
|
c.Conns[0].InBytes < c.Conns[2].InBytes ||
|
|
c.Conns[0].InBytes < c.Conns[3].InBytes {
|
|
t.Fatalf("Expected conns sorted in descending order by bytes from, got %v < one of [%v, %v, %v]\n",
|
|
c.Conns[0].InBytes, c.Conns[1].InBytes, c.Conns[2].InBytes, c.Conns[3].InBytes)
|
|
}
|
|
|
|
c = pollConz(t, s, mode, url+"connz?sort=msgs_from", &ConnzOptions{Sort: ByInMsgs})
|
|
if c.Conns[0].InMsgs < c.Conns[1].InMsgs ||
|
|
c.Conns[0].InMsgs < c.Conns[2].InMsgs ||
|
|
c.Conns[0].InMsgs < c.Conns[3].InMsgs {
|
|
t.Fatalf("Expected conns sorted in descending order by msgs from, got %v < one of [%v, %v, %v]\n",
|
|
c.Conns[0].InMsgs, c.Conns[1].InMsgs, c.Conns[2].InMsgs, c.Conns[3].InMsgs)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByPending(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
firstClient := createClientConnSubscribeAndPublish(t, s)
|
|
firstClient.Subscribe("hello.world", func(m *nats.Msg) {})
|
|
clients := make([]*nats.Conn, 3)
|
|
for i := range clients {
|
|
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
|
defer clients[i].Close()
|
|
}
|
|
defer firstClient.Close()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?sort=pending", &ConnzOptions{Sort: ByPending})
|
|
if c.Conns[0].Pending < c.Conns[1].Pending ||
|
|
c.Conns[0].Pending < c.Conns[2].Pending ||
|
|
c.Conns[0].Pending < c.Conns[3].Pending {
|
|
t.Fatalf("Expected conns sorted in descending order by number of pending, got %v < one of [%v, %v, %v]\n",
|
|
c.Conns[0].Pending, c.Conns[1].Pending, c.Conns[2].Pending, c.Conns[3].Pending)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedBySubs(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
firstClient := createClientConnSubscribeAndPublish(t, s)
|
|
firstClient.Subscribe("hello.world", func(m *nats.Msg) {})
|
|
defer firstClient.Close()
|
|
|
|
clients := make([]*nats.Conn, 3)
|
|
for i := range clients {
|
|
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
|
defer clients[i].Close()
|
|
}
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?sort=subs", &ConnzOptions{Sort: BySubs})
|
|
if c.Conns[0].NumSubs < c.Conns[1].NumSubs ||
|
|
c.Conns[0].NumSubs < c.Conns[2].NumSubs ||
|
|
c.Conns[0].NumSubs < c.Conns[3].NumSubs {
|
|
t.Fatalf("Expected conns sorted in descending order by number of subs, got %v < one of [%v, %v, %v]\n",
|
|
c.Conns[0].NumSubs, c.Conns[1].NumSubs, c.Conns[2].NumSubs, c.Conns[3].NumSubs)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByLast(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
firstClient := createClientConnSubscribeAndPublish(t, s)
|
|
defer firstClient.Close()
|
|
firstClient.Subscribe("hello.world", func(m *nats.Msg) {})
|
|
firstClient.Flush()
|
|
|
|
clients := make([]*nats.Conn, 3)
|
|
for i := range clients {
|
|
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
|
defer clients[i].Close()
|
|
clients[i].Flush()
|
|
}
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?sort=last", &ConnzOptions{Sort: ByLast})
|
|
if c.Conns[0].LastActivity.UnixNano() < c.Conns[1].LastActivity.UnixNano() ||
|
|
c.Conns[1].LastActivity.UnixNano() < c.Conns[2].LastActivity.UnixNano() ||
|
|
c.Conns[2].LastActivity.UnixNano() < c.Conns[3].LastActivity.UnixNano() {
|
|
t.Fatalf("Expected conns sorted in descending order by lastActivity, got %v < one of [%v, %v, %v]\n",
|
|
c.Conns[0].LastActivity, c.Conns[1].LastActivity, c.Conns[2].LastActivity, c.Conns[3].LastActivity)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByUptime(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
for i := 0; i < 4; i++ {
|
|
client := createClientConnSubscribeAndPublish(t, s)
|
|
defer client.Close()
|
|
// Since we check times (now-start) does not have to be big.
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?sort=uptime", &ConnzOptions{Sort: ByUptime})
|
|
now := time.Now()
|
|
ups := make([]int, 4)
|
|
for i := 0; i < 4; i++ {
|
|
ups[i] = int(now.Sub(c.Conns[i].Start))
|
|
}
|
|
if !sort.IntsAreSorted(ups) {
|
|
d := make([]time.Duration, 4)
|
|
for i := 0; i < 4; i++ {
|
|
d[i] = time.Duration(ups[i])
|
|
}
|
|
t.Fatalf("Expected conns sorted in ascending order by uptime (now-Start), got %+v\n", d)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByUptimeClosedConn(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
for i := time.Duration(1); i <= 4; i++ {
|
|
c := createClientConnSubscribeAndPublish(t, s)
|
|
|
|
// Grab client and asjust start time such that
|
|
client := s.getClient(uint64(i))
|
|
if client == nil {
|
|
t.Fatalf("Could nopt retrieve client for %d\n", i)
|
|
}
|
|
client.mu.Lock()
|
|
client.start = client.start.Add(-10 * (4 - i) * time.Second)
|
|
client.mu.Unlock()
|
|
|
|
c.Close()
|
|
}
|
|
|
|
checkClosedConns(t, s, 4, time.Second)
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?state=closed&sort=uptime", &ConnzOptions{State: ConnClosed, Sort: ByUptime})
|
|
ups := make([]int, 4)
|
|
for i := 0; i < 4; i++ {
|
|
ups[i] = int(c.Conns[i].Stop.Sub(c.Conns[i].Start))
|
|
}
|
|
if !sort.IntsAreSorted(ups) {
|
|
d := make([]time.Duration, 4)
|
|
for i := 0; i < 4; i++ {
|
|
d[i] = time.Duration(ups[i])
|
|
}
|
|
t.Fatalf("Expected conns sorted in ascending order by uptime, got %+v\n", d)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByStopOnOpen(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
opts := s.getOpts()
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
|
|
// 4 clients
|
|
for i := 0; i < 4; i++ {
|
|
c, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("Could not create client: %v\n", err)
|
|
}
|
|
defer c.Close()
|
|
}
|
|
|
|
c, err := s.Connz(&ConnzOptions{Sort: ByStop})
|
|
if err == nil {
|
|
t.Fatalf("Expected err to be non-nil, got %+v\n", c)
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByStopTimeClosedConn(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
opts := s.getOpts()
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
|
|
// 4 clients
|
|
for i := 0; i < 4; i++ {
|
|
c, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("Could not create client: %v\n", err)
|
|
}
|
|
c.Close()
|
|
}
|
|
checkClosedConns(t, s, 4, time.Second)
|
|
|
|
//Now adjust the Stop times for these with some random values.
|
|
s.mu.Lock()
|
|
now := time.Now().UTC()
|
|
ccs := s.closed.closedClients()
|
|
for _, cc := range ccs {
|
|
newStop := now.Add(time.Duration(rand.Int()%120) * -time.Minute)
|
|
cc.Stop = &newStop
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
url = fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?state=closed&sort=stop", &ConnzOptions{State: ConnClosed, Sort: ByStop})
|
|
ups := make([]int, 4)
|
|
nowU := time.Now().UnixNano()
|
|
for i := 0; i < 4; i++ {
|
|
ups[i] = int(nowU - c.Conns[i].Stop.UnixNano())
|
|
}
|
|
if !sort.IntsAreSorted(ups) {
|
|
d := make([]time.Duration, 4)
|
|
for i := 0; i < 4; i++ {
|
|
d[i] = time.Duration(ups[i])
|
|
}
|
|
t.Fatalf("Expected conns sorted in ascending order by stop time, got %+v\n", d)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByReason(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
opts := s.getOpts()
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
|
|
// 20 clients
|
|
for i := 0; i < 20; i++ {
|
|
c, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("Could not create client: %v\n", err)
|
|
}
|
|
c.Close()
|
|
}
|
|
checkClosedConns(t, s, 20, time.Second)
|
|
|
|
//Now adjust the Reasons for these with some random values.
|
|
s.mu.Lock()
|
|
ccs := s.closed.closedClients()
|
|
max := int(ServerShutdown)
|
|
for _, cc := range ccs {
|
|
cc.Reason = ClosedState(rand.Int() % max).String()
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
url = fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz?state=closed&sort=reason", &ConnzOptions{State: ConnClosed, Sort: ByReason})
|
|
rs := make([]string, 20)
|
|
for i := 0; i < 20; i++ {
|
|
rs[i] = c.Conns[i].Reason
|
|
}
|
|
if !sort.StringsAreSorted(rs) {
|
|
t.Fatalf("Expected conns sorted in order by stop reason, got %#v\n", rs)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByReasonOnOpen(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
opts := s.getOpts()
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
|
|
// 4 clients
|
|
for i := 0; i < 4; i++ {
|
|
c, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("Could not create client: %v\n", err)
|
|
}
|
|
defer c.Close()
|
|
}
|
|
|
|
c, err := s.Connz(&ConnzOptions{Sort: ByReason})
|
|
if err == nil {
|
|
t.Fatalf("Expected err to be non-nil, got %+v\n", c)
|
|
}
|
|
}
|
|
|
|
func TestConnzSortedByIdle(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
|
|
testIdle := func(mode int) {
|
|
firstClient := createClientConnSubscribeAndPublish(t, s)
|
|
defer firstClient.Close()
|
|
firstClient.Subscribe("client.1", func(m *nats.Msg) {})
|
|
firstClient.Flush()
|
|
|
|
secondClient := createClientConnSubscribeAndPublish(t, s)
|
|
defer secondClient.Close()
|
|
|
|
// Make it such that the second client started 10 secs ago. 10 is important since bug
|
|
// was strcmp, e.g. 1s vs 11s
|
|
var cid uint64
|
|
switch mode {
|
|
case 0:
|
|
cid = uint64(2)
|
|
case 1:
|
|
cid = uint64(4)
|
|
}
|
|
client := s.getClient(cid)
|
|
if client == nil {
|
|
t.Fatalf("Error looking up client %v\n", 2)
|
|
}
|
|
|
|
// We want to make sure that we set start/last after the server has finished
|
|
// updating this client's last activity. Doing another Flush() now (even though
|
|
// one is done in createClientConnSubscribeAndPublish) ensures that server has
|
|
// finished updating the client's last activity, since for that last flush there
|
|
// should be no new message/sub/unsub activity.
|
|
secondClient.Flush()
|
|
|
|
client.mu.Lock()
|
|
client.start = client.start.Add(-10 * time.Second)
|
|
client.last = client.start
|
|
client.mu.Unlock()
|
|
|
|
// The Idle granularity is a whole second
|
|
time.Sleep(time.Second)
|
|
firstClient.Publish("client.1", []byte("new message"))
|
|
|
|
c := pollConz(t, s, mode, url+"connz?sort=idle", &ConnzOptions{Sort: ByIdle})
|
|
// Make sure we are returned 2 connections...
|
|
if len(c.Conns) != 2 {
|
|
t.Fatalf("Expected to get two connections, got %v", len(c.Conns))
|
|
}
|
|
|
|
// And that the Idle time is valid (even if equal to "0s")
|
|
if c.Conns[0].Idle == "" || c.Conns[1].Idle == "" {
|
|
t.Fatal("Expected Idle value to be valid")
|
|
}
|
|
|
|
idle1, err := time.ParseDuration(c.Conns[0].Idle)
|
|
if err != nil {
|
|
t.Fatalf("Unable to parse duration %v, err=%v", c.Conns[0].Idle, err)
|
|
}
|
|
idle2, err := time.ParseDuration(c.Conns[1].Idle)
|
|
if err != nil {
|
|
t.Fatalf("Unable to parse duration %v, err=%v", c.Conns[0].Idle, err)
|
|
}
|
|
|
|
if idle2 < idle1 {
|
|
t.Fatalf("Expected conns sorted in descending order by Idle, got %v < %v\n",
|
|
idle2, idle1)
|
|
}
|
|
}
|
|
for mode := 0; mode < 2; mode++ {
|
|
testIdle(mode)
|
|
}
|
|
}
|
|
|
|
func TestConnzSortBadRequest(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
firstClient := createClientConnSubscribeAndPublish(t, s)
|
|
firstClient.Subscribe("hello.world", func(m *nats.Msg) {})
|
|
clients := make([]*nats.Conn, 3)
|
|
for i := range clients {
|
|
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
|
defer clients[i].Close()
|
|
}
|
|
defer firstClient.Close()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
readBodyEx(t, url+"connz?sort=foo", http.StatusBadRequest, textPlain)
|
|
|
|
if _, err := s.Connz(&ConnzOptions{Sort: "foo"}); err == nil {
|
|
t.Fatal("Expected error, got none")
|
|
}
|
|
}
|
|
|
|
func pollRoutez(t *testing.T, s *Server, mode int, url string, opts *RoutezOptions) *Routez {
|
|
t.Helper()
|
|
if mode == 0 {
|
|
rz := &Routez{}
|
|
body := readBody(t, url)
|
|
if err := json.Unmarshal(body, rz); err != nil {
|
|
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
|
|
}
|
|
return rz
|
|
}
|
|
rz, err := s.Routez(opts)
|
|
if err != nil {
|
|
t.Fatalf("Error on Routez: %v", err)
|
|
}
|
|
return rz
|
|
}
|
|
|
|
func TestConnzWithRoutes(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
opts.Cluster.Name = "A"
|
|
opts.Cluster.Host = "127.0.0.1"
|
|
opts.Cluster.Port = CLUSTER_PORT
|
|
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
opts = &Options{
|
|
Host: "127.0.0.1",
|
|
Port: -1,
|
|
Cluster: ClusterOpts{
|
|
Name: "A",
|
|
Host: "127.0.0.1",
|
|
Port: -1,
|
|
},
|
|
NoLog: true,
|
|
NoSigs: true,
|
|
NoSystemAccount: true,
|
|
}
|
|
routeURL, _ := url.Parse(fmt.Sprintf("nats-route://127.0.0.1:%d", s.ClusterAddr().Port))
|
|
opts.Routes = []*url.URL{routeURL}
|
|
|
|
sc := RunServer(opts)
|
|
defer sc.Shutdown()
|
|
|
|
checkClusterFormed(t, s, sc)
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
c := pollConz(t, s, mode, url+"connz", nil)
|
|
// Test contents..
|
|
// Make sure routes don't show up under connz, but do under routez
|
|
if c.NumConns != 0 {
|
|
t.Fatalf("Expected 0 connections, got %d\n", c.NumConns)
|
|
}
|
|
if c.Conns == nil || len(c.Conns) != 0 {
|
|
t.Fatalf("Expected 0 connections in array, got %p\n", c.Conns)
|
|
}
|
|
}
|
|
|
|
nc := createClientConnSubscribeAndPublish(t, sc)
|
|
defer nc.Close()
|
|
|
|
nc.Subscribe("hello.bar", func(m *nats.Msg) {})
|
|
nc.Flush()
|
|
checkExpectedSubs(t, 1, s, sc)
|
|
|
|
// Now check routez
|
|
urls := []string{"routez", "routez?subs=1", "routez?subs=detail"}
|
|
for subs, urlSuffix := range urls {
|
|
for mode := 0; mode < 2; mode++ {
|
|
rz := pollRoutez(t, s, mode, url+urlSuffix, &RoutezOptions{Subscriptions: subs == 1, SubscriptionsDetail: subs == 2})
|
|
|
|
if rz.NumRoutes != 1 {
|
|
t.Fatalf("Expected 1 route, got %d\n", rz.NumRoutes)
|
|
}
|
|
|
|
if len(rz.Routes) != 1 {
|
|
t.Fatalf("Expected route array of 1, got %v\n", len(rz.Routes))
|
|
}
|
|
|
|
route := rz.Routes[0]
|
|
|
|
if route.DidSolicit {
|
|
t.Fatalf("Expected unsolicited route, got %v\n", route.DidSolicit)
|
|
}
|
|
|
|
// Don't ask for subs, so there should not be any
|
|
if subs == 0 {
|
|
if len(route.Subs) != 0 {
|
|
t.Fatalf("There should not be subs, got %v", len(route.Subs))
|
|
}
|
|
} else if subs == 1 {
|
|
if len(route.Subs) != 1 && len(route.SubsDetail) != 0 {
|
|
t.Fatalf("There should be 1 sub, got %v", len(route.Subs))
|
|
}
|
|
} else if subs == 2 {
|
|
if len(route.SubsDetail) != 1 && len(route.Subs) != 0 {
|
|
t.Fatalf("There should be 1 sub, got %v", len(route.SubsDetail))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test JSONP
|
|
readBodyEx(t, url+"routez?callback=callback", http.StatusOK, appJSContent)
|
|
}
|
|
|
|
func TestRoutezWithBadParams(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/routez?", s.MonitorAddr().Port)
|
|
readBodyEx(t, url+"subs=xxx", http.StatusBadRequest, textPlain)
|
|
}
|
|
|
|
func pollSubsz(t *testing.T, s *Server, mode int, url string, opts *SubszOptions) *Subsz {
|
|
t.Helper()
|
|
if mode == 0 {
|
|
body := readBody(t, url)
|
|
sz := &Subsz{}
|
|
if err := json.Unmarshal(body, sz); err != nil {
|
|
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
|
|
}
|
|
return sz
|
|
}
|
|
sz, err := s.Subsz(opts)
|
|
if err != nil {
|
|
t.Fatalf("Error on Subsz: %v", err)
|
|
}
|
|
return sz
|
|
}
|
|
|
|
func TestSubsz(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
defer nc.Close()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
sl := pollSubsz(t, s, mode, url+"subsz", nil)
|
|
if sl.NumSubs != 0 {
|
|
t.Fatalf("Expected NumSubs of 0, got %d\n", sl.NumSubs)
|
|
}
|
|
if sl.NumInserts != 1 {
|
|
t.Fatalf("Expected NumInserts of 1, got %d\n", sl.NumInserts)
|
|
}
|
|
if sl.NumMatches != 1 {
|
|
t.Fatalf("Expected NumMatches of 1, got %d\n", sl.NumMatches)
|
|
}
|
|
}
|
|
|
|
// Test JSONP
|
|
readBodyEx(t, url+"subsz?callback=callback", http.StatusOK, appJSContent)
|
|
}
|
|
|
|
func TestSubszDetails(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
defer nc.Close()
|
|
|
|
nc.Subscribe("foo.*", func(m *nats.Msg) {})
|
|
nc.Subscribe("foo.bar", func(m *nats.Msg) {})
|
|
nc.Subscribe("foo.foo", func(m *nats.Msg) {})
|
|
|
|
nc.Publish("foo.bar", []byte("Hello"))
|
|
nc.Publish("foo.baz", []byte("Hello"))
|
|
nc.Publish("foo.foo", []byte("Hello"))
|
|
|
|
nc.Flush()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
sl := pollSubsz(t, s, mode, url+"subsz?subs=1", &SubszOptions{Subscriptions: true})
|
|
if sl.NumSubs != 3 {
|
|
t.Fatalf("Expected NumSubs of 3, got %d\n", sl.NumSubs)
|
|
}
|
|
if sl.Total != 3 {
|
|
t.Fatalf("Expected Total of 3, got %d\n", sl.Total)
|
|
}
|
|
if len(sl.Subs) != 3 {
|
|
t.Fatalf("Expected subscription details for 3 subs, got %d\n", len(sl.Subs))
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSubszWithOffsetAndLimit(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
defer nc.Close()
|
|
|
|
for i := 0; i < 200; i++ {
|
|
nc.Subscribe(fmt.Sprintf("foo.%d", i), func(m *nats.Msg) {})
|
|
}
|
|
nc.Flush()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
sl := pollSubsz(t, s, mode, url+"subsz?subs=1&offset=10&limit=100", &SubszOptions{Subscriptions: true, Offset: 10, Limit: 100})
|
|
if sl.NumSubs != 200 {
|
|
t.Fatalf("Expected NumSubs of 200, got %d\n", sl.NumSubs)
|
|
}
|
|
if sl.Total != 100 {
|
|
t.Fatalf("Expected Total of 100, got %d\n", sl.Total)
|
|
}
|
|
if sl.Offset != 10 {
|
|
t.Fatalf("Expected Offset of 10, got %d\n", sl.Offset)
|
|
}
|
|
if sl.Limit != 100 {
|
|
t.Fatalf("Expected Total of 100, got %d\n", sl.Limit)
|
|
}
|
|
if len(sl.Subs) != 100 {
|
|
t.Fatalf("Expected subscription details for 100 subs, got %d\n", len(sl.Subs))
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSubszTestPubSubject(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
defer nc.Close()
|
|
|
|
nc.Subscribe("foo.*", func(m *nats.Msg) {})
|
|
nc.Subscribe("foo.bar", func(m *nats.Msg) {})
|
|
nc.Subscribe("foo.foo", func(m *nats.Msg) {})
|
|
nc.Flush()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
sl := pollSubsz(t, s, mode, url+"subsz?subs=1&test=foo.foo", &SubszOptions{Subscriptions: true, Test: "foo.foo"})
|
|
if sl.Total != 2 {
|
|
t.Fatalf("Expected Total of 2 match, got %d\n", sl.Total)
|
|
}
|
|
if len(sl.Subs) != 2 {
|
|
t.Fatalf("Expected subscription details for 2 matching subs, got %d\n", len(sl.Subs))
|
|
}
|
|
sl = pollSubsz(t, s, mode, url+"subsz?subs=1&test=foo", &SubszOptions{Subscriptions: true, Test: "foo"})
|
|
if len(sl.Subs) != 0 {
|
|
t.Fatalf("Expected no matching subs, got %d\n", len(sl.Subs))
|
|
}
|
|
}
|
|
// Make sure we get an error with invalid test subject.
|
|
testUrl := url + "subsz?subs=1&"
|
|
readBodyEx(t, testUrl+"test=*", http.StatusBadRequest, textPlain)
|
|
readBodyEx(t, testUrl+"test=foo.*", http.StatusBadRequest, textPlain)
|
|
readBodyEx(t, testUrl+"test=foo.>", http.StatusBadRequest, textPlain)
|
|
readBodyEx(t, testUrl+"test=foo..bar", http.StatusBadRequest, textPlain)
|
|
}
|
|
|
|
func TestSubszMultiAccount(t *testing.T) {
|
|
s := runMonitorServerWithAccounts()
|
|
defer s.Shutdown()
|
|
|
|
ncA := createClientConnWithUserSubscribeAndPublish(t, s, "a", "a")
|
|
defer ncA.Close()
|
|
|
|
ncA.Subscribe("foo.*", func(m *nats.Msg) {})
|
|
ncA.Subscribe("foo.bar", func(m *nats.Msg) {})
|
|
ncA.Subscribe("foo.foo", func(m *nats.Msg) {})
|
|
|
|
ncA.Publish("foo.bar", []byte("Hello"))
|
|
ncA.Publish("foo.baz", []byte("Hello"))
|
|
ncA.Publish("foo.foo", []byte("Hello"))
|
|
|
|
ncA.Flush()
|
|
|
|
ncB := createClientConnWithUserSubscribeAndPublish(t, s, "b", "b")
|
|
defer ncB.Close()
|
|
|
|
ncB.Subscribe("foo.*", func(m *nats.Msg) {})
|
|
ncB.Subscribe("foo.bar", func(m *nats.Msg) {})
|
|
ncB.Subscribe("foo.foo", func(m *nats.Msg) {})
|
|
|
|
ncB.Publish("foo.bar", []byte("Hello"))
|
|
ncB.Publish("foo.baz", []byte("Hello"))
|
|
ncB.Publish("foo.foo", []byte("Hello"))
|
|
|
|
ncB.Flush()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
sl := pollSubsz(t, s, mode, url+"subsz?subs=1", &SubszOptions{Subscriptions: true})
|
|
if sl.NumSubs != 6 {
|
|
t.Fatalf("Expected NumSubs of 6, got %d\n", sl.NumSubs)
|
|
}
|
|
if sl.Total != 6 {
|
|
t.Fatalf("Expected Total of 6, got %d\n", sl.Total)
|
|
}
|
|
if len(sl.Subs) != 6 {
|
|
t.Fatalf("Expected subscription details for 6 subs, got %d\n", len(sl.Subs))
|
|
}
|
|
for _, sd := range sl.Subs {
|
|
if sd.Account != "A" && sd.Account != "B" {
|
|
t.Fatalf("Expected account information to be present and be 'A' or 'B', got %q", sd.Account)
|
|
}
|
|
}
|
|
|
|
// Now make sure we can filter on account.
|
|
sl = pollSubsz(t, s, mode, url+"subsz?subs=1&acc=A", &SubszOptions{Account: "A", Subscriptions: true})
|
|
if sl.NumSubs != 3 {
|
|
t.Fatalf("Expected NumSubs of 3, got %d\n", sl.NumSubs)
|
|
}
|
|
if sl.Total != 3 {
|
|
t.Fatalf("Expected Total of 6, got %d\n", sl.Total)
|
|
}
|
|
if len(sl.Subs) != 3 {
|
|
t.Fatalf("Expected subscription details for 6 subs, got %d\n", len(sl.Subs))
|
|
}
|
|
for _, sd := range sl.Subs {
|
|
if sd.Account != "A" {
|
|
t.Fatalf("Expected account information to be present and be 'A', got %q", sd.Account)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSubszMultiAccountWithOffsetAndLimit(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
ncA := createClientConnWithUserSubscribeAndPublish(t, s, "a", "a")
|
|
defer ncA.Close()
|
|
|
|
for i := 0; i < 200; i++ {
|
|
ncA.Subscribe(fmt.Sprintf("foo.%d", i), func(m *nats.Msg) {})
|
|
}
|
|
ncA.Flush()
|
|
|
|
ncB := createClientConnWithUserSubscribeAndPublish(t, s, "b", "b")
|
|
defer ncB.Close()
|
|
|
|
for i := 0; i < 200; i++ {
|
|
ncB.Subscribe(fmt.Sprintf("foo.%d", i), func(m *nats.Msg) {})
|
|
}
|
|
ncB.Flush()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
sl := pollSubsz(t, s, mode, url+"subsz?subs=1&offset=10&limit=100", &SubszOptions{Subscriptions: true, Offset: 10, Limit: 100})
|
|
if sl.NumSubs != 400 {
|
|
t.Fatalf("Expected NumSubs of 200, got %d\n", sl.NumSubs)
|
|
}
|
|
if sl.Total != 100 {
|
|
t.Fatalf("Expected Total of 100, got %d\n", sl.Total)
|
|
}
|
|
if sl.Offset != 10 {
|
|
t.Fatalf("Expected Offset of 10, got %d\n", sl.Offset)
|
|
}
|
|
if sl.Limit != 100 {
|
|
t.Fatalf("Expected Total of 100, got %d\n", sl.Limit)
|
|
}
|
|
if len(sl.Subs) != 100 {
|
|
t.Fatalf("Expected subscription details for 100 subs, got %d\n", len(sl.Subs))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Tests handle root
|
|
func TestHandleRoot(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
defer nc.Close()
|
|
|
|
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port))
|
|
if err != nil {
|
|
t.Fatalf("Expected no error: Got %v\n", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Fatalf("Expected a %d response, got %d\n", http.StatusOK, resp.StatusCode)
|
|
}
|
|
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
t.Fatalf("Expected no error reading body: Got %v\n", err)
|
|
}
|
|
for _, b := range body {
|
|
if b > unicode.MaxASCII {
|
|
t.Fatalf("Expected body to contain only ASCII characters, but got %v\n", b)
|
|
}
|
|
}
|
|
|
|
ct := resp.Header.Get("Content-Type")
|
|
if !strings.Contains(ct, "text/html") {
|
|
t.Fatalf("Expected text/html response, got %s\n", ct)
|
|
}
|
|
}
|
|
|
|
func TestConnzWithNamedClient(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
clientName := "test-client"
|
|
nc := createClientConnWithName(t, clientName, s)
|
|
defer nc.Close()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
// Confirm server is exposing client name in monitoring endpoint.
|
|
c := pollConz(t, s, mode, url+"connz", nil)
|
|
got := len(c.Conns)
|
|
expected := 1
|
|
if got != expected {
|
|
t.Fatalf("Expected %d connection in array, got %d\n", expected, got)
|
|
}
|
|
|
|
conn := c.Conns[0]
|
|
if conn.Name != clientName {
|
|
t.Fatalf("Expected client to have name %q. got %q", clientName, conn.Name)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzWithStateForClosedConns(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
numEach := 10
|
|
// Create 10 closed, and 10 to leave open.
|
|
for i := 0; i < numEach; i++ {
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
nc.Subscribe("hello.closed.conns", func(m *nats.Msg) {})
|
|
nc.Close()
|
|
nc = createClientConnSubscribeAndPublish(t, s)
|
|
nc.Subscribe("hello.open.conns", func(m *nats.Msg) {})
|
|
defer nc.Close()
|
|
}
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
|
|
// Look at all open
|
|
c := pollConz(t, s, mode, url+"connz?state=open", &ConnzOptions{State: ConnOpen})
|
|
if lc := len(c.Conns); lc != numEach {
|
|
return fmt.Errorf("Expected %d connections in array, got %d", numEach, lc)
|
|
}
|
|
// Look at all closed
|
|
c = pollConz(t, s, mode, url+"connz?state=closed", &ConnzOptions{State: ConnClosed})
|
|
if lc := len(c.Conns); lc != numEach {
|
|
return fmt.Errorf("Expected %d connections in array, got %d", numEach, lc)
|
|
}
|
|
// Look at all
|
|
c = pollConz(t, s, mode, url+"connz?state=ALL", &ConnzOptions{State: ConnAll})
|
|
if lc := len(c.Conns); lc != numEach*2 {
|
|
return fmt.Errorf("Expected %d connections in array, got %d", 2*numEach, lc)
|
|
}
|
|
// Look at CID #1, which is in closed.
|
|
c = pollConz(t, s, mode, url+"connz?cid=1&state=open", &ConnzOptions{CID: 1, State: ConnOpen})
|
|
if lc := len(c.Conns); lc != 0 {
|
|
return fmt.Errorf("Expected no connections in open array, got %d", lc)
|
|
}
|
|
c = pollConz(t, s, mode, url+"connz?cid=1&state=closed", &ConnzOptions{CID: 1, State: ConnClosed})
|
|
if lc := len(c.Conns); lc != 1 {
|
|
return fmt.Errorf("Expected a connection in closed array, got %d", lc)
|
|
}
|
|
c = pollConz(t, s, mode, url+"connz?cid=1&state=ALL", &ConnzOptions{CID: 1, State: ConnAll})
|
|
if lc := len(c.Conns); lc != 1 {
|
|
return fmt.Errorf("Expected a connection in closed array, got %d", lc)
|
|
}
|
|
c = pollConz(t, s, mode, url+"connz?cid=1&state=closed&subs=true",
|
|
&ConnzOptions{CID: 1, State: ConnClosed, Subscriptions: true})
|
|
if lc := len(c.Conns); lc != 1 {
|
|
return fmt.Errorf("Expected a connection in closed array, got %d", lc)
|
|
}
|
|
ci := c.Conns[0]
|
|
if ci.NumSubs != 1 {
|
|
return fmt.Errorf("Expected NumSubs to be 1, got %d", ci.NumSubs)
|
|
}
|
|
if len(ci.Subs) != 1 {
|
|
return fmt.Errorf("Expected len(ci.Subs) to be 1 also, got %d", len(ci.Subs))
|
|
}
|
|
// Now ask for same thing without subs and make sure they are not returned.
|
|
c = pollConz(t, s, mode, url+"connz?cid=1&state=closed&subs=false",
|
|
&ConnzOptions{CID: 1, State: ConnClosed, Subscriptions: false})
|
|
if lc := len(c.Conns); lc != 1 {
|
|
return fmt.Errorf("Expected a connection in closed array, got %d", lc)
|
|
}
|
|
ci = c.Conns[0]
|
|
if ci.NumSubs != 1 {
|
|
return fmt.Errorf("Expected NumSubs to be 1, got %d", ci.NumSubs)
|
|
}
|
|
if len(ci.Subs) != 0 {
|
|
return fmt.Errorf("Expected len(ci.Subs) to be 0 since subs=false, got %d", len(ci.Subs))
|
|
}
|
|
|
|
// CID #2 is in open
|
|
c = pollConz(t, s, mode, url+"connz?cid=2&state=open", &ConnzOptions{CID: 2, State: ConnOpen})
|
|
if lc := len(c.Conns); lc != 1 {
|
|
return fmt.Errorf("Expected a connection in open array, got %d", lc)
|
|
}
|
|
c = pollConz(t, s, mode, url+"connz?cid=2&state=closed", &ConnzOptions{CID: 2, State: ConnClosed})
|
|
if lc := len(c.Conns); lc != 0 {
|
|
return fmt.Errorf("Expected no connections in closed array, got %d", lc)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
}
|
|
|
|
// Make sure options for ConnInfo like subs=1, authuser, etc do not cause a race.
|
|
func TestConnzClosedConnsRace(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
// Create 100 closed connections.
|
|
for i := 0; i < 100; i++ {
|
|
nc := createClientConnSubscribeAndPublish(t, s)
|
|
nc.Close()
|
|
}
|
|
|
|
urlWithoutSubs := fmt.Sprintf("http://127.0.0.1:%d/connz?state=closed", s.MonitorAddr().Port)
|
|
urlWithSubs := urlWithoutSubs + "&subs=true"
|
|
|
|
checkClosedConns(t, s, 100, 2*time.Second)
|
|
|
|
wg := &sync.WaitGroup{}
|
|
|
|
fn := func(url string) {
|
|
deadline := time.Now().Add(1 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
c := pollConz(t, s, 0, url, nil)
|
|
if len(c.Conns) != 100 {
|
|
t.Errorf("Incorrect Results: %+v\n", c)
|
|
}
|
|
}
|
|
wg.Done()
|
|
}
|
|
|
|
wg.Add(2)
|
|
go fn(urlWithSubs)
|
|
go fn(urlWithoutSubs)
|
|
wg.Wait()
|
|
}
|
|
|
|
// Make sure a bad client that is disconnected right away has proper values.
|
|
func TestConnzClosedConnsBadClient(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
opts := s.getOpts()
|
|
|
|
rc, err := net.Dial("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on dial: %v", err)
|
|
}
|
|
rc.Close()
|
|
|
|
checkClosedConns(t, s, 1, 2*time.Second)
|
|
|
|
c := pollConz(t, s, 1, "", &ConnzOptions{State: ConnClosed})
|
|
if len(c.Conns) != 1 {
|
|
t.Errorf("Incorrect Results: %+v\n", c)
|
|
}
|
|
ci := c.Conns[0]
|
|
|
|
uptime := ci.Stop.Sub(ci.Start)
|
|
idle, err := time.ParseDuration(ci.Idle)
|
|
if err != nil {
|
|
t.Fatalf("Could not parse Idle: %v\n", err)
|
|
}
|
|
if idle > uptime {
|
|
t.Fatalf("Idle can't be larger then uptime, %v vs %v\n", idle, uptime)
|
|
}
|
|
if ci.LastActivity.IsZero() {
|
|
t.Fatalf("LastActivity should not be Zero\n")
|
|
}
|
|
}
|
|
|
|
// Make sure a bad client that tries to connect plain to TLS has proper values.
|
|
func TestConnzClosedConnsBadTLSClient(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
|
|
tc := &TLSConfigOpts{}
|
|
tc.CertFile = "configs/certs/server.pem"
|
|
tc.KeyFile = "configs/certs/key.pem"
|
|
|
|
var err error
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
opts.TLSTimeout = 1.5 // 1.5 seconds
|
|
opts.TLSConfig, err = GenTLSConfig(tc)
|
|
if err != nil {
|
|
t.Fatalf("Error creating TSL config: %v", err)
|
|
}
|
|
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
opts = s.getOpts()
|
|
|
|
rc, err := net.Dial("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on dial: %v", err)
|
|
}
|
|
rc.Write([]byte("CONNECT {}\r\n"))
|
|
rc.Close()
|
|
|
|
checkClosedConns(t, s, 1, 2*time.Second)
|
|
|
|
c := pollConz(t, s, 1, "", &ConnzOptions{State: ConnClosed})
|
|
if len(c.Conns) != 1 {
|
|
t.Errorf("Incorrect Results: %+v\n", c)
|
|
}
|
|
ci := c.Conns[0]
|
|
|
|
uptime := ci.Stop.Sub(ci.Start)
|
|
idle, err := time.ParseDuration(ci.Idle)
|
|
if err != nil {
|
|
t.Fatalf("Could not parse Idle: %v\n", err)
|
|
}
|
|
if idle > uptime {
|
|
t.Fatalf("Idle can't be larger then uptime, %v vs %v\n", idle, uptime)
|
|
}
|
|
if ci.LastActivity.IsZero() {
|
|
t.Fatalf("LastActivity should not be Zero\n")
|
|
}
|
|
}
|
|
|
|
// Create a connection to test ConnInfo
|
|
func createClientConnWithUserSubscribeAndPublish(t *testing.T, s *Server, user, pwd string) *nats.Conn {
|
|
natsURL := ""
|
|
if user == "" {
|
|
natsURL = fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)
|
|
} else {
|
|
natsURL = fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", user, pwd, s.Addr().(*net.TCPAddr).Port)
|
|
}
|
|
client := nats.DefaultOptions
|
|
client.Servers = []string{natsURL}
|
|
nc, err := client.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Error creating client: %v to: %s\n", err, natsURL)
|
|
}
|
|
|
|
ch := make(chan bool)
|
|
inbox := nats.NewInbox()
|
|
sub, err := nc.Subscribe(inbox, func(m *nats.Msg) { ch <- true })
|
|
if err != nil {
|
|
t.Fatalf("Error subscribing to `%s`: %v\n", inbox, err)
|
|
}
|
|
nc.Publish(inbox, []byte("Hello"))
|
|
// Wait for message
|
|
<-ch
|
|
sub.Unsubscribe()
|
|
close(ch)
|
|
nc.Flush()
|
|
return nc
|
|
}
|
|
|
|
func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn {
|
|
return createClientConnWithUserSubscribeAndPublish(t, s, "", "")
|
|
}
|
|
|
|
func createClientConnWithName(t *testing.T, name string, s *Server) *nats.Conn {
|
|
natsURI := fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)
|
|
|
|
client := nats.DefaultOptions
|
|
client.Servers = []string{natsURI}
|
|
client.Name = name
|
|
nc, err := client.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Error creating client: %v\n", err)
|
|
}
|
|
return nc
|
|
}
|
|
|
|
func TestStacksz(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
body := readBody(t, url+"stacksz")
|
|
// Check content
|
|
str := string(body)
|
|
if !strings.Contains(str, "HandleStacksz") {
|
|
t.Fatalf("Result does not seem to contain server's stacks:\n%v", str)
|
|
}
|
|
}
|
|
|
|
func TestConcurrentMonitoring(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
// Get some endpoints. Make sure we have at least varz,
|
|
// and the more the merrier.
|
|
endpoints := []string{"varz", "varz", "varz", "connz", "connz", "subsz", "subsz", "routez", "routez"}
|
|
wg := &sync.WaitGroup{}
|
|
wg.Add(len(endpoints))
|
|
ech := make(chan string, len(endpoints))
|
|
|
|
for _, e := range endpoints {
|
|
go func(endpoint string) {
|
|
defer wg.Done()
|
|
for i := 0; i < 50; i++ {
|
|
resp, err := http.Get(url + endpoint)
|
|
if err != nil {
|
|
ech <- fmt.Sprintf("Expected no error: Got %v\n", err)
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
ech <- fmt.Sprintf("Expected a %v response, got %d\n", http.StatusOK, resp.StatusCode)
|
|
return
|
|
}
|
|
ct := resp.Header.Get("Content-Type")
|
|
if ct != "application/json" {
|
|
ech <- fmt.Sprintf("Expected application/json content-type, got %s\n", ct)
|
|
return
|
|
}
|
|
if _, err := ioutil.ReadAll(resp.Body); err != nil {
|
|
ech <- fmt.Sprintf("Got an error reading the body: %v\n", err)
|
|
return
|
|
}
|
|
resp.Body.Close()
|
|
}
|
|
}(e)
|
|
}
|
|
wg.Wait()
|
|
// Check for any errors
|
|
select {
|
|
case err := <-ech:
|
|
t.Fatal(err)
|
|
default:
|
|
}
|
|
}
|
|
|
|
func TestMonitorHandler(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
handler := s.HTTPHandler()
|
|
if handler == nil {
|
|
t.Fatal("HTTP Handler should be set")
|
|
}
|
|
s.Shutdown()
|
|
handler = s.HTTPHandler()
|
|
if handler != nil {
|
|
t.Fatal("HTTP Handler should be nil")
|
|
}
|
|
}
|
|
|
|
func TestMonitorRoutezRace(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
srvAOpts := DefaultMonitorOptions()
|
|
srvAOpts.NoSystemAccount = true
|
|
srvAOpts.Cluster.Name = "B"
|
|
srvAOpts.Cluster.Port = -1
|
|
srvA := RunServer(srvAOpts)
|
|
defer srvA.Shutdown()
|
|
|
|
srvBOpts := nextServerOpts(srvAOpts)
|
|
srvBOpts.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
|
|
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
defer func() {
|
|
doneCh <- struct{}{}
|
|
}()
|
|
for i := 0; i < 10; i++ {
|
|
time.Sleep(10 * time.Millisecond)
|
|
// Reset ports
|
|
srvBOpts.Port = -1
|
|
srvBOpts.Cluster.Port = -1
|
|
srvB := RunServer(srvBOpts)
|
|
time.Sleep(20 * time.Millisecond)
|
|
srvB.Shutdown()
|
|
}
|
|
}()
|
|
done := false
|
|
for !done {
|
|
if _, err := srvA.Routez(nil); err != nil {
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
select {
|
|
case <-doneCh:
|
|
done = true
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzTLSInHandshake(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
|
|
tc := &TLSConfigOpts{}
|
|
tc.CertFile = "configs/certs/server.pem"
|
|
tc.KeyFile = "configs/certs/key.pem"
|
|
|
|
var err error
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
opts.TLSTimeout = 1.5 // 1.5 seconds
|
|
opts.TLSConfig, err = GenTLSConfig(tc)
|
|
if err != nil {
|
|
t.Fatalf("Error creating TSL config: %v", err)
|
|
}
|
|
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
// Create bare TCP connection to delay client TLS handshake
|
|
c, err := net.Dial("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on dial: %v", err)
|
|
}
|
|
defer c.Close()
|
|
|
|
// Wait for the connection to be registered
|
|
checkClientsCount(t, s, 1)
|
|
|
|
start := time.Now()
|
|
endpoint := fmt.Sprintf("http://%s:%d/connz", opts.HTTPHost, s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
connz := pollConz(t, s, mode, endpoint, nil)
|
|
duration := time.Since(start)
|
|
if duration >= 1500*time.Millisecond {
|
|
t.Fatalf("Looks like connz blocked on handshake, took %v", duration)
|
|
}
|
|
if len(connz.Conns) != 1 {
|
|
t.Fatalf("Expected 1 conn, got %v", len(connz.Conns))
|
|
}
|
|
conn := connz.Conns[0]
|
|
// TLS fields should be not set
|
|
if conn.TLSVersion != "" || conn.TLSCipher != "" {
|
|
t.Fatalf("Expected TLS fields to not be set, got version:%v cipher:%v", conn.TLSVersion, conn.TLSCipher)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConnzTLSCfg(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
|
|
tc := &TLSConfigOpts{}
|
|
tc.CertFile = "configs/certs/server.pem"
|
|
tc.KeyFile = "configs/certs/key.pem"
|
|
|
|
var err error
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
opts.TLSTimeout = 1.5 // 1.5 seconds
|
|
opts.TLSConfig, err = GenTLSConfig(tc)
|
|
require_NoError(t, err)
|
|
opts.TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert
|
|
opts.Gateway.TLSConfig, err = GenTLSConfig(tc)
|
|
require_NoError(t, err)
|
|
opts.Gateway.TLSTimeout = 1.5
|
|
opts.LeafNode.TLSConfig, err = GenTLSConfig(tc)
|
|
require_NoError(t, err)
|
|
opts.LeafNode.TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert
|
|
opts.LeafNode.TLSTimeout = 1.5
|
|
opts.Cluster.TLSConfig, err = GenTLSConfig(tc)
|
|
require_NoError(t, err)
|
|
opts.Cluster.TLSTimeout = 1.5
|
|
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
check := func(verify, required bool, timeout float64) {
|
|
t.Helper()
|
|
if !verify {
|
|
t.Fatalf("Expected tls_verify to be true")
|
|
}
|
|
if !required {
|
|
t.Fatalf("Expected tls_required to be true")
|
|
}
|
|
if timeout != 1.5 {
|
|
t.Fatalf("Expected tls_timeout to be 1.5")
|
|
}
|
|
}
|
|
|
|
start := time.Now()
|
|
endpoint := fmt.Sprintf("http://%s:%d/varz", opts.HTTPHost, s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
varz := pollVarz(t, s, mode, endpoint, nil)
|
|
duration := time.Since(start)
|
|
if duration >= 1500*time.Millisecond {
|
|
t.Fatalf("Looks like varz blocked on handshake, took %v", duration)
|
|
}
|
|
check(varz.TLSVerify, varz.TLSRequired, varz.TLSTimeout)
|
|
check(varz.Cluster.TLSVerify, varz.Cluster.TLSRequired, varz.Cluster.TLSTimeout)
|
|
check(varz.Gateway.TLSVerify, varz.Gateway.TLSRequired, varz.Gateway.TLSTimeout)
|
|
check(varz.LeafNode.TLSVerify, varz.LeafNode.TLSRequired, varz.LeafNode.TLSTimeout)
|
|
}
|
|
}
|
|
|
|
func TestServerIDs(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
murl := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
v := pollVarz(t, s, mode, murl+"varz", nil)
|
|
if v.ID == "" {
|
|
t.Fatal("Varz ID is empty")
|
|
}
|
|
c := pollConz(t, s, mode, murl+"connz", nil)
|
|
if c.ID == "" {
|
|
t.Fatal("Connz ID is empty")
|
|
}
|
|
r := pollRoutez(t, s, mode, murl+"routez", nil)
|
|
if r.ID == "" {
|
|
t.Fatal("Routez ID is empty")
|
|
}
|
|
if v.ID != c.ID || v.ID != r.ID {
|
|
t.Fatalf("Varz ID [%s] is not equal to Connz ID [%s] or Routez ID [%s]", v.ID, c.ID, r.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestHttpStatsNoUpdatedWhenUsingServerFuncs(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
for i := 0; i < 10; i++ {
|
|
s.Varz(nil)
|
|
s.Connz(nil)
|
|
s.Routez(nil)
|
|
s.Subsz(nil)
|
|
}
|
|
|
|
v, _ := s.Varz(nil)
|
|
endpoints := []string{VarzPath, ConnzPath, RoutezPath, SubszPath}
|
|
for _, e := range endpoints {
|
|
stats := v.HTTPReqStats[e]
|
|
if stats != 0 {
|
|
t.Fatalf("Expected HTTPReqStats for %q to be 0, got %v", e, stats)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestClusterEmptyWhenNotDefined(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
body := readBody(t, fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port))
|
|
var v map[string]interface{}
|
|
if err := json.Unmarshal(body, &v); err != nil {
|
|
stackFatalf(t, "Got an error unmarshalling the body: %v\n", err)
|
|
}
|
|
// Cluster can empty, or be defined but that needs to be empty.
|
|
c, ok := v["cluster"]
|
|
if !ok {
|
|
return
|
|
}
|
|
if len(c.(map[string]interface{})) != 0 {
|
|
t.Fatalf("Expected an empty cluster definition, instead got %+v\n", c)
|
|
}
|
|
}
|
|
|
|
func TestRoutezPermissions(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
opts.Cluster.Name = "A"
|
|
opts.Cluster.Host = "127.0.0.1"
|
|
opts.Cluster.Port = -1
|
|
opts.Cluster.Permissions = &RoutePermissions{
|
|
Import: &SubjectPermission{
|
|
Allow: []string{"foo"},
|
|
},
|
|
Export: &SubjectPermission{
|
|
Allow: []string{"*"},
|
|
Deny: []string{"foo", "nats"},
|
|
},
|
|
}
|
|
|
|
s1 := RunServer(opts)
|
|
defer s1.Shutdown()
|
|
|
|
opts = DefaultMonitorOptions()
|
|
opts.ServerName = "monitor_server_2"
|
|
opts.Cluster.Host = "127.0.0.1"
|
|
opts.Cluster.Name = "A"
|
|
opts.Cluster.Port = -1
|
|
routeURL, _ := url.Parse(fmt.Sprintf("nats-route://127.0.0.1:%d", s1.ClusterAddr().Port))
|
|
opts.Routes = []*url.URL{routeURL}
|
|
opts.HTTPPort = -1
|
|
|
|
s2 := RunServer(opts)
|
|
defer s2.Shutdown()
|
|
|
|
checkClusterFormed(t, s1, s2)
|
|
|
|
urls := []string{
|
|
fmt.Sprintf("http://127.0.0.1:%d/routez", s1.MonitorAddr().Port),
|
|
fmt.Sprintf("http://127.0.0.1:%d/routez", s2.MonitorAddr().Port),
|
|
}
|
|
servers := []*Server{s1, s2}
|
|
|
|
for i, url := range urls {
|
|
for mode := 0; mode < 2; mode++ {
|
|
rz := pollRoutez(t, servers[i], mode, url, nil)
|
|
// For server 1, we expect to see imports and exports
|
|
if i == 0 {
|
|
if rz.Import == nil || rz.Import.Allow == nil ||
|
|
len(rz.Import.Allow) != 1 || rz.Import.Allow[0] != "foo" ||
|
|
rz.Import.Deny != nil {
|
|
t.Fatalf("Unexpected Import %v", rz.Import)
|
|
}
|
|
if rz.Export == nil || rz.Export.Allow == nil || rz.Export.Deny == nil ||
|
|
len(rz.Export.Allow) != 1 || rz.Export.Allow[0] != "*" ||
|
|
len(rz.Export.Deny) != 2 || rz.Export.Deny[0] != "foo" || rz.Export.Deny[1] != "nats" {
|
|
t.Fatalf("Unexpected Export %v", rz.Export)
|
|
}
|
|
} else {
|
|
// We expect to see NO imports and exports for server B by default.
|
|
if rz.Import != nil {
|
|
t.Fatal("Routez body should NOT contain \"import\" information.")
|
|
}
|
|
if rz.Export != nil {
|
|
t.Fatal("Routez body should NOT contain \"export\" information.")
|
|
}
|
|
// We do expect to see them show up for the information we have on Server A though.
|
|
if len(rz.Routes) != 1 {
|
|
t.Fatalf("Expected route array of 1, got %v\n", len(rz.Routes))
|
|
}
|
|
route := rz.Routes[0]
|
|
if route.Import == nil || route.Import.Allow == nil ||
|
|
len(route.Import.Allow) != 1 || route.Import.Allow[0] != "foo" ||
|
|
route.Import.Deny != nil {
|
|
t.Fatalf("Unexpected Import %v", route.Import)
|
|
}
|
|
if route.Export == nil || route.Export.Allow == nil || route.Export.Deny == nil ||
|
|
len(route.Export.Allow) != 1 || route.Export.Allow[0] != "*" ||
|
|
len(route.Export.Deny) != 2 || route.Export.Deny[0] != "foo" || route.Export.Deny[1] != "nats" {
|
|
t.Fatalf("Unexpected Export %v", route.Export)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Benchmark our Connz generation. Don't use HTTP here, just measure server endpoint.
|
|
func Benchmark_Connz(b *testing.B) {
|
|
runtime.MemProfileRate = 0
|
|
|
|
s := runMonitorServerNoHTTPPort()
|
|
defer s.Shutdown()
|
|
|
|
opts := s.getOpts()
|
|
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
|
|
|
// Create 250 connections with 100 subs each.
|
|
for i := 0; i < 250; i++ {
|
|
nc, err := nats.Connect(url)
|
|
if err != nil {
|
|
b.Fatalf("Error on connection[%d] to %s: %v", i, url, err)
|
|
}
|
|
for x := 0; x < 100; x++ {
|
|
subj := fmt.Sprintf("foo.%d", x)
|
|
nc.Subscribe(subj, func(m *nats.Msg) {})
|
|
}
|
|
nc.Flush()
|
|
defer nc.Close()
|
|
}
|
|
|
|
b.ResetTimer()
|
|
runtime.MemProfileRate = 1
|
|
|
|
copts := &ConnzOptions{Subscriptions: false}
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := s.Connz(copts)
|
|
if err != nil {
|
|
b.Fatalf("Error on Connz(): %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func Benchmark_Varz(b *testing.B) {
|
|
runtime.MemProfileRate = 0
|
|
|
|
s := runMonitorServerNoHTTPPort()
|
|
defer s.Shutdown()
|
|
|
|
b.ResetTimer()
|
|
runtime.MemProfileRate = 1
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := s.Varz(nil)
|
|
if err != nil {
|
|
b.Fatalf("Error on Connz(): %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func Benchmark_VarzHttp(b *testing.B) {
|
|
runtime.MemProfileRate = 0
|
|
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
murl := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
|
|
|
|
b.ResetTimer()
|
|
runtime.MemProfileRate = 1
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
v := &Varz{}
|
|
resp, err := http.Get(murl)
|
|
if err != nil {
|
|
b.Fatalf("Expected no error: Got %v\n", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
b.Fatalf("Got an error reading the body: %v\n", err)
|
|
}
|
|
if err := json.Unmarshal(body, v); err != nil {
|
|
b.Fatalf("Got an error unmarshalling the body: %v\n", err)
|
|
}
|
|
resp.Body.Close()
|
|
}
|
|
}
|
|
|
|
func TestVarzRaces(t *testing.T) {
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
|
|
murl := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
|
|
done := make(chan struct{})
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for {
|
|
for i := 0; i < 2; i++ {
|
|
v := pollVarz(t, s, i, murl, nil)
|
|
// Check the field that we are setting in main thread
|
|
// to ensure that we have a copy and there is no
|
|
// race with fields set in s.info and s.opts
|
|
if v.ID == "abc" || v.MaxConn == -1 {
|
|
// We will not get there. Need to have something
|
|
// otherwise staticcheck will report empty branch
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-done:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
for i := 0; i < 1000; i++ {
|
|
// Simulate a change in server's info and options
|
|
// by changing something.
|
|
s.mu.Lock()
|
|
s.info.ID = fmt.Sprintf("serverid_%d", i)
|
|
s.opts.MaxConn = 100 + i
|
|
s.mu.Unlock()
|
|
time.Sleep(time.Nanosecond)
|
|
}
|
|
close(done)
|
|
wg.Wait()
|
|
|
|
// Now check that there is no race doing parallel polling
|
|
wg.Add(3)
|
|
done = make(chan struct{})
|
|
poll := func() {
|
|
defer wg.Done()
|
|
for {
|
|
for mode := 0; mode < 2; mode++ {
|
|
pollVarz(t, s, mode, murl, nil)
|
|
}
|
|
select {
|
|
case <-done:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
for i := 0; i < 3; i++ {
|
|
go poll()
|
|
}
|
|
time.Sleep(500 * time.Millisecond)
|
|
close(done)
|
|
wg.Wait()
|
|
}
|
|
|
|
func testMonitorStructPresent(t *testing.T, tag string) {
|
|
t.Helper()
|
|
|
|
resetPreviousHTTPConnections()
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
|
|
body := readBody(t, varzURL)
|
|
if !bytes.Contains(body, []byte(`"`+tag+`": {}`)) {
|
|
t.Fatalf("%s should be present and empty, got %s", tag, body)
|
|
}
|
|
}
|
|
|
|
func TestMonitorCluster(t *testing.T) {
|
|
testMonitorStructPresent(t, "cluster")
|
|
|
|
resetPreviousHTTPConnections()
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
opts.Cluster.Name = "A"
|
|
opts.Cluster.Port = -1
|
|
opts.Cluster.AuthTimeout = 1
|
|
opts.Routes = RoutesFromStr("nats://127.0.0.1:1234")
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
expected := ClusterOptsVarz{
|
|
"A",
|
|
opts.Cluster.Host,
|
|
opts.Cluster.Port,
|
|
opts.Cluster.AuthTimeout,
|
|
[]string{"127.0.0.1:1234"},
|
|
opts.Cluster.TLSTimeout,
|
|
opts.Cluster.TLSConfig != nil,
|
|
opts.Cluster.TLSConfig != nil,
|
|
}
|
|
|
|
varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
check := func(t *testing.T, v *Varz) {
|
|
t.Helper()
|
|
if !reflect.DeepEqual(v.Cluster, expected) {
|
|
t.Fatalf("mode=%v - expected %+v, got %+v", mode, expected, v.Cluster)
|
|
}
|
|
}
|
|
v := pollVarz(t, s, mode, varzURL, nil)
|
|
check(t, v)
|
|
|
|
// Having this here to make sure that if fields are added in ClusterOptsVarz,
|
|
// we make sure to update this test (compiler will report an error if we don't)
|
|
_ = ClusterOptsVarz{"", "", 0, 0, nil, 2, false, false}
|
|
|
|
// Alter the fields to make sure that we have a proper deep copy
|
|
// of what may be stored in the server. Anything we change here
|
|
// should not affect the next returned value.
|
|
v.Cluster.Name = "wrong"
|
|
v.Cluster.Host = "wrong"
|
|
v.Cluster.Port = 0
|
|
v.Cluster.AuthTimeout = 0
|
|
v.Cluster.URLs = []string{"wrong"}
|
|
v = pollVarz(t, s, mode, varzURL, nil)
|
|
check(t, v)
|
|
}
|
|
}
|
|
|
|
func TestMonitorClusterURLs(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
|
|
o2 := DefaultOptions()
|
|
o2.Cluster.Host = "127.0.0.1"
|
|
o2.Cluster.Name = "A"
|
|
|
|
s2 := RunServer(o2)
|
|
defer s2.Shutdown()
|
|
|
|
s2ClusterHostPort := fmt.Sprintf("127.0.0.1:%d", s2.ClusterAddr().Port)
|
|
|
|
template := `
|
|
port: -1
|
|
http: -1
|
|
cluster: {
|
|
name: "A"
|
|
port: -1
|
|
routes [
|
|
%s
|
|
%s
|
|
]
|
|
}
|
|
`
|
|
conf := createConfFile(t, []byte(fmt.Sprintf(template, "nats://"+s2ClusterHostPort, "")))
|
|
defer removeFile(t, conf)
|
|
s1, _ := RunServerWithConfig(conf)
|
|
defer s1.Shutdown()
|
|
|
|
checkClusterFormed(t, s1, s2)
|
|
|
|
// Check /varz cluster{} to see the URLs from s1 to s2
|
|
varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s1.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
v := pollVarz(t, s1, mode, varzURL, nil)
|
|
if n := len(v.Cluster.URLs); n != 1 {
|
|
t.Fatalf("mode=%v - Expected 1 URL, got %v", mode, n)
|
|
}
|
|
if v.Cluster.URLs[0] != s2ClusterHostPort {
|
|
t.Fatalf("mode=%v - Expected url %q, got %q", mode, s2ClusterHostPort, v.Cluster.URLs[0])
|
|
}
|
|
}
|
|
|
|
otherClusterHostPort := "127.0.0.1:1234"
|
|
// Now update the config and add a route
|
|
changeCurrentConfigContentWithNewContent(t, conf, []byte(fmt.Sprintf(template, "nats://"+s2ClusterHostPort, "nats://"+otherClusterHostPort)))
|
|
|
|
if err := s1.Reload(); err != nil {
|
|
t.Fatalf("Error on reload: %v", err)
|
|
}
|
|
|
|
// Verify cluster still ok
|
|
checkClusterFormed(t, s1, s2)
|
|
|
|
// Now verify that s1 reports in /varz the new URL
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
for mode := 0; mode < 2; mode++ {
|
|
v := pollVarz(t, s1, mode, varzURL, nil)
|
|
if n := len(v.Cluster.URLs); n != 2 {
|
|
t.Fatalf("mode=%v - Expected 2 URL, got %v", mode, n)
|
|
}
|
|
gotS2 := false
|
|
gotOther := false
|
|
for _, u := range v.Cluster.URLs {
|
|
if u == s2ClusterHostPort {
|
|
gotS2 = true
|
|
} else if u == otherClusterHostPort {
|
|
gotOther = true
|
|
} else {
|
|
t.Fatalf("mode=%v - Incorrect url: %q", mode, u)
|
|
}
|
|
}
|
|
if !gotS2 {
|
|
t.Fatalf("mode=%v - Did not get cluster URL for s2", mode)
|
|
}
|
|
if !gotOther {
|
|
t.Fatalf("mode=%v - Did not get the new cluster URL", mode)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Remove all routes from config
|
|
changeCurrentConfigContentWithNewContent(t, conf, []byte(fmt.Sprintf(template, "", "")))
|
|
|
|
if err := s1.Reload(); err != nil {
|
|
t.Fatalf("Error on reload: %v", err)
|
|
}
|
|
|
|
// Now verify that s1 reports no ULRs in /varz
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
for mode := 0; mode < 2; mode++ {
|
|
v := pollVarz(t, s1, mode, varzURL, nil)
|
|
if n := len(v.Cluster.URLs); n != 0 {
|
|
t.Fatalf("mode=%v - Expected 0 URL, got %v", mode, n)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestMonitorGateway(t *testing.T) {
|
|
testMonitorStructPresent(t, "gateway")
|
|
|
|
resetPreviousHTTPConnections()
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
opts.Gateway.Name = "A"
|
|
opts.Gateway.Port = -1
|
|
opts.Gateway.AuthTimeout = 1
|
|
opts.Gateway.TLSTimeout = 1
|
|
opts.Gateway.Advertise = "127.0.0.1"
|
|
opts.Gateway.ConnectRetries = 1
|
|
opts.Gateway.RejectUnknown = false
|
|
u1, _ := url.Parse("nats://ivan:pwd@localhost:1234")
|
|
u2, _ := url.Parse("nats://localhost:1235")
|
|
opts.Gateway.Gateways = []*RemoteGatewayOpts{
|
|
{
|
|
Name: "B",
|
|
TLSTimeout: 1,
|
|
URLs: []*url.URL{
|
|
u1,
|
|
u2,
|
|
},
|
|
},
|
|
}
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
expected := GatewayOptsVarz{
|
|
"A",
|
|
opts.Gateway.Host,
|
|
opts.Gateway.Port,
|
|
opts.Gateway.AuthTimeout,
|
|
opts.Gateway.TLSTimeout,
|
|
opts.Gateway.TLSConfig != nil,
|
|
opts.Gateway.TLSConfig != nil,
|
|
opts.Gateway.Advertise,
|
|
opts.Gateway.ConnectRetries,
|
|
[]RemoteGatewayOptsVarz{{"B", 1, nil}},
|
|
opts.Gateway.RejectUnknown,
|
|
}
|
|
// Since URLs array is not guaranteed to be always the same order,
|
|
// we don't add it in the expected GatewayOptsVarz, instead we
|
|
// maintain here.
|
|
expectedURLs := []string{"localhost:1234", "localhost:1235"}
|
|
|
|
varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
check := func(t *testing.T, v *Varz) {
|
|
t.Helper()
|
|
var urls []string
|
|
if len(v.Gateway.Gateways) == 1 {
|
|
urls = v.Gateway.Gateways[0].URLs
|
|
v.Gateway.Gateways[0].URLs = nil
|
|
}
|
|
if !reflect.DeepEqual(v.Gateway, expected) {
|
|
t.Fatalf("mode=%v - expected %+v, got %+v", mode, expected, v.Gateway)
|
|
}
|
|
// Now compare urls
|
|
for _, u := range expectedURLs {
|
|
ok := false
|
|
for _, u2 := range urls {
|
|
if u == u2 {
|
|
ok = true
|
|
break
|
|
}
|
|
}
|
|
if !ok {
|
|
t.Fatalf("mode=%v - expected urls to be %v, got %v", mode, expected.Gateways[0].URLs, urls)
|
|
}
|
|
}
|
|
}
|
|
v := pollVarz(t, s, mode, varzURL, nil)
|
|
check(t, v)
|
|
|
|
// Having this here to make sure that if fields are added in GatewayOptsVarz,
|
|
// we make sure to update this test (compiler will report an error if we don't)
|
|
_ = GatewayOptsVarz{"", "", 0, 0, 0, false, false, "", 0, []RemoteGatewayOptsVarz{{"", 0, nil}}, false}
|
|
|
|
// Alter the fields to make sure that we have a proper deep copy
|
|
// of what may be stored in the server. Anything we change here
|
|
// should not affect the next returned value.
|
|
v.Gateway.Name = "wrong"
|
|
v.Gateway.Host = "wrong"
|
|
v.Gateway.Port = 0
|
|
v.Gateway.AuthTimeout = 1234.5
|
|
v.Gateway.TLSTimeout = 1234.5
|
|
v.Gateway.Advertise = "wrong"
|
|
v.Gateway.ConnectRetries = 1234
|
|
v.Gateway.Gateways[0].Name = "wrong"
|
|
v.Gateway.Gateways[0].TLSTimeout = 1234.5
|
|
v.Gateway.Gateways[0].URLs = []string{"wrong"}
|
|
v.Gateway.RejectUnknown = true
|
|
v = pollVarz(t, s, mode, varzURL, nil)
|
|
check(t, v)
|
|
}
|
|
}
|
|
|
|
func TestMonitorGatewayURLsUpdated(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
|
|
ob1 := testDefaultOptionsForGateway("B")
|
|
sb1 := runGatewayServer(ob1)
|
|
defer sb1.Shutdown()
|
|
|
|
// Start a1 that has a single URL to sb1.
|
|
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb1)
|
|
oa.HTTPHost = "127.0.0.1"
|
|
oa.HTTPPort = MONITOR_PORT
|
|
sa := runGatewayServer(oa)
|
|
defer sa.Shutdown()
|
|
|
|
waitForOutboundGateways(t, sa, 1, 2*time.Second)
|
|
|
|
varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", sa.MonitorAddr().Port)
|
|
// Check the /varz gateway's URLs
|
|
for mode := 0; mode < 2; mode++ {
|
|
v := pollVarz(t, sa, mode, varzURL, nil)
|
|
if n := len(v.Gateway.Gateways); n != 1 {
|
|
t.Fatalf("mode=%v - Expected 1 remote gateway, got %v", mode, n)
|
|
}
|
|
gw := v.Gateway.Gateways[0]
|
|
if n := len(gw.URLs); n != 1 {
|
|
t.Fatalf("mode=%v - Expected 1 url, got %v", mode, n)
|
|
}
|
|
expected := oa.Gateway.Gateways[0].URLs[0].Host
|
|
if u := gw.URLs[0]; u != expected {
|
|
t.Fatalf("mode=%v - Expected URL %q, got %q", mode, expected, u)
|
|
}
|
|
}
|
|
|
|
// Now start sb2 that clusters with sb1. sa should add to its list of URLs
|
|
// sb2 gateway's connect URL.
|
|
ob2 := testDefaultOptionsForGateway("B")
|
|
ob2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", sb1.ClusterAddr().Port))
|
|
sb2 := runGatewayServer(ob2)
|
|
defer sb2.Shutdown()
|
|
|
|
// Wait for sb1 and sb2 to connect
|
|
checkClusterFormed(t, sb1, sb2)
|
|
// sb2 should be made aware of gateway A and connect to sa
|
|
waitForInboundGateways(t, sa, 2, 2*time.Second)
|
|
// Now check that URLs in /varz get updated
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
for mode := 0; mode < 2; mode++ {
|
|
v := pollVarz(t, sa, mode, varzURL, nil)
|
|
if n := len(v.Gateway.Gateways); n != 1 {
|
|
return fmt.Errorf("mode=%v - Expected 1 remote gateway, got %v", mode, n)
|
|
}
|
|
gw := v.Gateway.Gateways[0]
|
|
if n := len(gw.URLs); n != 2 {
|
|
return fmt.Errorf("mode=%v - Expected 2 urls, got %v", mode, n)
|
|
}
|
|
|
|
gotSB1 := false
|
|
gotSB2 := false
|
|
for _, u := range gw.URLs {
|
|
if u == fmt.Sprintf("127.0.0.1:%d", sb1.GatewayAddr().Port) {
|
|
gotSB1 = true
|
|
} else if u == fmt.Sprintf("127.0.0.1:%d", sb2.GatewayAddr().Port) {
|
|
gotSB2 = true
|
|
} else {
|
|
return fmt.Errorf("mode=%v - Incorrect URL to gateway B: %v", mode, u)
|
|
}
|
|
}
|
|
if !gotSB1 {
|
|
return fmt.Errorf("mode=%v - Did not get URL to sb1", mode)
|
|
}
|
|
if !gotSB2 {
|
|
return fmt.Errorf("mode=%v - Did not get URL to sb2", mode)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestMonitorLeafNode(t *testing.T) {
|
|
testMonitorStructPresent(t, "leaf")
|
|
|
|
resetPreviousHTTPConnections()
|
|
opts := DefaultMonitorOptions()
|
|
opts.NoSystemAccount = true
|
|
opts.LeafNode.Port = -1
|
|
opts.LeafNode.AuthTimeout = 1
|
|
opts.LeafNode.TLSTimeout = 1
|
|
opts.Accounts = []*Account{NewAccount("acc")}
|
|
u, _ := url.Parse("nats://ivan:pwd@localhost:1234")
|
|
opts.LeafNode.Remotes = []*RemoteLeafOpts{
|
|
{
|
|
LocalAccount: "acc",
|
|
URLs: []*url.URL{u},
|
|
TLSTimeout: 1,
|
|
},
|
|
}
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
expected := LeafNodeOptsVarz{
|
|
opts.LeafNode.Host,
|
|
opts.LeafNode.Port,
|
|
opts.LeafNode.AuthTimeout,
|
|
opts.LeafNode.TLSTimeout,
|
|
opts.LeafNode.TLSConfig != nil,
|
|
opts.LeafNode.TLSConfig != nil,
|
|
[]RemoteLeafOptsVarz{
|
|
{
|
|
"acc", 1, []string{"localhost:1234"}, nil,
|
|
},
|
|
},
|
|
}
|
|
|
|
varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
|
|
|
|
for mode := 0; mode < 2; mode++ {
|
|
check := func(t *testing.T, v *Varz) {
|
|
t.Helper()
|
|
if !reflect.DeepEqual(v.LeafNode, expected) {
|
|
t.Fatalf("mode=%v - expected %+v, got %+v", mode, expected, v.LeafNode)
|
|
}
|
|
}
|
|
v := pollVarz(t, s, mode, varzURL, nil)
|
|
check(t, v)
|
|
|
|
// Having this here to make sure that if fields are added in ClusterOptsVarz,
|
|
// we make sure to update this test (compiler will report an error if we don't)
|
|
_ = LeafNodeOptsVarz{"", 0, 0, 0, false, false, []RemoteLeafOptsVarz{{"", 0, nil, nil}}}
|
|
|
|
// Alter the fields to make sure that we have a proper deep copy
|
|
// of what may be stored in the server. Anything we change here
|
|
// should not affect the next returned value.
|
|
v.LeafNode.Host = "wrong"
|
|
v.LeafNode.Port = 0
|
|
v.LeafNode.AuthTimeout = 1234.5
|
|
v.LeafNode.TLSTimeout = 1234.5
|
|
v.LeafNode.Remotes[0].LocalAccount = "wrong"
|
|
v.LeafNode.Remotes[0].URLs = append(v.LeafNode.Remotes[0].URLs, "wrong")
|
|
v.LeafNode.Remotes[0].TLSTimeout = 1234.5
|
|
v = pollVarz(t, s, mode, varzURL, nil)
|
|
check(t, v)
|
|
}
|
|
}
|
|
|
|
func pollGatewayz(t *testing.T, s *Server, mode int, url string, opts *GatewayzOptions) *Gatewayz {
|
|
t.Helper()
|
|
if mode == 0 {
|
|
g := &Gatewayz{}
|
|
body := readBody(t, url)
|
|
if err := json.Unmarshal(body, g); err != nil {
|
|
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
|
|
}
|
|
return g
|
|
}
|
|
g, err := s.Gatewayz(opts)
|
|
if err != nil {
|
|
t.Fatalf("Error on Gatewayz: %v", err)
|
|
}
|
|
return g
|
|
}
|
|
|
|
func TestMonitorGatewayz(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
|
|
// First check that without gateway configured
|
|
s := runMonitorServer()
|
|
defer s.Shutdown()
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/gatewayz", s.MonitorAddr().Port)
|
|
for pollMode := 0; pollMode < 2; pollMode++ {
|
|
g := pollGatewayz(t, s, pollMode, url, nil)
|
|
// Expect Name and port to be empty
|
|
if g.Name != _EMPTY_ || g.Port != 0 {
|
|
t.Fatalf("Expected no gateway, got %+v", g)
|
|
}
|
|
}
|
|
s.Shutdown()
|
|
|
|
ob1 := testDefaultOptionsForGateway("B")
|
|
sb1 := runGatewayServer(ob1)
|
|
defer sb1.Shutdown()
|
|
|
|
// Start a1 that has a single URL to sb1.
|
|
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb1)
|
|
oa.HTTPHost = "127.0.0.1"
|
|
oa.HTTPPort = MONITOR_PORT
|
|
sa := runGatewayServer(oa)
|
|
defer sa.Shutdown()
|
|
|
|
waitForOutboundGateways(t, sa, 1, 2*time.Second)
|
|
waitForInboundGateways(t, sa, 1, 2*time.Second)
|
|
|
|
waitForOutboundGateways(t, sb1, 1, 2*time.Second)
|
|
waitForInboundGateways(t, sb1, 1, 2*time.Second)
|
|
|
|
gatewayzURL := fmt.Sprintf("http://127.0.0.1:%d/gatewayz", sa.MonitorAddr().Port)
|
|
for pollMode := 0; pollMode < 2; pollMode++ {
|
|
g := pollGatewayz(t, sa, pollMode, gatewayzURL, nil)
|
|
if g.Host != oa.Gateway.Host {
|
|
t.Fatalf("mode=%v - Expected host to be %q, got %q", pollMode, oa.Gateway.Host, g.Host)
|
|
}
|
|
if g.Port != oa.Gateway.Port {
|
|
t.Fatalf("mode=%v - Expected port to be %v, got %v", pollMode, oa.Gateway.Port, g.Port)
|
|
}
|
|
if n := len(g.OutboundGateways); n != 1 {
|
|
t.Fatalf("mode=%v - Expected outbound to 1 gateway, got %v", pollMode, n)
|
|
}
|
|
if n := len(g.InboundGateways); n != 1 {
|
|
t.Fatalf("mode=%v - Expected inbound from 1 gateway, got %v", pollMode, n)
|
|
}
|
|
og := g.OutboundGateways["B"]
|
|
if og == nil {
|
|
t.Fatalf("mode=%v - Expected to find outbound connection to B, got none", pollMode)
|
|
}
|
|
if !og.IsConfigured {
|
|
t.Fatalf("mode=%v - Expected gw connection to be configured, was not", pollMode)
|
|
}
|
|
if og.Connection == nil {
|
|
t.Fatalf("mode=%v - Expected outbound connection to B to be set, wat not", pollMode)
|
|
}
|
|
if og.Connection.Name != sb1.ID() {
|
|
t.Fatalf("mode=%v - Expected outbound connection to B to have name %q, got %q", pollMode, sb1.ID(), og.Connection.Name)
|
|
}
|
|
if n := len(og.Accounts); n != 0 {
|
|
t.Fatalf("mode=%v - Expected no account, got %v", pollMode, n)
|
|
}
|
|
ig := g.InboundGateways["B"]
|
|
if ig == nil {
|
|
t.Fatalf("mode=%v - Expected to find inbound connection from B, got none", pollMode)
|
|
}
|
|
if n := len(ig); n != 1 {
|
|
t.Fatalf("mode=%v - Expected 1 inbound connection, got %v", pollMode, n)
|
|
}
|
|
igc := ig[0]
|
|
if igc.Connection == nil {
|
|
t.Fatalf("mode=%v - Expected inbound connection to B to be set, wat not", pollMode)
|
|
}
|
|
if igc.Connection.Name != sb1.ID() {
|
|
t.Fatalf("mode=%v - Expected inbound connection to B to have name %q, got %q", pollMode, sb1.ID(), igc.Connection.Name)
|
|
}
|
|
}
|
|
|
|
// Now start sb2 that clusters with sb1. sa should add to its list of URLs
|
|
// sb2 gateway's connect URL.
|
|
ob2 := testDefaultOptionsForGateway("B")
|
|
ob2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", sb1.ClusterAddr().Port))
|
|
sb2 := runGatewayServer(ob2)
|
|
defer sb2.Shutdown()
|
|
|
|
// Wait for sb1 and sb2 to connect
|
|
checkClusterFormed(t, sb1, sb2)
|
|
// sb2 should be made aware of gateway A and connect to sa
|
|
waitForInboundGateways(t, sa, 2, 2*time.Second)
|
|
// Now check that URLs in /varz get updated
|
|
checkGatewayB := func(t *testing.T, url string, opts *GatewayzOptions) {
|
|
t.Helper()
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
for pollMode := 0; pollMode < 2; pollMode++ {
|
|
g := pollGatewayz(t, sa, pollMode, url, opts)
|
|
if n := len(g.OutboundGateways); n != 1 {
|
|
t.Fatalf("mode=%v - Expected outbound to 1 gateway, got %v", pollMode, n)
|
|
}
|
|
// The InboundGateways is a map with key the gateway names,
|
|
// then value is array of connections. So should be 1 here.
|
|
if n := len(g.InboundGateways); n != 1 {
|
|
t.Fatalf("mode=%v - Expected inbound from 1 gateway, got %v", pollMode, n)
|
|
}
|
|
ig := g.InboundGateways["B"]
|
|
if ig == nil {
|
|
t.Fatalf("mode=%v - Expected to find inbound connection from B, got none", pollMode)
|
|
}
|
|
if n := len(ig); n != 2 {
|
|
t.Fatalf("mode=%v - Expected 2 inbound connections from gateway B, got %v", pollMode, n)
|
|
}
|
|
gotSB1 := false
|
|
gotSB2 := false
|
|
for _, rg := range ig {
|
|
if rg.Connection != nil {
|
|
if rg.Connection.Name == sb1.ID() {
|
|
gotSB1 = true
|
|
} else if rg.Connection.Name == sb2.ID() {
|
|
gotSB2 = true
|
|
}
|
|
}
|
|
}
|
|
if !gotSB1 {
|
|
t.Fatalf("mode=%v - Missing inbound connection from sb1", pollMode)
|
|
}
|
|
if !gotSB2 {
|
|
t.Fatalf("mode=%v - Missing inbound connection from sb2", pollMode)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
checkGatewayB(t, gatewayzURL, nil)
|
|
|
|
// Start a new cluser C that connects to B. A should see it as
|
|
// a non-configured gateway.
|
|
oc := testGatewayOptionsFromToWithServers(t, "C", "B", sb1)
|
|
sc := runGatewayServer(oc)
|
|
defer sc.Shutdown()
|
|
|
|
// All servers should have 2 outbound connections (one for each other cluster)
|
|
waitForOutboundGateways(t, sa, 2, 2*time.Second)
|
|
waitForOutboundGateways(t, sb1, 2, 2*time.Second)
|
|
waitForOutboundGateways(t, sb2, 2, 2*time.Second)
|
|
waitForOutboundGateways(t, sc, 2, 2*time.Second)
|
|
|
|
// Server sa should have 3 inbounds now
|
|
waitForInboundGateways(t, sa, 3, 2*time.Second)
|
|
|
|
// Check gatewayz again to see that we have C now.
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
for pollMode := 0; pollMode < 2; pollMode++ {
|
|
g := pollGatewayz(t, sa, pollMode, gatewayzURL, nil)
|
|
if n := len(g.OutboundGateways); n != 2 {
|
|
t.Fatalf("mode=%v - Expected outbound to 2 gateways, got %v", pollMode, n)
|
|
}
|
|
// The InboundGateways is a map with key the gateway names,
|
|
// then value is array of connections. So should be 2 here.
|
|
if n := len(g.InboundGateways); n != 2 {
|
|
t.Fatalf("mode=%v - Expected inbound from 2 gateways, got %v", pollMode, n)
|
|
}
|
|
og := g.OutboundGateways["C"]
|
|
if og == nil {
|
|
t.Fatalf("mode=%v - Expected to find outbound connection to C, got none", pollMode)
|
|
}
|
|
if og.IsConfigured {
|
|
t.Fatalf("mode=%v - Expected IsConfigured for gateway C to be false, was true", pollMode)
|
|
}
|
|
if og.Connection == nil {
|
|
t.Fatalf("mode=%v - Expected connection to C, got none", pollMode)
|
|
}
|
|
if og.Connection.Name != sc.ID() {
|
|
t.Fatalf("mode=%v - Expected outbound connection to C to have name %q, got %q", pollMode, sc.ID(), og.Connection.Name)
|
|
}
|
|
ig := g.InboundGateways["C"]
|
|
if ig == nil {
|
|
t.Fatalf("mode=%v - Expected to find inbound connection from C, got none", pollMode)
|
|
}
|
|
if n := len(ig); n != 1 {
|
|
t.Fatalf("mode=%v - Expected 1 inbound connections from gateway C, got %v", pollMode, n)
|
|
}
|
|
igc := ig[0]
|
|
if igc.Connection == nil {
|
|
t.Fatalf("mode=%v - Expected connection to C, got none", pollMode)
|
|
}
|
|
if igc.Connection.Name != sc.ID() {
|
|
t.Fatalf("mode=%v - Expected outbound connection to C to have name %q, got %q", pollMode, sc.ID(), og.Connection.Name)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Select only 1 gateway by passing the name to option/url
|
|
opts := &GatewayzOptions{Name: "B"}
|
|
checkGatewayB(t, gatewayzURL+"?gw_name=B", opts)
|
|
|
|
// Stop gateway C and check that we have only B, with and without filter.
|
|
sc.Shutdown()
|
|
checkGatewayB(t, gatewayzURL+"?gw_name=B", opts)
|
|
checkGatewayB(t, gatewayzURL, nil)
|
|
}
|
|
|
|
func TestMonitorGatewayzAccounts(t *testing.T) {
|
|
resetPreviousHTTPConnections()
|
|
|
|
// Create bunch of Accounts
|
|
totalAccounts := 15
|
|
accounts := ""
|
|
for i := 0; i < totalAccounts; i++ {
|
|
acc := fmt.Sprintf(" acc_%d: { users=[{user:user_%d, password: pwd}] }\n", i, i)
|
|
accounts += acc
|
|
}
|
|
|
|
bConf := createConfFile(t, []byte(fmt.Sprintf(`
|
|
accounts {
|
|
%s
|
|
}
|
|
port: -1
|
|
http: -1
|
|
gateway: {
|
|
name: "B"
|
|
port: -1
|
|
}
|
|
no_sys_acc = true
|
|
`, accounts)))
|
|
defer removeFile(t, bConf)
|
|
|
|
sb, ob := RunServerWithConfig(bConf)
|
|
defer sb.Shutdown()
|
|
sb.SetLogger(&DummyLogger{}, true, true)
|
|
|
|
// Start a1 that has a single URL to sb1.
|
|
aConf := createConfFile(t, []byte(fmt.Sprintf(`
|
|
accounts {
|
|
%s
|
|
}
|
|
port: -1
|
|
http: -1
|
|
gateway: {
|
|
name: "A"
|
|
port: -1
|
|
gateways [
|
|
{
|
|
name: "B"
|
|
url: "nats://127.0.0.1:%d"
|
|
}
|
|
]
|
|
}
|
|
no_sys_acc = true
|
|
`, accounts, sb.GatewayAddr().Port)))
|
|
defer removeFile(t, aConf)
|
|
|
|
sa, oa := RunServerWithConfig(aConf)
|
|
defer sa.Shutdown()
|
|
sa.SetLogger(&DummyLogger{}, true, true)
|
|
|
|
waitForOutboundGateways(t, sa, 1, 2*time.Second)
|
|
waitForInboundGateways(t, sa, 1, 2*time.Second)
|
|
waitForOutboundGateways(t, sb, 1, 2*time.Second)
|
|
waitForInboundGateways(t, sb, 1, 2*time.Second)
|
|
|
|
// Create clients for each account on A and publish a message
|
|
// so that list of accounts appear in gatewayz
|
|
produceMsgsFromA := func(t *testing.T) {
|
|
t.Helper()
|
|
for i := 0; i < totalAccounts; i++ {
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://user_%d:pwd@%s:%d", i, oa.Host, oa.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
nc.Publish("foo", []byte("hello"))
|
|
nc.Flush()
|
|
nc.Close()
|
|
}
|
|
}
|
|
produceMsgsFromA(t)
|
|
|
|
// Wait for A- for all accounts
|
|
gwc := sa.getOutboundGatewayConnection("B")
|
|
for i := 0; i < totalAccounts; i++ {
|
|
checkForAccountNoInterest(t, gwc, fmt.Sprintf("acc_%d", i), true, 2*time.Second)
|
|
}
|
|
|
|
// Check accounts...
|
|
gatewayzURL := fmt.Sprintf("http://127.0.0.1:%d/gatewayz", sa.MonitorAddr().Port)
|
|
for pollMode := 0; pollMode < 2; pollMode++ {
|
|
// First, without asking for it, they should not be present.
|
|
g := pollGatewayz(t, sa, pollMode, gatewayzURL, nil)
|
|
og := g.OutboundGateways["B"]
|
|
if og == nil {
|
|
t.Fatalf("mode=%v - Expected outbound gateway to B, got none", pollMode)
|
|
}
|
|
if n := len(og.Accounts); n != 0 {
|
|
t.Fatalf("mode=%v - Expected accounts list to not be present by default, got %v", pollMode, n)
|
|
}
|
|
// Now ask for the accounts
|
|
g = pollGatewayz(t, sa, pollMode, gatewayzURL+"?accs=1", &GatewayzOptions{Accounts: true})
|
|
og = g.OutboundGateways["B"]
|
|
if og == nil {
|
|
t.Fatalf("mode=%v - Expected outbound gateway to B, got none", pollMode)
|
|
}
|
|
if n := len(og.Accounts); n != totalAccounts {
|
|
t.Fatalf("mode=%v - Expected to get all %d accounts, got %v", pollMode, totalAccounts, n)
|
|
}
|
|
// Now account details
|
|
for _, acc := range og.Accounts {
|
|
if acc.InterestMode != Optimistic.String() {
|
|
t.Fatalf("mode=%v - Expected optimistic mode, got %q", pollMode, acc.InterestMode)
|
|
}
|
|
// Since there is no interest at all on B, the publish
|
|
// will have resulted in total account no interest, so
|
|
// the number of no interest (subject wise) should be 0
|
|
if acc.NoInterestCount != 0 {
|
|
t.Fatalf("mode=%v - Expected 0 no-interest, got %v", pollMode, acc.NoInterestCount)
|
|
}
|
|
if acc.NumQueueSubscriptions != 0 || acc.TotalSubscriptions != 0 {
|
|
t.Fatalf("mode=%v - Expected total subs to be 0, got %v - and num queue subs to be 0, got %v",
|
|
pollMode, acc.TotalSubscriptions, acc.NumQueueSubscriptions)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check inbound on B
|
|
gwURLServerB := fmt.Sprintf("http://127.0.0.1:%d/gatewayz", sb.MonitorAddr().Port)
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
for pollMode := 0; pollMode < 2; pollMode++ {
|
|
// First, without asking for it, they should not be present.
|
|
g := pollGatewayz(t, sb, pollMode, gwURLServerB, nil)
|
|
igs := g.InboundGateways["A"]
|
|
if igs == nil {
|
|
return fmt.Errorf("mode=%v - Expected inbound gateway to A, got none", pollMode)
|
|
}
|
|
if len(igs) != 1 {
|
|
return fmt.Errorf("mode=%v - Expected single inbound, got %v", pollMode, len(igs))
|
|
}
|
|
ig := igs[0]
|
|
if n := len(ig.Accounts); n != 0 {
|
|
return fmt.Errorf("mode=%v - Expected no account, got %v", pollMode, n)
|
|
}
|
|
// Check that list of accounts
|
|
g = pollGatewayz(t, sb, pollMode, gwURLServerB+"?accs=1", &GatewayzOptions{Accounts: true})
|
|
igs = g.InboundGateways["A"]
|
|
if igs == nil {
|
|
return fmt.Errorf("mode=%v - Expected inbound gateway to A, got none", pollMode)
|
|
}
|
|
if len(igs) != 1 {
|
|
return fmt.Errorf("mode=%v - Expected single inbound, got %v", pollMode, len(igs))
|
|
}
|
|
ig = igs[0]
|
|
if ig.Connection == nil {
|
|
return fmt.Errorf("mode=%v - Expected inbound connection from A to be set, wat not", pollMode)
|
|
}
|
|
if ig.Connection.Name != sa.ID() {
|
|
t.Fatalf("mode=%v - Expected inbound connection from A to have name %q, got %q", pollMode, sa.ID(), ig.Connection.Name)
|
|
}
|
|
if n := len(ig.Accounts); n != totalAccounts {
|
|
return fmt.Errorf("mode=%v - Expected to get all %d accounts, got %v", pollMode, totalAccounts, n)
|
|
}
|
|
// Now account details
|
|
for _, acc := range ig.Accounts {
|
|
if acc.InterestMode != Optimistic.String() {
|
|
return fmt.Errorf("mode=%v - Expected optimistic mode, got %q", pollMode, acc.InterestMode)
|
|
}
|
|
// Since there is no interest at all on B, the publish
|
|
// will have resulted in total account no interest, so
|
|
// the number of no interest (subject wise) should be 0
|
|
if acc.NoInterestCount != 0 {
|
|
t.Fatalf("mode=%v - Expected 0 no-interest, got %v", pollMode, acc.NoInterestCount)
|
|
}
|
|
// For inbound gateway, NumQueueSubscriptions and TotalSubscriptions
|
|
// are not relevant.
|
|
if acc.NumQueueSubscriptions != 0 || acc.TotalSubscriptions != 0 {
|
|
return fmt.Errorf("mode=%v - For inbound connection, expected num queue subs and total subs to be 0, got %v and %v",
|
|
pollMode, acc.TotalSubscriptions, acc.NumQueueSubscriptions)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Now create subscriptions on B to prevent A- and check on subject no interest
|
|
for i := 0; i < totalAccounts; i++ {
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://user_%d:pwd@%s:%d", i, ob.Host, ob.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
// Create a queue sub so it shows up in gatewayz
|
|
nc.QueueSubscribeSync("bar", "queue")
|
|
// Create plain subscriptions on baz.0, baz.1 and baz.2.
|
|
// Create to for each subject. Since gateways will send
|
|
// only once per subject, the number of subs should be 3, not 6.
|
|
for j := 0; j < 3; j++ {
|
|
subj := fmt.Sprintf("baz.%d", j)
|
|
nc.SubscribeSync(subj)
|
|
nc.SubscribeSync(subj)
|
|
}
|
|
nc.Flush()
|
|
}
|
|
|
|
for i := 0; i < totalAccounts; i++ {
|
|
accName := fmt.Sprintf("acc_%d", i)
|
|
checkForRegisteredQSubInterest(t, sa, "B", accName, "bar", 1, 2*time.Second)
|
|
}
|
|
|
|
// Resend msgs from A on foo, on all accounts. There will be no interest on this subject.
|
|
produceMsgsFromA(t)
|
|
|
|
for i := 0; i < totalAccounts; i++ {
|
|
accName := fmt.Sprintf("acc_%d", i)
|
|
checkForSubjectNoInterest(t, gwc, accName, "foo", true, 2*time.Second)
|
|
// Verify that we still have the queue interest registered
|
|
checkForRegisteredQSubInterest(t, sa, "B", accName, "bar", 1, 2*time.Second)
|
|
}
|
|
|
|
// Check accounts...
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
for pollMode := 0; pollMode < 2; pollMode++ {
|
|
g := pollGatewayz(t, sa, pollMode, gatewayzURL+"?accs=1", &GatewayzOptions{Accounts: true})
|
|
og := g.OutboundGateways["B"]
|
|
if og == nil {
|
|
return fmt.Errorf("mode=%v - Expected outbound gateway to B, got none", pollMode)
|
|
}
|
|
if n := len(og.Accounts); n != totalAccounts {
|
|
return fmt.Errorf("mode=%v - Expected to get all %d accounts, got %v", pollMode, totalAccounts, n)
|
|
}
|
|
// Now account details
|
|
for _, acc := range og.Accounts {
|
|
if acc.InterestMode != Optimistic.String() {
|
|
return fmt.Errorf("mode=%v - Expected optimistic mode, got %q", pollMode, acc.InterestMode)
|
|
}
|
|
if acc.NoInterestCount != 1 {
|
|
return fmt.Errorf("mode=%v - Expected 1 no-interest, got %v", pollMode, acc.NoInterestCount)
|
|
}
|
|
if acc.NumQueueSubscriptions != 1 || acc.TotalSubscriptions != 1 {
|
|
return fmt.Errorf("mode=%v - Expected total subs to be 1, got %v - and num queue subs to be 1, got %v",
|
|
pollMode, acc.TotalSubscriptions, acc.NumQueueSubscriptions)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Check inbound on server B
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
for pollMode := 0; pollMode < 2; pollMode++ {
|
|
// Ask for accounts list
|
|
g := pollGatewayz(t, sb, pollMode, gwURLServerB+"?accs=1", &GatewayzOptions{Accounts: true})
|
|
igs := g.InboundGateways["A"]
|
|
if igs == nil {
|
|
return fmt.Errorf("mode=%v - Expected inbound gateway to A, got none", pollMode)
|
|
}
|
|
if len(igs) != 1 {
|
|
return fmt.Errorf("mode=%v - Expected single inbound, got %v", pollMode, len(igs))
|
|
}
|
|
ig := igs[0]
|
|
if ig.Connection == nil {
|
|
return fmt.Errorf("mode=%v - Expected inbound connection from A to be set, wat not", pollMode)
|
|
}
|
|
if ig.Connection.Name != sa.ID() {
|
|
t.Fatalf("mode=%v - Expected inbound connection from A to have name %q, got %q", pollMode, sa.ID(), ig.Connection.Name)
|
|
}
|
|
if n := len(ig.Accounts); n != totalAccounts {
|
|
return fmt.Errorf("mode=%v - Expected to get all %d accounts, got %v", pollMode, totalAccounts, n)
|
|
}
|
|
// Now account details
|
|
for _, acc := range ig.Accounts {
|
|
if acc.InterestMode != Optimistic.String() {
|
|
return fmt.Errorf("mode=%v - Expected optimistic mode, got %q", pollMode, acc.InterestMode)
|
|
}
|
|
if acc.NoInterestCount != 1 {
|
|
return fmt.Errorf("mode=%v - Expected 1 no-interest, got %v", pollMode, acc.NoInterestCount)
|
|
}
|
|
// For inbound gateway, NumQueueSubscriptions and TotalSubscriptions
|
|
// are not relevant.
|
|
if acc.NumQueueSubscriptions != 0 || acc.TotalSubscriptions != 0 {
|
|
return fmt.Errorf("mode=%v - For inbound connection, expected num queue subs and total subs to be 0, got %v and %v",
|
|
pollMode, acc.TotalSubscriptions, acc.NumQueueSubscriptions)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Make one of the account to switch to interest only
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://user_1:pwd@%s:%d", oa.Host, oa.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
for i := 0; i < 1100; i++ {
|
|
nc.Publish(fmt.Sprintf("foo.%d", i), []byte("hello"))
|
|
}
|
|
nc.Flush()
|
|
nc.Close()
|
|
|
|
// Check that we can select single account
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
for pollMode := 0; pollMode < 2; pollMode++ {
|
|
g := pollGatewayz(t, sa, pollMode, gatewayzURL+"?gw_name=B&acc_name=acc_1", &GatewayzOptions{Name: "B", AccountName: "acc_1"})
|
|
og := g.OutboundGateways["B"]
|
|
if og == nil {
|
|
return fmt.Errorf("mode=%v - Expected outbound gateway to B, got none", pollMode)
|
|
}
|
|
if n := len(og.Accounts); n != 1 {
|
|
return fmt.Errorf("mode=%v - Expected to get 1 account, got %v", pollMode, n)
|
|
}
|
|
// Now account details
|
|
acc := og.Accounts[0]
|
|
if acc.InterestMode != InterestOnly.String() {
|
|
return fmt.Errorf("mode=%v - Expected interest-only mode, got %q", pollMode, acc.InterestMode)
|
|
}
|
|
// Since we switched, this should be set to 0
|
|
if acc.NoInterestCount != 0 {
|
|
return fmt.Errorf("mode=%v - Expected 0 no-interest, got %v", pollMode, acc.NoInterestCount)
|
|
}
|
|
// We have created 3 subs on that account on B, and 1 queue sub.
|
|
// So total should be 4 and 1 for queue sub.
|
|
if acc.NumQueueSubscriptions != 1 {
|
|
return fmt.Errorf("mode=%v - Expected num queue subs to be 1, got %v",
|
|
pollMode, acc.NumQueueSubscriptions)
|
|
}
|
|
if acc.TotalSubscriptions != 4 {
|
|
return fmt.Errorf("mode=%v - Expected total subs to be 4, got %v",
|
|
pollMode, acc.TotalSubscriptions)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Check inbound on B now...
|
|
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
|
for pollMode := 0; pollMode < 2; pollMode++ {
|
|
g := pollGatewayz(t, sb, pollMode, gwURLServerB+"?gw_name=A&acc_name=acc_1", &GatewayzOptions{Name: "A", AccountName: "acc_1"})
|
|
igs := g.InboundGateways["A"]
|
|
if igs == nil {
|
|
return fmt.Errorf("mode=%v - Expected inbound gateway from A, got none", pollMode)
|
|
}
|
|
if len(igs) != 1 {
|
|
return fmt.Errorf("mode=%v - Expected single inbound, got %v", pollMode, len(igs))
|
|
}
|
|
ig := igs[0]
|
|
if n := len(ig.Accounts); n != 1 {
|
|
return fmt.Errorf("mode=%v - Expected to get 1 account, got %v", pollMode, n)
|
|
}
|
|
// Now account details
|
|
acc := ig.Accounts[0]
|
|
if acc.InterestMode != InterestOnly.String() {
|
|
return fmt.Errorf("mode=%v - Expected interest-only mode, got %q", pollMode, acc.InterestMode)
|
|
}
|
|
if acc.InterestMode != InterestOnly.String() {
|
|
return fmt.Errorf("Should be in %q mode, got %q", InterestOnly.String(), acc.InterestMode)
|
|
}
|
|
// Since we switched, this should be set to 0
|
|
if acc.NoInterestCount != 0 {
|
|
return fmt.Errorf("mode=%v - Expected 0 no-interest, got %v", pollMode, acc.NoInterestCount)
|
|
}
|
|
// Again, for inbound, these should be always 0.
|
|
if acc.NumQueueSubscriptions != 0 || acc.TotalSubscriptions != 0 {
|
|
return fmt.Errorf("mode=%v - For inbound connection, expected num queue subs and total subs to be 0, got %v and %v",
|
|
pollMode, acc.TotalSubscriptions, acc.NumQueueSubscriptions)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestMonitorRouteRTT(t *testing.T) {
|
|
// Do not change default PingInterval and expect RTT to still be reported
|
|
|
|
ob := DefaultOptions()
|
|
sb := RunServer(ob)
|
|
defer sb.Shutdown()
|
|
|
|
oa := DefaultOptions()
|
|
oa.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", ob.Cluster.Host, ob.Cluster.Port))
|
|
sa := RunServer(oa)
|
|
defer sa.Shutdown()
|
|
|
|
checkClusterFormed(t, sa, sb)
|
|
|
|
checkRouteInfo := func(t *testing.T, s *Server) {
|
|
t.Helper()
|
|
routezURL := fmt.Sprintf("http://127.0.0.1:%d/routez", s.MonitorAddr().Port)
|
|
for pollMode := 0; pollMode < 2; pollMode++ {
|
|
checkFor(t, 2*firstPingInterval, 15*time.Millisecond, func() error {
|
|
rz := pollRoutez(t, s, pollMode, routezURL, nil)
|
|
if len(rz.Routes) != 1 {
|
|
return fmt.Errorf("Expected 1 route, got %v", len(rz.Routes))
|
|
}
|
|
ri := rz.Routes[0]
|
|
if ri.RTT == _EMPTY_ {
|
|
return fmt.Errorf("Route's RTT not reported")
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
}
|
|
checkRouteInfo(t, sa)
|
|
checkRouteInfo(t, sb)
|
|
}
|
|
|
|
func pollLeafz(t *testing.T, s *Server, mode int, url string, opts *LeafzOptions) *Leafz {
|
|
t.Helper()
|
|
if mode == 0 {
|
|
l := &Leafz{}
|
|
body := readBody(t, url)
|
|
if err := json.Unmarshal(body, l); err != nil {
|
|
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
|
|
}
|
|
return l
|
|
}
|
|
l, err := s.Leafz(opts)
|
|
if err != nil {
|
|
t.Fatalf("Error on Leafz: %v", err)
|
|
}
|
|
return l
|
|
}
|
|
|
|
func TestMonitorOpJWT(t *testing.T) {
|
|
content := `
|
|
listen: "127.0.0.1:-1"
|
|
http: "127.0.0.1:-1"
|
|
operator = "../test/configs/nkeys/op.jwt"
|
|
resolver = MEMORY
|
|
`
|
|
conf := createConfFile(t, []byte(content))
|
|
defer removeFile(t, conf)
|
|
sa, _ := RunServerWithConfig(conf)
|
|
defer sa.Shutdown()
|
|
|
|
theJWT, err := ioutil.ReadFile("../test/configs/nkeys/op.jwt")
|
|
require_NoError(t, err)
|
|
theJWT = []byte(strings.Split(string(theJWT), "\n")[1])
|
|
claim, err := jwt.DecodeOperatorClaims(string(theJWT))
|
|
require_NoError(t, err)
|
|
|
|
pollURL := fmt.Sprintf("http://127.0.0.1:%d/varz", sa.MonitorAddr().Port)
|
|
for pollMode := 1; pollMode < 2; pollMode++ {
|
|
l := pollVarz(t, sa, pollMode, pollURL, nil)
|
|
|
|
if len(l.TrustedOperatorsJwt) != 1 {
|
|
t.Fatalf("Expected one operator jwt")
|
|
}
|
|
if len(l.TrustedOperatorsClaim) != 1 {
|
|
t.Fatalf("Expected one operator claim")
|
|
}
|
|
if l.TrustedOperatorsJwt[0] != string(theJWT) {
|
|
t.Fatalf("Expected operator to be identical to configuration")
|
|
}
|
|
if !reflect.DeepEqual(l.TrustedOperatorsClaim[0], claim) {
|
|
t.Fatal("claims need to be equal")
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestMonitorLeafz(t *testing.T) {
|
|
content := `
|
|
listen: "127.0.0.1:-1"
|
|
http: "127.0.0.1:-1"
|
|
operator = "../test/configs/nkeys/op.jwt"
|
|
resolver = MEMORY
|
|
ping_interval = 1
|
|
leafnodes {
|
|
listen: "127.0.0.1:-1"
|
|
}
|
|
`
|
|
conf := createConfFile(t, []byte(content))
|
|
defer removeFile(t, conf)
|
|
sb, ob := RunServerWithConfig(conf)
|
|
defer sb.Shutdown()
|
|
|
|
createAcc := func(t *testing.T) (*Account, string) {
|
|
t.Helper()
|
|
acc, akp := createAccount(sb)
|
|
kp, _ := nkeys.CreateUser()
|
|
pub, _ := kp.PublicKey()
|
|
nuc := jwt.NewUserClaims(pub)
|
|
ujwt, err := nuc.Encode(akp)
|
|
if err != nil {
|
|
t.Fatalf("Error generating user JWT: %v", err)
|
|
}
|
|
seed, _ := kp.Seed()
|
|
creds := genCredsFile(t, ujwt, seed)
|
|
return acc, creds
|
|
}
|
|
acc1, mycreds1 := createAcc(t)
|
|
defer removeFile(t, mycreds1)
|
|
acc2, mycreds2 := createAcc(t)
|
|
defer removeFile(t, mycreds2)
|
|
|
|
content = `
|
|
port: -1
|
|
http: "127.0.0.1:-1"
|
|
ping_interval = 1
|
|
accounts {
|
|
%s {
|
|
users [
|
|
{user: user1, password: pwd}
|
|
]
|
|
}
|
|
%s {
|
|
users [
|
|
{user: user2, password: pwd}
|
|
]
|
|
}
|
|
}
|
|
leafnodes {
|
|
remotes = [
|
|
{
|
|
account: "%s"
|
|
url: nats-leaf://127.0.0.1:%d
|
|
credentials: '%s'
|
|
}
|
|
{
|
|
account: "%s"
|
|
url: nats-leaf://127.0.0.1:%d
|
|
credentials: '%s'
|
|
}
|
|
]
|
|
}
|
|
`
|
|
config := fmt.Sprintf(content,
|
|
acc1.Name, acc2.Name,
|
|
acc1.Name, ob.LeafNode.Port, mycreds1,
|
|
acc2.Name, ob.LeafNode.Port, mycreds2)
|
|
conf = createConfFile(t, []byte(config))
|
|
defer removeFile(t, conf)
|
|
sa, oa := RunServerWithConfig(conf)
|
|
defer sa.Shutdown()
|
|
|
|
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
|
if n := sa.NumLeafNodes(); n != 2 {
|
|
return fmt.Errorf("Expected 2 leaf connections, got %v", n)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Wait for initial RTT to be computed
|
|
time.Sleep(firstPingInterval + 500*time.Millisecond)
|
|
|
|
ch := make(chan bool, 1)
|
|
nc1B := natsConnect(t, fmt.Sprintf("nats://127.0.0.1:%d", ob.Port), nats.UserCredentials(mycreds1))
|
|
defer nc1B.Close()
|
|
natsSub(t, nc1B, "foo", func(_ *nats.Msg) { ch <- true })
|
|
natsSub(t, nc1B, "bar", func(_ *nats.Msg) {})
|
|
natsFlush(t, nc1B)
|
|
|
|
nc2B := natsConnect(t, fmt.Sprintf("nats://127.0.0.1:%d", ob.Port), nats.UserCredentials(mycreds2))
|
|
defer nc2B.Close()
|
|
natsSub(t, nc2B, "bar", func(_ *nats.Msg) { ch <- true })
|
|
natsSub(t, nc2B, "foo", func(_ *nats.Msg) {})
|
|
natsFlush(t, nc2B)
|
|
|
|
nc1A := natsConnect(t, fmt.Sprintf("nats://user1:pwd@127.0.0.1:%d", oa.Port))
|
|
defer nc1A.Close()
|
|
natsPub(t, nc1A, "foo", []byte("hello"))
|
|
natsFlush(t, nc1A)
|
|
|
|
waitCh(t, ch, "Did not get the message")
|
|
|
|
nc2A := natsConnect(t, fmt.Sprintf("nats://user2:pwd@127.0.0.1:%d", oa.Port))
|
|
defer nc2A.Close()
|
|
natsPub(t, nc2A, "bar", []byte("hello"))
|
|
natsPub(t, nc2A, "bar", []byte("hello"))
|
|
natsFlush(t, nc2A)
|
|
|
|
waitCh(t, ch, "Did not get the message")
|
|
waitCh(t, ch, "Did not get the message")
|
|
|
|
// Let's poll server A
|
|
pollURL := fmt.Sprintf("http://127.0.0.1:%d/leafz?subs=1", sa.MonitorAddr().Port)
|
|
for pollMode := 1; pollMode < 2; pollMode++ {
|
|
l := pollLeafz(t, sa, pollMode, pollURL, &LeafzOptions{Subscriptions: true})
|
|
if l.ID != sa.ID() {
|
|
t.Fatalf("Expected ID to be %q, got %q", sa.ID(), l.ID)
|
|
}
|
|
if l.Now.IsZero() {
|
|
t.Fatalf("Expected Now to be set, was not")
|
|
}
|
|
if l.NumLeafs != 2 {
|
|
t.Fatalf("Expected NumLeafs to be 2, got %v", l.NumLeafs)
|
|
}
|
|
if len(l.Leafs) != 2 {
|
|
t.Fatalf("Expected array to be len 2, got %v", len(l.Leafs))
|
|
}
|
|
for _, ln := range l.Leafs {
|
|
if ln.Account == acc1.Name {
|
|
if ln.OutMsgs != 1 || ln.OutBytes == 0 || ln.InMsgs != 0 || ln.InBytes != 0 {
|
|
t.Fatalf("Expected 1 OutMsgs/Bytes and 0 InMsgs/Bytes, got %+v", ln)
|
|
}
|
|
} else if ln.Account == acc2.Name {
|
|
if ln.OutMsgs != 2 || ln.OutBytes == 0 || ln.InMsgs != 0 || ln.InBytes != 0 {
|
|
t.Fatalf("Expected 2 OutMsgs/Bytes and 0 InMsgs/Bytes, got %+v", ln)
|
|
}
|
|
} else {
|
|
t.Fatalf("Expected account to be %q or %q, got %q", acc1.Name, acc2.Name, ln.Account)
|
|
}
|
|
if ln.RTT == "" {
|
|
t.Fatalf("RTT not tracked?")
|
|
}
|
|
if ln.NumSubs != 3 {
|
|
t.Fatalf("Expected 3 subs, got %v", ln.NumSubs)
|
|
}
|
|
if len(ln.Subs) != 3 {
|
|
t.Fatalf("Expected subs to be returned, got %v", len(ln.Subs))
|
|
}
|
|
var foundFoo bool
|
|
var foundBar bool
|
|
for _, sub := range ln.Subs {
|
|
if sub == "foo" {
|
|
foundFoo = true
|
|
} else if sub == "bar" {
|
|
foundBar = true
|
|
}
|
|
}
|
|
if !foundFoo {
|
|
t.Fatal("Did not find subject foo")
|
|
}
|
|
if !foundBar {
|
|
t.Fatal("Did not find subject bar")
|
|
}
|
|
}
|
|
}
|
|
// Make sure that if we don't ask for subs, we don't get them
|
|
pollURL = fmt.Sprintf("http://127.0.0.1:%d/leafz", sa.MonitorAddr().Port)
|
|
for pollMode := 1; pollMode < 2; pollMode++ {
|
|
l := pollLeafz(t, sa, pollMode, pollURL, nil)
|
|
for _, ln := range l.Leafs {
|
|
if ln.NumSubs != 3 {
|
|
t.Fatalf("Number of subs should be 3, got %v", ln.NumSubs)
|
|
}
|
|
if len(ln.Subs) != 0 {
|
|
t.Fatalf("Subs should not have been returned, got %v", ln.Subs)
|
|
}
|
|
}
|
|
}
|
|
// Make sure that we can request per account - existing account
|
|
pollURL = fmt.Sprintf("http://127.0.0.1:%d/leafz?acc=%s", sa.MonitorAddr().Port, acc1.Name)
|
|
for pollMode := 1; pollMode < 2; pollMode++ {
|
|
l := pollLeafz(t, sa, pollMode, pollURL, &LeafzOptions{Account: acc1.Name})
|
|
for _, ln := range l.Leafs {
|
|
if ln.Account != acc1.Name {
|
|
t.Fatalf("Expected leaf node to be from account %s, got: %v", acc1.Name, ln)
|
|
}
|
|
}
|
|
if len(l.Leafs) != 1 {
|
|
t.Fatalf("Expected only two leaf node for this account, got: %v", len(l.Leafs))
|
|
}
|
|
}
|
|
// Make sure that we can request per account - non existing account
|
|
pollURL = fmt.Sprintf("http://127.0.0.1:%d/leafz?acc=%s", sa.MonitorAddr().Port, "DOESNOTEXIST")
|
|
for pollMode := 1; pollMode < 2; pollMode++ {
|
|
l := pollLeafz(t, sa, pollMode, pollURL, &LeafzOptions{Account: "DOESNOTEXIST"})
|
|
if len(l.Leafs) != 0 {
|
|
t.Fatalf("Expected no leaf node for this account, got: %v", len(l.Leafs))
|
|
}
|
|
}
|
|
// Now polling server B.
|
|
pollURL = fmt.Sprintf("http://127.0.0.1:%d/leafz?subs=1", sb.MonitorAddr().Port)
|
|
for pollMode := 1; pollMode < 2; pollMode++ {
|
|
l := pollLeafz(t, sb, pollMode, pollURL, &LeafzOptions{Subscriptions: true})
|
|
if l.ID != sb.ID() {
|
|
t.Fatalf("Expected ID to be %q, got %q", sb.ID(), l.ID)
|
|
}
|
|
if l.Now.IsZero() {
|
|
t.Fatalf("Expected Now to be set, was not")
|
|
}
|
|
if l.NumLeafs != 2 {
|
|
t.Fatalf("Expected NumLeafs to be 1, got %v", l.NumLeafs)
|
|
}
|
|
if len(l.Leafs) != 2 {
|
|
t.Fatalf("Expected array to be len 2, got %v", len(l.Leafs))
|
|
}
|
|
for _, ln := range l.Leafs {
|
|
if ln.Account == acc1.Name {
|
|
if ln.OutMsgs != 0 || ln.OutBytes != 0 || ln.InMsgs != 1 || ln.InBytes == 0 {
|
|
t.Fatalf("Expected 1 InMsgs/Bytes and 0 OutMsgs/Bytes, got %+v", ln)
|
|
}
|
|
} else if ln.Account == acc2.Name {
|
|
if ln.OutMsgs != 0 || ln.OutBytes != 0 || ln.InMsgs != 2 || ln.InBytes == 0 {
|
|
t.Fatalf("Expected 2 InMsgs/Bytes and 0 OutMsgs/Bytes, got %+v", ln)
|
|
}
|
|
} else {
|
|
t.Fatalf("Expected account to be %q or %q, got %q", acc1.Name, acc2.Name, ln.Account)
|
|
}
|
|
if ln.RTT == "" {
|
|
t.Fatalf("RTT not tracked?")
|
|
}
|
|
// LDS should be only one.
|
|
if ln.NumSubs != 1 || len(ln.Subs) != 1 {
|
|
t.Fatalf("Expected 1 sub, got %v (%v)", ln.NumSubs, ln.Subs)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestMonitorAccountz(t *testing.T) {
|
|
s := RunServer(DefaultMonitorOptions())
|
|
defer s.Shutdown()
|
|
body := string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d/accountz", s.MonitorAddr().Port)))
|
|
if !strings.Contains(body, `$G`) {
|
|
t.Fatalf("Body missing value. Contains: %s", body)
|
|
} else if !strings.Contains(body, `$SYS`) {
|
|
t.Fatalf("Body missing value. Contains: %s", body)
|
|
} else if !strings.Contains(body, `"accounts": [`) {
|
|
t.Fatalf("Body missing value. Contains: %s", body)
|
|
} else if !strings.Contains(body, `"system_account": "$SYS"`) {
|
|
t.Fatalf("Body missing value. Contains: %s", body)
|
|
}
|
|
body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d/accountz?acc=$SYS", s.MonitorAddr().Port)))
|
|
if !strings.Contains(body, `"account_detail": {`) {
|
|
t.Fatalf("Body missing value. Contains: %s", body)
|
|
} else if !strings.Contains(body, `"account_name": "$SYS",`) {
|
|
t.Fatalf("Body missing value. Contains: %s", body)
|
|
} else if !strings.Contains(body, `"subscriptions": 36,`) {
|
|
t.Fatalf("Body missing value. Contains: %s", body)
|
|
} else if !strings.Contains(body, `"is_system": true,`) {
|
|
t.Fatalf("Body missing value. Contains: %s", body)
|
|
} else if !strings.Contains(body, `"system_account": "$SYS"`) {
|
|
t.Fatalf("Body missing value. Contains: %s", body)
|
|
}
|
|
}
|
|
|
|
func TestMonitorAuthorizedUsers(t *testing.T) {
|
|
kp, _ := nkeys.FromSeed(seed)
|
|
usrNKey, _ := kp.PublicKey()
|
|
opts := DefaultMonitorOptions()
|
|
opts.Nkeys = []*NkeyUser{{Nkey: string(usrNKey)}}
|
|
opts.Users = []*User{{Username: "user", Password: "pwd"}}
|
|
s := RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
checkAuthUser := func(expected string) {
|
|
t.Helper()
|
|
resetPreviousHTTPConnections()
|
|
url := fmt.Sprintf("http://127.0.0.1:%d/connz?auth=true", s.MonitorAddr().Port)
|
|
for mode := 0; mode < 2; mode++ {
|
|
connz := pollConz(t, s, mode, url, &ConnzOptions{Username: true})
|
|
if l := len(connz.Conns); l != 1 {
|
|
t.Fatalf("Expected 1, got %v", l)
|
|
}
|
|
conn := connz.Conns[0]
|
|
au := conn.AuthorizedUser
|
|
if au == _EMPTY_ {
|
|
t.Fatal("AuthorizedUser is empty!")
|
|
}
|
|
if au != expected {
|
|
t.Fatalf("Expected %q, got %q", expected, au)
|
|
}
|
|
}
|
|
}
|
|
|
|
c := natsConnect(t, fmt.Sprintf("nats://user:pwd@127.0.0.1:%d", opts.Port))
|
|
defer c.Close()
|
|
checkAuthUser("user")
|
|
c.Close()
|
|
|
|
c = natsConnect(t, fmt.Sprintf("nats://127.0.0.1:%d", opts.Port),
|
|
nats.Nkey(usrNKey, func(nonce []byte) ([]byte, error) {
|
|
return kp.Sign(nonce)
|
|
}))
|
|
defer c.Close()
|
|
// we should get the user's NKey
|
|
checkAuthUser(usrNKey)
|
|
c.Close()
|
|
|
|
s.Shutdown()
|
|
opts = DefaultMonitorOptions()
|
|
opts.Authorization = "sometoken"
|
|
s = RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
c = natsConnect(t, fmt.Sprintf("nats://127.0.0.1:%d", opts.Port),
|
|
nats.Token("sometoken"))
|
|
defer c.Close()
|
|
// We should get the token specified by the user
|
|
checkAuthUser("sometoken")
|
|
c.Close()
|
|
s.Shutdown()
|
|
|
|
opts = DefaultMonitorOptions()
|
|
// User an operator seed
|
|
kp, _ = nkeys.FromSeed(oSeed)
|
|
pub, _ := kp.PublicKey()
|
|
opts.TrustedKeys = []string{pub}
|
|
s = RunServer(opts)
|
|
defer s.Shutdown()
|
|
|
|
akp, _ := nkeys.CreateAccount()
|
|
apub, _ := akp.PublicKey()
|
|
nac := jwt.NewAccountClaims(apub)
|
|
ajwt, err := nac.Encode(oKp)
|
|
if err != nil {
|
|
t.Fatalf("Error generating account JWT: %v", err)
|
|
}
|
|
|
|
nkp, _ := nkeys.CreateUser()
|
|
upub, _ := nkp.PublicKey()
|
|
nuc := jwt.NewUserClaims(upub)
|
|
jwt, err := nuc.Encode(akp)
|
|
if err != nil {
|
|
t.Fatalf("Error generating user JWT: %v", err)
|
|
}
|
|
|
|
buildMemAccResolver(s)
|
|
addAccountToMemResolver(s, apub, ajwt)
|
|
|
|
c = natsConnect(t, fmt.Sprintf("nats://127.0.0.1:%d", opts.Port),
|
|
nats.UserJWT(
|
|
func() (string, error) { return jwt, nil },
|
|
func(nonce []byte) ([]byte, error) { return nkp.Sign(nonce) }))
|
|
defer c.Close()
|
|
// we should get the user's pubkey
|
|
checkAuthUser(upub)
|
|
}
|
|
|
|
// Helper function to check that a JS cluster is formed
|
|
func checkForJSClusterUp(t *testing.T, servers ...*Server) {
|
|
t.Helper()
|
|
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
|
|
for _, s := range servers {
|
|
if !s.JetStreamEnabled() {
|
|
return fmt.Errorf("jetstream not enabled")
|
|
}
|
|
if !s.JetStreamIsCurrent() {
|
|
return fmt.Errorf("jetstream not current")
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestMonitorJsz(t *testing.T) {
|
|
readJsInfo := func(url string) *JSInfo {
|
|
t.Helper()
|
|
body := readBody(t, url)
|
|
info := &JSInfo{}
|
|
err := json.Unmarshal(body, info)
|
|
require_NoError(t, err)
|
|
return info
|
|
}
|
|
srvs := []*Server{}
|
|
for _, test := range []struct {
|
|
port int
|
|
mport int
|
|
cport int
|
|
routed int
|
|
}{
|
|
{7500, 7501, 7502, 5502},
|
|
{5500, 5501, 5502, 7502},
|
|
} {
|
|
tmpDir := createDir(t, fmt.Sprintf("srv_%d", test.port))
|
|
defer removeDir(t, tmpDir)
|
|
cf := createConfFile(t, []byte(fmt.Sprintf(`
|
|
listen: 127.0.0.1:%d
|
|
http: 127.0.0.1:%d
|
|
system_account: SYS
|
|
accounts {
|
|
SYS {
|
|
users [{user: sys, password: pwd}]
|
|
}
|
|
ACC {
|
|
users [{user: usr, password: pwd}]
|
|
jetstream: enabled
|
|
}
|
|
BCC_TO_HAVE_ONE_EXTRA {
|
|
users [{user: usr2, password: pwd}]
|
|
jetstream: enabled
|
|
}
|
|
}
|
|
jetstream: {
|
|
max_mem_store: 10Mb
|
|
max_file_store: 10Mb
|
|
store_dir: %s
|
|
}
|
|
cluster {
|
|
name: cluster_name
|
|
listen: 127.0.0.1:%d
|
|
routes: [nats-route://127.0.0.1:%d]
|
|
}
|
|
server_name: server_%d `, test.port, test.mport, tmpDir, test.cport, test.routed, test.port)))
|
|
defer removeFile(t, cf)
|
|
|
|
s, _ := RunServerWithConfig(cf)
|
|
defer s.Shutdown()
|
|
srvs = append(srvs, s)
|
|
}
|
|
checkClusterFormed(t, srvs...)
|
|
checkForJSClusterUp(t, srvs...)
|
|
|
|
nc := natsConnect(t, "nats://usr:pwd@127.0.0.1:7500")
|
|
defer nc.Close()
|
|
js, err := nc.JetStream(nats.MaxWait(5 * time.Second))
|
|
require_NoError(t, err)
|
|
_, err = js.AddStream(&nats.StreamConfig{
|
|
Name: "my-stream-replicated",
|
|
Subjects: []string{"foo", "bar"},
|
|
Replicas: 2,
|
|
})
|
|
require_NoError(t, err)
|
|
_, err = js.AddStream(&nats.StreamConfig{
|
|
Name: "my-stream-non-replicated",
|
|
Subjects: []string{"baz"},
|
|
Replicas: 1,
|
|
})
|
|
require_NoError(t, err)
|
|
_, err = js.AddConsumer("my-stream-replicated", &nats.ConsumerConfig{
|
|
Durable: "my-consumer-replicated",
|
|
AckPolicy: nats.AckExplicitPolicy,
|
|
})
|
|
require_NoError(t, err)
|
|
_, err = js.AddConsumer("my-stream-non-replicated", &nats.ConsumerConfig{
|
|
Durable: "my-consumer-non-replicated",
|
|
AckPolicy: nats.AckExplicitPolicy,
|
|
})
|
|
require_NoError(t, err)
|
|
nc.Flush()
|
|
_, err = js.Publish("foo", nil)
|
|
require_NoError(t, err)
|
|
|
|
monUrl1 := fmt.Sprintf("http://127.0.0.1:%d/jsz", 7501)
|
|
monUrl2 := fmt.Sprintf("http://127.0.0.1:%d/jsz", 5501)
|
|
|
|
t.Run("default", func(t *testing.T) {
|
|
for _, url := range []string{monUrl1, monUrl2} {
|
|
info := readJsInfo(url)
|
|
if len(info.AccountDetails) != 0 {
|
|
t.Fatalf("expected no account to be returned by %s but got %v", url, info)
|
|
}
|
|
if info.Streams == 0 {
|
|
t.Fatalf("expected stream count to be 2 but got %d", info.Streams)
|
|
}
|
|
if info.Consumers == 0 {
|
|
t.Fatalf("expected consumer count to be 2 but got %d", info.Consumers)
|
|
}
|
|
if info.Messages != 1 {
|
|
t.Fatalf("expected one message but got %d", info.Messages)
|
|
}
|
|
}
|
|
})
|
|
t.Run("accounts", func(t *testing.T) {
|
|
for _, url := range []string{monUrl1, monUrl2} {
|
|
info := readJsInfo(url + "?accounts=true")
|
|
if len(info.AccountDetails) != 2 {
|
|
t.Fatalf("expected both accounts to be returned by %s but got %v", url, info)
|
|
}
|
|
}
|
|
})
|
|
t.Run("offset-too-big", func(t *testing.T) {
|
|
for _, url := range []string{monUrl1, monUrl2} {
|
|
info := readJsInfo(url + "?accounts=true&offset=10")
|
|
if len(info.AccountDetails) != 0 {
|
|
t.Fatalf("expected no accounts to be returned by %s but got %v", url, info)
|
|
}
|
|
}
|
|
})
|
|
t.Run("limit", func(t *testing.T) {
|
|
for _, url := range []string{monUrl1, monUrl2} {
|
|
info := readJsInfo(url + "?accounts=true&limit=1")
|
|
if len(info.AccountDetails) != 1 {
|
|
t.Fatalf("expected one account to be returned by %s but got %v", url, info)
|
|
}
|
|
if info := readJsInfo(url + "?accounts=true&offset=1&limit=1"); len(info.AccountDetails) != 1 {
|
|
t.Fatalf("expected one account to be returned by %s but got %v", url, info)
|
|
}
|
|
}
|
|
})
|
|
t.Run("offset-stable", func(t *testing.T) {
|
|
for _, url := range []string{monUrl1, monUrl2} {
|
|
info1 := readJsInfo(url + "?accounts=true&offset=1&limit=1")
|
|
if len(info1.AccountDetails) != 1 {
|
|
t.Fatalf("expected one account to be returned by %s but got %v", url, info1)
|
|
}
|
|
info2 := readJsInfo(url + "?accounts=true&offset=1&limit=1")
|
|
if len(info2.AccountDetails) != 1 {
|
|
t.Fatalf("expected one account to be returned by %s but got %v", url, info2)
|
|
}
|
|
if info1.AccountDetails[0].Name != info2.AccountDetails[0].Name {
|
|
t.Fatalf("absent changes, same offset should result in same account but gut: %v %v",
|
|
info1.AccountDetails[0].Name, info2.AccountDetails[0].Name)
|
|
}
|
|
}
|
|
})
|
|
t.Run("filter-account", func(t *testing.T) {
|
|
for _, url := range []string{monUrl1, monUrl2} {
|
|
info := readJsInfo(url + "?acc=ACC")
|
|
if len(info.AccountDetails) != 1 {
|
|
t.Fatalf("expected account ACC to be returned by %s but got %v", url, info)
|
|
}
|
|
if info.AccountDetails[0].Name != "ACC" {
|
|
t.Fatalf("expected account ACC to be returned by %s but got %v", url, info)
|
|
}
|
|
if len(info.AccountDetails[0].Streams) != 0 {
|
|
t.Fatalf("expected account ACC to be returned by %s but got %v", url, info)
|
|
}
|
|
}
|
|
})
|
|
t.Run("streams", func(t *testing.T) {
|
|
for _, url := range []string{monUrl1, monUrl2} {
|
|
info := readJsInfo(url + "?acc=ACC&streams=true")
|
|
if len(info.AccountDetails) != 1 {
|
|
t.Fatalf("expected account ACC to be returned by %s but got %v", url, info)
|
|
}
|
|
if len(info.AccountDetails[0].Streams) == 0 {
|
|
t.Fatalf("expected streams to be returned by %s but got %v", url, info)
|
|
}
|
|
if len(info.AccountDetails[0].Streams[0].Consumer) != 0 {
|
|
t.Fatalf("expected no consumers to be returned by %s but got %v", url, info)
|
|
}
|
|
}
|
|
})
|
|
t.Run("consumers", func(t *testing.T) {
|
|
for _, url := range []string{monUrl1, monUrl2} {
|
|
info := readJsInfo(url + "?acc=ACC&consumers=true")
|
|
if len(info.AccountDetails) != 1 {
|
|
t.Fatalf("expected account ACC to be returned by %s but got %v", url, info)
|
|
}
|
|
if len(info.AccountDetails[0].Streams[0].Consumer) == 0 {
|
|
t.Fatalf("expected consumers to be returned by %s but got %v", url, info)
|
|
}
|
|
if info.AccountDetails[0].Streams[0].Config != nil {
|
|
t.Fatal("Config expected to not be present")
|
|
}
|
|
if info.AccountDetails[0].Streams[0].Consumer[0].Config != nil {
|
|
t.Fatal("Config expected to not be present")
|
|
}
|
|
}
|
|
})
|
|
t.Run("config", func(t *testing.T) {
|
|
for _, url := range []string{monUrl1, monUrl2} {
|
|
info := readJsInfo(url + "?acc=ACC&consumers=true&config=true")
|
|
if len(info.AccountDetails) != 1 {
|
|
t.Fatalf("expected account ACC to be returned by %s but got %v", url, info)
|
|
}
|
|
if info.AccountDetails[0].Streams[0].Config == nil {
|
|
t.Fatal("Config expected to be present")
|
|
}
|
|
if info.AccountDetails[0].Streams[0].Consumer[0].Config == nil {
|
|
t.Fatal("Config expected to be present")
|
|
}
|
|
}
|
|
})
|
|
t.Run("account-non-existing", func(t *testing.T) {
|
|
for _, url := range []string{monUrl1, monUrl2} {
|
|
info := readJsInfo(url + "?acc=DOES_NOT_EXIT")
|
|
if len(info.AccountDetails) != 0 {
|
|
t.Fatalf("expected no account to be returned by %s but got %v", url, info)
|
|
}
|
|
}
|
|
})
|
|
}
|