Files
nats-server/server/events_test.go
Derek Collison 10eb491898 Better serializing of server info
Signed-off-by: Derek Collison <derek@nats.io>
2018-11-29 15:22:04 -08:00

301 lines
8.0 KiB
Go

// Copyright 2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"encoding/json"
"fmt"
"strings"
"testing"
"time"
"github.com/nats-io/go-nats"
"github.com/nats-io/jwt"
"github.com/nats-io/nkeys"
)
func createAccount(s *Server) (*Account, nkeys.KeyPair) {
okp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
pub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(pub)
jwt, _ := nac.Encode(okp)
addAccountToMemResolver(s, pub, jwt)
return s.LookupAccount(pub), akp
}
func TestSystemAccount(t *testing.T) {
s := opTrustBasicSetup()
defer s.Shutdown()
buildMemAccResolver(s)
acc, _ := createAccount(s)
s.setSystemAccount(acc)
s.mu.Lock()
defer s.mu.Unlock()
if s.sys == nil || s.sys.account == nil {
t.Fatalf("Expected sys.account to be non-nil")
}
if s.sys.client == nil {
t.Fatalf("Expected sys.client to be non-nil")
}
if s.sys.client.echo {
t.Fatalf("Internal clients should always have echo false")
}
}
func createUserCreds(t *testing.T, s *Server, akp nkeys.KeyPair) nats.Option {
t.Helper()
kp, _ := nkeys.CreateUser()
pub, _ := kp.PublicKey()
nuc := jwt.NewUserClaims(pub)
ujwt, err := nuc.Encode(akp)
if err != nil {
t.Fatalf("Error generating user JWT: %v", err)
}
userCB := func() (string, error) {
return ujwt, nil
}
sigCB := func(nonce []byte) ([]byte, error) {
sig, _ := kp.Sign(nonce)
return sig, nil
}
return nats.UserJWT(userCB, sigCB)
}
func runTrustedServer(t *testing.T) (*Server, *Options) {
t.Helper()
opts := DefaultOptions()
kp, _ := nkeys.FromSeed(oSeed)
pub, _ := kp.PublicKey()
opts.TrustedNkeys = []string{pub}
s := RunServer(opts)
buildMemAccResolver(s)
return s, opts
}
func TestSystemAccountNewConnection(t *testing.T) {
s, opts := runTrustedServer(t)
defer s.Shutdown()
acc, akp := createAccount(s)
s.setSystemAccount(acc)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer ncs.Close()
sub, _ := ncs.SubscribeSync(">")
defer sub.Unsubscribe()
ncs.Flush()
// We can't hear ourselves, so we need to create a second client to
// trigger the connect/disconnect events.
acc2, akp2 := createAccount(s)
nc, err := nats.Connect(url, createUserCreds(t, s, akp2), nats.Name("TEST EVENTS"))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.%s.CLIENT.CONNECT", acc2.Name)) {
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.<ACCOUNT>.CLIENT.CONNECT", msg.Subject)
}
tokens := strings.Split(msg.Subject, ".")
if len(tokens) < 4 {
t.Fatalf("Expected 4 tokens, got %d", len(tokens))
}
account := tokens[1]
if account != acc2.Name {
t.Fatalf("Expected %q for account, got %q", acc2.Name, account)
}
cem := ConnectEventMsg{}
if err := json.Unmarshal(msg.Data, &cem); err != nil {
t.Fatalf("Error unmarshalling connect event message: %v", err)
}
if cem.Server.ID != s.ID() {
t.Fatalf("Expected server to be %q, got %q", s.ID(), cem.Server)
}
if cem.Server.Seq == 0 {
t.Fatalf("Expected sequence to be non-zero")
}
if cem.Client.Name != "TEST EVENTS" {
t.Fatalf("Expected client name to be %q, got %q", "TEST EVENTS", cem.Client.Name)
}
if cem.Client.Lang != "go" {
t.Fatalf("Expected client lang to be \"go\", got %q", cem.Client.Lang)
}
// Now close the other client. Should fire a disconnect event.
// First send and receive some messages.
sub2, _ := nc.SubscribeSync("foo")
defer sub2.Unsubscribe()
sub3, _ := nc.SubscribeSync("*")
defer sub3.Unsubscribe()
for i := 0; i < 10; i++ {
nc.Publish("foo", []byte("HELLO WORLD"))
}
nc.Flush()
nc.Close()
msg, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.%s.CLIENT.DISCONNECT", acc2.Name)) {
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.<ACCOUNT>.CLIENT.DISCONNECT", msg.Subject)
}
tokens = strings.Split(msg.Subject, ".")
if len(tokens) < 4 {
t.Fatalf("Expected 4 tokens, got %d", len(tokens))
}
account = tokens[1]
if account != acc2.Name {
t.Fatalf("Expected %q for account, got %q", acc2.Name, account)
}
dem := DisconnectEventMsg{}
if err := json.Unmarshal(msg.Data, &dem); err != nil {
t.Fatalf("Error unmarshalling disconnect event message: %v", err)
}
if dem.Server.ID != s.ID() {
t.Fatalf("Expected server to be %q, got %q", s.ID(), dem.Server)
}
if dem.Server.Seq == 0 {
t.Fatalf("Expected sequence to be non-zero")
}
if dem.Server.Seq <= cem.Server.Seq {
t.Fatalf("Expected sequence to be increasing")
}
if cem.Client.Name != "TEST EVENTS" {
t.Fatalf("Expected client name to be %q, got %q", "TEST EVENTS", dem.Client.Name)
}
if dem.Client.Lang != "go" {
t.Fatalf("Expected client lang to be \"go\", got %q", dem.Client.Lang)
}
if dem.Sent.Msgs != 10 {
t.Fatalf("Expected 10 msgs sent, got %d", dem.Sent.Msgs)
}
if dem.Sent.Bytes != 110 {
t.Fatalf("Expected 110 bytes sent, got %d", dem.Sent.Bytes)
}
if dem.Received.Msgs != 20 {
t.Fatalf("Expected 20 msgs received, got %d", dem.Sent.Msgs)
}
if dem.Received.Bytes != 220 {
t.Fatalf("Expected 220 bytes sent, got %d", dem.Sent.Bytes)
}
}
func TestSystemInternalSubscriptions(t *testing.T) {
s, opts := runTrustedServer(t)
defer s.Shutdown()
sub, err := s.sysSubscribe("foo", nil)
if sub != nil || err != ErrNoSysAccount {
t.Fatalf("Expected to get proper error, got %v", err)
}
acc, akp := createAccount(s)
s.setSystemAccount(acc)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(url, createUserCreds(t, s, akp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
sub, err = s.sysSubscribe("foo", nil)
if sub != nil || err == nil {
t.Fatalf("Expected to get error for no handler, got %v", err)
}
received := make(chan *nats.Msg)
// Create message callback handler.
cb := func(sub *subscription, subject, reply string, msg []byte) {
copy := append([]byte(nil), msg...)
received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy}
}
// Now create an internal subscription
sub, err = s.sysSubscribe("foo", cb)
if sub == nil || err != nil {
t.Fatalf("Expected to subscribe, got %v", err)
}
// Now send out a message from our normal client.
nc.Publish("foo", []byte("HELLO WORLD"))
var msg *nats.Msg
select {
case msg = <-received:
if msg.Subject != "foo" {
t.Fatalf("Expected \"foo\" as subject, got %q", msg.Subject)
}
if msg.Reply != "" {
t.Fatalf("Expected no reply, got %q", msg.Reply)
}
if !bytes.Equal(msg.Data, []byte("HELLO WORLD")) {
t.Fatalf("Got the wrong msg payload: %q", msg.Data)
}
break
case <-time.After(time.Second):
t.Fatalf("Did not receive the message")
}
s.sysUnsubscribe(sub)
// Now send out a message from our normal client.
// We should not see this one.
nc.Publish("foo", []byte("You There?"))
select {
case <-received:
t.Fatalf("Received a message when we should not have")
case <-time.After(100 * time.Millisecond):
break
}
// Now make sure we do not hear ourselves. We optimize this for internally
// generated messages.
r := SublistResult{psubs: []*subscription{sub}}
s.sendInternalMsg(&r, "foo", nil, msg.Data)
select {
case <-received:
t.Fatalf("Received a message when we should not have")
case <-time.After(100 * time.Millisecond):
break
}
}