Allow servers to send and receive messages directly

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-11-29 11:48:26 -08:00
parent 8a662a5760
commit 574fd62e01
15 changed files with 729 additions and 92 deletions

View File

@@ -229,7 +229,7 @@ func (s *Server) configureAuthorization() {
// checkAuthentication will check based on client type and
// return boolean indicating if client is authorized.
func (s *Server) checkAuthentication(c *client) bool {
switch c.typ {
switch c.kind {
case CLIENT:
return s.isClientAuthorized(c)
case ROUTER:
@@ -353,6 +353,9 @@ func (s *Server) isClientAuthorized(c *client) bool {
nkey = buildInternalNkeyUser(juc, acc)
c.RegisterNkeyUser(nkey)
// Generate an event if we have a system account.
s.accountConnectEvent(c)
// Check if we need to set an auth timer if the user jwt expires.
c.checkExpiration(juc.Claims())
return true
@@ -360,17 +363,21 @@ func (s *Server) isClientAuthorized(c *client) bool {
if nkey != nil {
if c.opts.Sig == "" {
c.Debugf("Signature missing")
return false
}
sig, err := base64.StdEncoding.DecodeString(c.opts.Sig)
if err != nil {
c.Debugf("Signature not valid base64")
return false
}
pub, err := nkeys.FromPublicKey(c.opts.Nkey)
if err != nil {
c.Debugf("User nkey not valid: %v", err)
return false
}
if err := pub.Verify(c.nonce, sig); err != nil {
c.Debugf("Signature not verified")
return false
}
c.RegisterNkeyUser(nkey)

View File

@@ -38,6 +38,8 @@ const (
ROUTER
// GATEWAY is a link between 2 clusters.
GATEWAY
// SYSTEM is an internal system client.
SYSTEM
)
const (
@@ -145,7 +147,7 @@ type client struct {
mpay int32
msubs int
mu sync.Mutex
typ int
kind int
cid uint64
opts clientOpts
start time.Time
@@ -155,6 +157,9 @@ type client struct {
out outbound
srv *Server
acc *Account
user *NkeyUser
host string
port int
subs map[string]*subscription
perms *permissions
mperms *msgDeny
@@ -365,19 +370,23 @@ func (c *client) initClient() {
c.pcd = make(map[*client]struct{})
// snapshot the string version of the connection
conn := "-"
var conn string
if ip, ok := c.nc.(*net.TCPConn); ok {
addr := ip.RemoteAddr().(*net.TCPAddr)
c.host = addr.IP.String()
c.port = addr.Port
conn = fmt.Sprintf("%s:%d", addr.IP, addr.Port)
}
switch c.typ {
switch c.kind {
case CLIENT:
c.ncs = fmt.Sprintf("%s - cid:%d", conn, c.cid)
case ROUTER:
c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid)
case GATEWAY:
c.ncs = fmt.Sprintf("%s - gid:%d", conn, c.cid)
case SYSTEM:
c.ncs = "SYSTEM"
}
}
@@ -499,6 +508,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) {
c.mu.Lock()
defer c.mu.Unlock()
c.user = user
// Assign permissions.
if user.Permissions == nil {
@@ -673,7 +683,7 @@ func (c *client) readLoop() {
// Client will be checked on several fronts to see
// if applicable. Routes will never wait in place.
budget := 500 * time.Microsecond
if c.typ == ROUTER {
if c.kind == ROUTER {
budget = 0
}
@@ -862,8 +872,8 @@ func (c *client) traceMsg(msg []byte) {
if !c.trace {
return
}
// FIXME(dlc), allow limits to printable payload
c.Tracef("<<- MSG_PAYLOAD: [%s]", string(msg[:len(msg)-LEN_CR_LF]))
// FIXME(dlc), allow limits to printable payload.
c.Tracef("<<- MSG_PAYLOAD: [%q]", msg[:len(msg)-LEN_CR_LF])
}
func (c *client) traceInOp(op string, arg []byte) {
@@ -895,7 +905,7 @@ func (c *client) processInfo(arg []byte) error {
if err := json.Unmarshal(arg, &info); err != nil {
return err
}
switch c.typ {
switch c.kind {
case ROUTER:
c.processRouteInfo(&info)
case GATEWAY:
@@ -905,7 +915,7 @@ func (c *client) processInfo(arg []byte) error {
}
func (c *client) processErr(errStr string) {
switch c.typ {
switch c.kind {
case CLIENT:
c.Errorf("Client Error %s", errStr)
case ROUTER:
@@ -968,8 +978,10 @@ func (c *client) processConnect(arg []byte) error {
return nil
}
c.last = time.Now()
typ := c.typ
kind := c.kind
srv := c.srv
// Moved unmarshalling of clients' Options under the lock.
// The client has already been added to the server map, so it is possible
// that other routines lookup the client, and access its options under
@@ -997,7 +1009,7 @@ func (c *client) processConnect(arg []byte) error {
// least ClientProtoInfo, we need to increment the following counter.
// This is decremented when client is removed from the server's
// clients map.
if typ == CLIENT && proto >= ClientProtoInfo {
if kind == CLIENT && proto >= ClientProtoInfo {
srv.mu.Lock()
srv.cproto++
srv.mu.Unlock()
@@ -1045,7 +1057,7 @@ func (c *client) processConnect(arg []byte) error {
}
switch typ {
switch kind {
case CLIENT:
// Check client protocol request if it exists.
if proto < ClientProtoZero || proto > ClientProtoInfo {
@@ -1281,7 +1293,7 @@ func (c *client) processPing() {
c.sendPong()
// If not a CLIENT, we are done
if c.typ != CLIENT {
if c.kind != CLIENT {
c.mu.Unlock()
return
}
@@ -1333,7 +1345,7 @@ func (c *client) processPong() {
c.ping.out = 0
c.rtt = time.Since(c.rttStart)
srv := c.srv
reorderGWs := c.typ == GATEWAY && c.gw.outbound
reorderGWs := c.kind == GATEWAY && c.gw.outbound
c.mu.Unlock()
if reorderGWs {
srv.gateway.orderOutboundConnections()
@@ -1445,21 +1457,22 @@ func (c *client) processSub(argo []byte) (err error) {
}
c.mu.Lock()
if c.nc == nil {
// Grab connection type.
kind := c.kind
if c.nc == nil && kind != SYSTEM {
c.mu.Unlock()
return nil
}
// Grab connection type.
ctype := c.typ
// Check permissions if applicable.
if ctype == ROUTER {
if kind == ROUTER {
if !c.canExport(string(sub.subject)) {
c.mu.Unlock()
return nil
}
} else if !c.canSubscribe(string(sub.subject)) {
} else if kind == CLIENT && !c.canSubscribe(string(sub.subject)) {
c.mu.Unlock()
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
c.Errorf("Subscription Violation - User %q, Subject %q, SID %s",
@@ -1492,7 +1505,7 @@ func (c *client) processSub(argo []byte) (err error) {
if err != nil {
c.sendErr("Invalid Subject")
return nil
} else if c.opts.Verbose {
} else if c.opts.Verbose && kind != SYSTEM {
c.sendOK()
}
@@ -1501,7 +1514,7 @@ func (c *client) processSub(argo []byte) (err error) {
c.Errorf(err.Error())
}
// If we are routing and this is a local sub, add to the route map for the associated account.
if ctype == CLIENT {
if kind == CLIENT {
c.srv.updateRouteSubscriptionMap(acc, sub, 1)
}
}
@@ -1679,7 +1692,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) {
c.traceOp("<-> %s", "DELSUB", sub.sid)
delete(c.subs, string(sub.sid))
if c.typ != CLIENT {
if c.kind != CLIENT && c.kind != SYSTEM {
c.removeReplySubTimeout(sub)
}
@@ -1691,7 +1704,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) {
for _, nsub := range sub.shadow {
if err := nsub.im.acc.sl.Remove(nsub); err != nil {
c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name)
} else if c.typ == CLIENT && c.srv != nil {
} else if c.kind == CLIENT && c.srv != nil {
c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1)
}
}
@@ -1724,7 +1737,7 @@ func (c *client) processUnsub(arg []byte) error {
c.mu.Lock()
// Grab connection type.
ctype := c.typ
kind := c.kind
var acc *Account
if sub, ok = c.subs[string(sid)]; ok {
@@ -1745,7 +1758,7 @@ func (c *client) processUnsub(arg []byte) error {
if unsub {
c.unsubscribe(acc, sub, false)
if acc != nil && ctype == CLIENT {
if acc != nil && kind == CLIENT {
c.srv.updateRouteSubscriptionMap(acc, sub, -1)
}
}
@@ -1814,13 +1827,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
sub.nm++
// Check if we should auto-unsubscribe.
if sub.max > 0 {
if client.typ == ROUTER && sub.nm >= sub.max {
if client.kind == ROUTER && sub.nm >= sub.max {
// The only router based messages that we will see here are remoteReplies.
// We handle these slightly differently.
defer client.removeReplySub(sub)
} else {
// For routing..
shouldForward := client.typ == CLIENT && client.srv != nil
shouldForward := client.kind == CLIENT && client.srv != nil
// If we are at the exact number, unsubscribe but
// still process the message in hand, otherwise
// unsubscribe and drop message on the floor.
@@ -1844,12 +1857,6 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
}
}
// Check for closed connection
if client.nc == nil {
client.mu.Unlock()
return false
}
// Update statistics
// The msg includes the CR_LF, so pull back out for accounting.
@@ -1863,6 +1870,20 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
atomic.AddInt64(&srv.outMsgs, 1)
atomic.AddInt64(&srv.outBytes, msgSize)
// Check for internal subscription.
if client.kind == SYSTEM {
s := client.srv
client.mu.Unlock()
s.deliverInternalMsg(sub, c.pa.subject, c.pa.reply, msg[:msgSize])
return true
}
// Check for closed connection
if client.nc == nil {
client.mu.Unlock()
return false
}
// Queue to outbound buffer
client.queueOutbound(mh)
client.queueOutbound(msg)
@@ -1981,7 +2002,7 @@ func isServiceReply(reply []byte) bool {
// This will decide to call the client code or router code.
func (c *client) processInboundMsg(msg []byte) {
switch c.typ {
switch c.kind {
case CLIENT:
c.processInboundClientMsg(msg)
case ROUTER:
@@ -2162,13 +2183,13 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER. We now process
// these after everything else.
if sub.client.typ == ROUTER {
if c.typ == ROUTER {
if sub.client.kind == ROUTER {
if c.kind == ROUTER {
continue
}
c.addSubToRouteTargets(sub)
continue
} else if sub.client.typ == GATEWAY {
} else if sub.client.kind == GATEWAY {
// Never send to gateway from here.
continue
}
@@ -2177,7 +2198,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
// Redo the subject here on the fly.
msgh = c.msgb[1:msgHeadProtoLen]
msgh = append(msgh, sub.im.prefix...)
msgh = append(msgh, c.pa.subject...)
//msgh = append(msgh, c.pa.subject...)
msgh = append(msgh, subject...)
msgh = append(msgh, ' ')
si = len(msgh)
}
@@ -2187,7 +2209,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
}
// If we are sourced from a route we need to have direct filtered queues.
if c.typ == ROUTER && c.pa.queues == nil {
if c.kind == ROUTER && c.pa.queues == nil {
return
}
@@ -2198,7 +2220,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
// For gateway connections, we still want to send messages to routes
// even if there is no queue filters.
if c.typ == GATEWAY && qf == nil {
if c.kind == GATEWAY && qf == nil {
goto sendToRoutes
}
@@ -2241,8 +2263,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
continue
}
// Potentially sending to a remote sub across a route.
if sub.client.typ == ROUTER {
if c.typ == ROUTER {
if sub.client.kind == ROUTER {
if c.kind == ROUTER {
// We just came from a route, so skip and prefer local subs.
// Keep our first rsub in case all else fails.
if rsub == nil {
@@ -2262,7 +2284,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
// Redo the subject here on the fly.
msgh = c.msgb[1:msgHeadProtoLen]
msgh = append(msgh, sub.im.prefix...)
msgh = append(msgh, c.pa.subject...)
msgh = append(msgh, subject...)
msgh = append(msgh, ' ')
si = len(msgh)
}
@@ -2295,8 +2317,8 @@ sendToRoutes:
return
}
// We address by index to avoid struct copy. We have inline structs for memory
// layout and cache coherency.
// We address by index to avoid struct copy.
// We have inline structs for memory layout and cache coherency.
for i := range c.in.rts {
rt := &c.in.rts[i]
@@ -2444,13 +2466,13 @@ func (c *client) clearConnection(reason ClosedState) {
nc.SetWriteDeadline(time.Time{})
// Save off the connection if its a client.
if c.typ == CLIENT && c.srv != nil {
if c.kind == CLIENT && c.srv != nil {
go c.srv.saveClosedClient(c, nc, reason)
}
}
func (c *client) typeString() string {
switch c.typ {
switch c.kind {
case CLIENT:
return "Client"
case ROUTER:
@@ -2546,7 +2568,7 @@ func (c *client) closeConnection(reason ClosedState) {
// Be consistent with the creation: for routes and gateways,
// we use Noticef on create, so use that too for delete.
if c.typ == ROUTER || c.typ == GATEWAY {
if c.kind == ROUTER || c.kind == GATEWAY {
c.Noticef("%s connection closed", c.typeString())
} else {
c.Debugf("%s connection closed", c.typeString())
@@ -2563,7 +2585,7 @@ func (c *client) closeConnection(reason ClosedState) {
gwName string
gwIsOutbound bool
gwCfg *gatewayCfg
ctype = c.typ
kind = c.kind
srv = c.srv
noReconnect = c.flags.isSet(noReconnect)
acc = c.acc
@@ -2573,7 +2595,7 @@ func (c *client) closeConnection(reason ClosedState) {
// FIXME(dlc) - we can just stub in a new one for client
// and reference existing one.
var subs []*subscription
if ctype == CLIENT {
if kind == CLIENT {
subs = make([]*subscription, 0, len(c.subs))
for _, sub := range c.subs {
// Auto-unsubscribe subscriptions must be unsubscribed forcibly.
@@ -2588,7 +2610,7 @@ func (c *client) closeConnection(reason ClosedState) {
}
connectURLs = c.route.connectURLs
}
if ctype == GATEWAY {
if kind == GATEWAY {
gwName = c.gw.name
gwIsOutbound = c.gw.outbound
gwCfg = c.gw.cfg
@@ -2597,7 +2619,7 @@ func (c *client) closeConnection(reason ClosedState) {
c.mu.Unlock()
// Remove clients subscriptions.
if ctype == CLIENT {
if kind == CLIENT {
acc.sl.RemoveBatch(subs)
} else {
go c.removeRemoteSubs()
@@ -2617,7 +2639,7 @@ func (c *client) closeConnection(reason ClosedState) {
srv.removeClient(c)
// Update remote subscriptions.
if acc != nil && ctype == CLIENT {
if acc != nil && kind == CLIENT {
qsubs := map[string]*qsub{}
for _, sub := range subs {
if sub.queue == nil {
@@ -2683,7 +2705,7 @@ func (c *client) closeConnection(reason ClosedState) {
// server shutdown.
srv.startGoRoutine(func() { srv.reConnectToRoute(rurl, rtype) })
}
} else if srv != nil && ctype == GATEWAY && gwIsOutbound {
} else if srv != nil && kind == GATEWAY && gwIsOutbound {
if gwCfg != nil {
srv.Debugf("Attempting reconnect for gateway %q", gwName)
// Run this as a go routine since we may be called within

View File

@@ -92,6 +92,10 @@ var (
// request from a remote Gateway with a destination name that does not match the server's
// Gateway's name.
ErrWrongGateway = errors.New("Wrong Gateway")
// ErrNoSysAccount is returned when an attempt to publish or subscribe is made
// when there is no internal system account defined.
ErrNoSysAccount = errors.New("System Account Not Setup")
)
// configErr is a configuration error.

271
server/events.go Normal file
View File

@@ -0,0 +1,271 @@
// Copyright 2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"encoding/json"
"fmt"
"strconv"
"time"
)
const (
connectEventSubj = "$SYS.%s.CLIENT.CONNECT"
disconnectEventSubj = "$SYS.%s.CLIENT.DISCONNECT"
)
// ConnectEventMsg is sent when a new connection is made that is part of an account.
type ConnectEventMsg struct {
Server ServerInfo `json:"server"`
Client ClientInfo `json:"client"`
}
// DisconnectEventMsg is sent when a new connection previously defined from a
// ConnectEventMsg is closed.
type DisconnectEventMsg struct {
Server ServerInfo `json:"server"`
Client ClientInfo `json:"client"`
Sent DataStats `json:"sent"`
Received DataStats `json:"received"`
Reason string `json:"reason"`
}
type ServerInfo struct {
Host string `json:"host"`
ID string `json:"id"`
Seq uint64 `json:"seq"`
}
// ClientInfo is detailed information about the client forming a connection.
type ClientInfo struct {
Start time.Time `json:"start,omitempty"`
Host string `json:"host,omitempty"`
ID uint64 `json:"id"`
Account string `json:"acc"`
User string `json:"user,omitempty"`
Name string `json:"name,omitempty"`
Lang string `json:"lang,omitempty"`
Version string `json:"ver,omitempty"`
Stop *time.Time `json:"stop,omitempty"`
}
// DataStats reports how may msg and bytes. Applicable for both sent and received.
type DataStats struct {
Msgs int64 `json:"msgs"`
Bytes int64 `json:"bytes"`
}
// This will send a message.
// TODO(dlc) - Note we want the sequence numbers to be serialized but right now they may not be.
func (s *Server) sendInternalMsg(r *SublistResult, subj string, msg []byte) {
if s.sys == nil {
return
}
c := s.sys.client
acc := s.sys.account
// Prep internl structures needed to send message.
c.pa.subject = []byte(subj)
c.pa.size = len(msg)
c.pa.szb = []byte(strconv.FormatInt(int64(len(msg)), 10))
// Add in NL
msg = append(msg, _CRLF_...)
// Check to see if we need to map/route to another account.
if acc.imports.services != nil {
c.checkForImportServices(acc, msg)
}
c.processMsgResults(acc, r, msg, []byte(subj), nil, nil)
c.flushClients()
}
// accountConnectEvent will send an account client connect event if there is interest.
func (s *Server) accountConnectEvent(c *client) {
if s.sys == nil || s.sys.client == nil || s.sys.account == nil {
return
}
acc := s.sys.account
subj := fmt.Sprintf(connectEventSubj, c.acc.Name)
r := acc.sl.Match(subj)
if s.noOutSideInterest(r) {
return
}
c.mu.Lock()
m := ConnectEventMsg{
Client: ClientInfo{
Start: c.start,
Host: c.host,
ID: c.cid,
Account: c.acc.Name,
User: nameForClient(c),
Name: c.opts.Name,
Lang: c.opts.Lang,
Version: c.opts.Version,
},
}
c.mu.Unlock()
s.stampServerInfo(&m.Server)
msg, _ := json.MarshalIndent(m, "", " ")
s.sendInternalMsg(r, subj, msg)
}
// accountDisconnectEvent will send an account client disconnect event if there is interest.
func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string) {
if s.sys == nil || s.sys.client == nil || s.sys.account == nil {
return
}
acc := s.sys.account
subj := fmt.Sprintf(disconnectEventSubj, c.acc.Name)
r := acc.sl.Match(subj)
if s.noOutSideInterest(r) {
return
}
c.mu.Lock()
m := DisconnectEventMsg{
Client: ClientInfo{
Start: c.start,
Stop: &now,
Host: c.host,
ID: c.cid,
Account: c.acc.Name,
User: nameForClient(c),
Name: c.opts.Name,
Lang: c.opts.Lang,
Version: c.opts.Version,
},
Sent: DataStats{
Msgs: c.inMsgs,
Bytes: c.inBytes,
},
Received: DataStats{
Msgs: c.outMsgs,
Bytes: c.outBytes,
},
Reason: reason,
}
c.mu.Unlock()
s.stampServerInfo(&m.Server)
msg, _ := json.MarshalIndent(m, "", " ")
s.sendInternalMsg(r, subj, msg)
}
// Internal message callback. If the msg is needed past the callback it is
// required to be copied.
type msgHandler func(sub *subscription, subject, reply string, msg []byte)
func (s *Server) deliverInternalMsg(sub *subscription, subject, reply, msg []byte) {
if s.sys == nil {
return
}
s.mu.Lock()
cb := s.sys.subs[string(sub.sid)]
s.mu.Unlock()
if cb != nil {
cb(sub, string(subject), string(reply), msg)
}
}
// Create an internal subscription. No support for queue groups atm.
func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) {
if s.sys == nil {
return nil, ErrNoSysAccount
}
if cb == nil {
return nil, fmt.Errorf("Undefined message handler")
}
s.mu.Lock()
sid := strconv.FormatInt(int64(s.sys.sid), 10)
s.sys.subs[sid] = cb
s.sys.sid++
c := s.sys.client
s.mu.Unlock()
// Now create the subscription
if err := c.processSub([]byte(subject + " " + sid)); err != nil {
return nil, err
}
c.mu.Lock()
sub := c.subs[sid]
c.mu.Unlock()
return sub, nil
}
func (s *Server) sysUnsubscribe(sub *subscription) {
if sub == nil || s.sys == nil {
return
}
s.mu.Lock()
acc := s.sys.account
c := s.sys.client
delete(s.sys.subs, string(sub.sid))
s.mu.Unlock()
c.unsubscribe(acc, sub, true)
}
func (s *Server) noOutSideInterest(r *SublistResult) bool {
sc := s.sys.client
if sc == nil || r == nil {
return true
}
nsubs := len(r.psubs) + len(r.qsubs)
if nsubs == 0 {
return true
}
// We will always be no-echo but will determine that on delivery.
// Here we try to avoid generating the payload if there is only us.
// We only check normal subs. If we introduce queue subs into the
// internal subscribers we should add in the check.
for _, sub := range r.psubs {
if sub.client != sc {
return false
}
}
return false
}
func (s *Server) stampServerInfo(si *ServerInfo) {
s.mu.Lock()
si.ID = s.info.ID
si.Seq = s.sys.seq
si.Host = s.info.Host
s.sys.seq++
s.mu.Unlock()
}
func (c *client) flushClients() {
last := time.Now()
for cp := range c.pcd {
// Queue up a flush for those in the set
cp.mu.Lock()
// Update last activity for message delivery
cp.last = last
cp.out.fsp--
cp.flushSignal()
cp.mu.Unlock()
delete(c.pcd, cp)
}
}
func nameForClient(c *client) string {
if c.user != nil {
return c.user.Nkey
}
return "N/A"
}

299
server/events_test.go Normal file
View File

@@ -0,0 +1,299 @@
// Copyright 2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"encoding/json"
"fmt"
"strings"
"testing"
"time"
"github.com/nats-io/go-nats"
"github.com/nats-io/jwt"
"github.com/nats-io/nkeys"
)
func createAccount(s *Server) (*Account, nkeys.KeyPair) {
okp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
pub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(pub)
jwt, _ := nac.Encode(okp)
addAccountToMemResolver(s, pub, jwt)
return s.LookupAccount(pub), akp
}
func TestSystemAccount(t *testing.T) {
s := opTrustBasicSetup()
defer s.Shutdown()
buildMemAccResolver(s)
acc, _ := createAccount(s)
s.setSystemAccount(acc)
s.mu.Lock()
defer s.mu.Unlock()
if s.sys == nil || s.sys.account == nil {
t.Fatalf("Expected sys.account to be non-nil")
}
if s.sys.client == nil {
t.Fatalf("Expected sys.client to be non-nil")
}
if s.sys.client.echo {
t.Fatalf("Internal clients should always have echo false")
}
}
func createUserCreds(t *testing.T, s *Server, akp nkeys.KeyPair) nats.Option {
t.Helper()
kp, _ := nkeys.CreateUser()
pub, _ := kp.PublicKey()
nuc := jwt.NewUserClaims(pub)
ujwt, err := nuc.Encode(akp)
if err != nil {
t.Fatalf("Error generating user JWT: %v", err)
}
userCB := func() (string, error) {
return ujwt, nil
}
sigCB := func(nonce []byte) ([]byte, error) {
sig, _ := kp.Sign(nonce)
return sig, nil
}
return nats.UserJWT(userCB, sigCB)
}
func runTrustedServer(t *testing.T) (*Server, *Options) {
t.Helper()
opts := DefaultOptions()
kp, _ := nkeys.FromSeed(oSeed)
pub, _ := kp.PublicKey()
opts.TrustedNkeys = []string{pub}
s := RunServer(opts)
buildMemAccResolver(s)
return s, opts
}
func TestSystemAccountNewConnection(t *testing.T) {
s, opts := runTrustedServer(t)
defer s.Shutdown()
acc, akp := createAccount(s)
s.setSystemAccount(acc)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer ncs.Close()
sub, _ := ncs.SubscribeSync(">")
defer sub.Unsubscribe()
// We can't hear ourselves, so we need to create a second client to
// trigger the connect/disconnect events.
acc2, akp2 := createAccount(s)
nc, err := nats.Connect(url, createUserCreds(t, s, akp2), nats.Name("TEST EVENTS"))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.%s.CLIENT.CONNECT", acc2.Name)) {
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.<ACCOUNT>.CLIENT.CONNECT", msg.Subject)
}
tokens := strings.Split(msg.Subject, ".")
if len(tokens) < 4 {
t.Fatalf("Expected 4 tokens, got %d", len(tokens))
}
account := tokens[1]
if account != acc2.Name {
t.Fatalf("Expected %q for account, got %q", acc2.Name, account)
}
cem := ConnectEventMsg{}
if err := json.Unmarshal(msg.Data, &cem); err != nil {
t.Fatalf("Error unmarshalling connect event message: %v", err)
}
if cem.Server.ID != s.ID() {
t.Fatalf("Expected server to be %q, got %q", s.ID(), cem.Server)
}
if cem.Server.Seq == 0 {
t.Fatalf("Expected sequence to be non-zero")
}
if cem.Client.Name != "TEST EVENTS" {
t.Fatalf("Expected client name to be %q, got %q", "TEST EVENTS", cem.Client.Name)
}
if cem.Client.Lang != "go" {
t.Fatalf("Expected client lang to be \"go\", got %q", cem.Client.Lang)
}
// Now close the other client. Should fire a disconnect event.
// First send and receive some messages.
sub2, _ := nc.SubscribeSync("foo")
defer sub2.Unsubscribe()
sub3, _ := nc.SubscribeSync("*")
defer sub3.Unsubscribe()
for i := 0; i < 10; i++ {
nc.Publish("foo", []byte("HELLO WORLD"))
}
nc.Flush()
nc.Close()
msg, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.%s.CLIENT.DISCONNECT", acc2.Name)) {
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.<ACCOUNT>.CLIENT.DISCONNECT", msg.Subject)
}
tokens = strings.Split(msg.Subject, ".")
if len(tokens) < 4 {
t.Fatalf("Expected 4 tokens, got %d", len(tokens))
}
account = tokens[1]
if account != acc2.Name {
t.Fatalf("Expected %q for account, got %q", acc2.Name, account)
}
dem := DisconnectEventMsg{}
if err := json.Unmarshal(msg.Data, &dem); err != nil {
t.Fatalf("Error unmarshalling disconnect event message: %v", err)
}
if dem.Server.ID != s.ID() {
t.Fatalf("Expected server to be %q, got %q", s.ID(), dem.Server)
}
if dem.Server.Seq == 0 {
t.Fatalf("Expected sequence to be non-zero")
}
if dem.Server.Seq <= cem.Server.Seq {
t.Fatalf("Expected sequence to be increasing")
}
if cem.Client.Name != "TEST EVENTS" {
t.Fatalf("Expected client name to be %q, got %q", "TEST EVENTS", dem.Client.Name)
}
if dem.Client.Lang != "go" {
t.Fatalf("Expected client lang to be \"go\", got %q", dem.Client.Lang)
}
if dem.Sent.Msgs != 10 {
t.Fatalf("Expected 10 msgs sent, got %d", dem.Sent.Msgs)
}
if dem.Sent.Bytes != 110 {
t.Fatalf("Expected 110 bytes sent, got %d", dem.Sent.Bytes)
}
if dem.Received.Msgs != 20 {
t.Fatalf("Expected 20 msgs received, got %d", dem.Sent.Msgs)
}
if dem.Received.Bytes != 220 {
t.Fatalf("Expected 220 bytes sent, got %d", dem.Sent.Bytes)
}
}
func TestSystemInternalSubscriptions(t *testing.T) {
s, opts := runTrustedServer(t)
defer s.Shutdown()
sub, err := s.sysSubscribe("foo", nil)
if sub != nil || err != ErrNoSysAccount {
t.Fatalf("Expected to get proper error, got %v", err)
}
acc, akp := createAccount(s)
s.setSystemAccount(acc)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(url, createUserCreds(t, s, akp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
sub, err = s.sysSubscribe("foo", nil)
if sub != nil || err == nil {
t.Fatalf("Expected to get error for no handler, got %v", err)
}
received := make(chan *nats.Msg)
// Create message callback handler.
cb := func(sub *subscription, subject, reply string, msg []byte) {
copy := append([]byte(nil), msg...)
received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy}
}
// Now create an internal subscription
sub, err = s.sysSubscribe("foo", cb)
if sub == nil || err != nil {
t.Fatalf("Expected to subscribe, got %v", err)
}
// Now send out a message from our normal client.
nc.Publish("foo", []byte("HELLO WORLD"))
var msg *nats.Msg
select {
case msg = <-received:
if msg.Subject != "foo" {
t.Fatalf("Expected \"foo\" as subject, got %q", msg.Subject)
}
if msg.Reply != "" {
t.Fatalf("Expected no reply, got %q", msg.Reply)
}
if !bytes.Equal(msg.Data, []byte("HELLO WORLD")) {
t.Fatalf("Got the wrong msg payload: %q", msg.Data)
}
break
case <-time.After(time.Second):
t.Fatalf("Did not receive the message")
}
s.sysUnsubscribe(sub)
// Now send out a message from our normal client.
// We should not see this one.
nc.Publish("foo", []byte("You There?"))
select {
case <-received:
t.Fatalf("Received a message when we should not have")
case <-time.After(100 * time.Millisecond):
break
}
// Now make sure we do not hear ourselves. We optimize this for internally
// generated messages.
r := SublistResult{psubs: []*subscription{sub}}
s.sendInternalMsg(&r, "foo", msg.Data)
select {
case <-received:
t.Fatalf("Received a message when we should not have")
case <-time.After(100 * time.Millisecond):
break
}
}

View File

@@ -462,7 +462,7 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
// Snapshot server options.
opts := s.getOpts()
c := &client{srv: s, nc: conn, typ: GATEWAY}
c := &client{srv: s, nc: conn, kind: GATEWAY}
// Are we creating the gateway based on the configuration
solicit := cfg != nil

View File

@@ -383,11 +383,9 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time) {
ci.TLSCipher = tlsCipher(cs.CipherSuite)
}
switch conn := nc.(type) {
case *net.TCPConn, *tls.Conn:
addr := conn.RemoteAddr().(*net.TCPAddr)
ci.Port = addr.Port
ci.IP = addr.IP.String()
if client.port != 0 {
ci.Port = client.port
ci.IP = client.host
}
}
@@ -1002,7 +1000,7 @@ func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte) {
func (reason ClosedState) String() string {
switch reason {
case ClientClosed:
return "Client"
return "Client Closed"
case AuthenticationTimeout:
return "Authentication Timeout"
case AuthenticationViolation:

View File

@@ -136,13 +136,13 @@ func (c *client) parse(buf []byte) error {
case 'U', 'u':
c.state = OP_U
case 'R', 'r':
if c.typ == CLIENT {
if c.kind == CLIENT {
goto parseErr
} else {
c.state = OP_R
}
case 'A', 'a':
if c.typ == CLIENT {
if c.kind == CLIENT {
goto parseErr
} else {
c.state = OP_A
@@ -388,7 +388,8 @@ func (c *client) parse(buf []byte) error {
arg = buf[c.as : i-c.drop]
}
var err error
switch c.typ {
switch c.kind {
case CLIENT:
err = c.processSub(arg)
case ROUTER:
@@ -479,7 +480,8 @@ func (c *client) parse(buf []byte) error {
arg = buf[c.as : i-c.drop]
}
var err error
switch c.typ {
switch c.kind {
case CLIENT:
err = c.processUnsub(arg)
case ROUTER:

View File

@@ -23,7 +23,7 @@ func dummyClient() *client {
}
func dummyRouteClient() *client {
return &client{srv: New(&defaultServerOptions), typ: ROUTER}
return &client{srv: New(&defaultServerOptions), kind: ROUTER}
}
func TestParsePing(t *testing.T) {
@@ -456,7 +456,7 @@ func TestShouldFail(t *testing.T) {
wrongProtos = []string{"Mx", "MSx", "MSGx", "MSG \r\n"}
for _, proto := range wrongProtos {
c := dummyClient()
c.typ = ROUTER
c.kind = ROUTER
if err := c.parse([]byte(proto)); err == nil {
t.Fatalf("Should have received a parse error for: %v", proto)
}

View File

@@ -141,7 +141,7 @@ func (c *client) removeReplySubTimeout(sub *subscription) {
func (c *client) processAccountSub(arg []byte) error {
c.traceInOp("A+", arg)
accName := string(arg)
if c.typ == GATEWAY {
if c.kind == GATEWAY {
return c.processGatewayAccountSub(accName)
}
return nil
@@ -150,7 +150,7 @@ func (c *client) processAccountSub(arg []byte) error {
func (c *client) processAccountUnsub(arg []byte) {
c.traceInOp("A-", arg)
accName := string(arg)
if c.typ == GATEWAY {
if c.kind == GATEWAY {
c.processGatewayAccountUnsub(accName)
}
}
@@ -1025,7 +1025,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
}
}
c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r}
c := &client{srv: s, nc: conn, opts: clientOpts{}, kind: ROUTER, route: r}
// Grab server variables
s.mu.Lock()
@@ -1232,7 +1232,7 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
}
// We only store state on local subs for transmission across routes.
if sub.client == nil || sub.client.typ != CLIENT {
if sub.client == nil || sub.client.kind != CLIENT {
return
}
@@ -1568,7 +1568,7 @@ func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) {
func (c *client) isSolicitedRoute() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.typ == ROUTER && c.route != nil && c.route.didSolicit
return c.kind == ROUTER && c.route != nil && c.route.didSolicit
}
func (s *Server) solicitRoutes(routes []*url.URL) {

View File

@@ -77,11 +77,21 @@ type Info struct {
GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do
}
// Used to send and receive messages from inside the server.
type internal struct {
account *Account
client *client
seq uint64
sid uint64
subs map[string]msgHandler
}
// Server is our main struct.
type Server struct {
gcid uint64
stats
mu sync.Mutex
kp nkeys.KeyPair
prand *rand.Rand
info Info
configFile string
@@ -91,6 +101,7 @@ type Server struct {
shutdown bool
listener net.Listener
gacc *Account
sys *internal
accounts map[string]*Account
activeAccounts int
accResolver AccountResolver
@@ -178,6 +189,8 @@ func New(opts *Options) *Server {
// Process TLS options, including whether we require client certificates.
tlsReq := opts.TLSConfig != nil
verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert)
kp, _ := nkeys.CreateServer()
pub, _ := kp.PublicKey()
// Validate some options. This is here because we cannot assume that
// server will always be started with configuration parsing (that could
@@ -190,7 +203,7 @@ func New(opts *Options) *Server {
}
info := Info{
ID: genID(),
ID: pub,
Version: VERSION,
Proto: PROTO,
GitCommit: gitCommit,
@@ -205,6 +218,7 @@ func New(opts *Options) *Server {
now := time.Now()
s := &Server{
kp: kp,
configFile: opts.ConfigFile,
info: info,
prand: rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -482,6 +496,33 @@ func (s *Server) RegisterAccount(name string) (*Account, error) {
return acc, nil
}
// Assign an system account. Should only be called once.
// This sets up a server to send and receive messages from inside
// the server itself.
func (s *Server) setSystemAccount(acc *Account) error {
if !s.isTrustedIssuer(acc.Issuer) {
return fmt.Errorf("system account not a trusted account")
}
s.mu.Lock()
if s.sys != nil {
s.mu.Unlock()
return fmt.Errorf("system account already exists")
}
s.sys = &internal{
account: acc,
client: &client{srv: s, kind: SYSTEM, opts: defaultOpts, start: time.Now(), last: time.Now()},
seq: 1,
sid: 1,
subs: make(map[string]msgHandler, 8),
}
s.sys.client.initClient()
s.sys.client.echo = false
s.mu.Unlock()
// Register with the account.
s.sys.client.registerWithAccount(acc)
return nil
}
// Place common account setup here.
func (s *Server) registerAccount(acc *Account) {
if acc.sl == nil {
@@ -1235,6 +1276,8 @@ func (s *Server) createClient(conn net.Conn) *client {
func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) {
now := time.Now()
s.accountDisconnectEvent(c, now, reason.String())
c.mu.Lock()
cc := &closedClient{}
@@ -1357,12 +1400,12 @@ func tlsCipher(cs uint16) string {
// Remove a client or route from our internal accounting.
func (s *Server) removeClient(c *client) {
// type is immutable, so can check without lock
switch c.typ {
switch c.kind {
case CLIENT:
c.mu.Lock()
cid := c.cid
updateProtoInfoCount := false
if c.typ == CLIENT && c.opts.Protocol >= ClientProtoInfo {
if c.kind == CLIENT && c.opts.Protocol >= ClientProtoInfo {
updateProtoInfoCount = true
}
c.mu.Unlock()

View File

@@ -365,7 +365,7 @@ func TestSplitDanglingArgBuf(t *testing.T) {
}
// MSG (the client has to be a ROUTE)
c = &client{subs: make(map[string]*subscription), typ: ROUTER}
c = &client{subs: make(map[string]*subscription), kind: ROUTER}
msgop := []byte("RMSG $foo foo 5\r\nhello\r\n")
c.parse(msgop[:5])
c.parse(msgop[5:10])
@@ -419,7 +419,7 @@ func TestSplitDanglingArgBuf(t *testing.T) {
func TestSplitRoutedMsgArg(t *testing.T) {
_, c, _ := setupClient()
// Allow parser to process RMSG
c.typ = ROUTER
c.kind = ROUTER
b := make([]byte, 1024)
@@ -445,7 +445,7 @@ func TestSplitRoutedMsgArg(t *testing.T) {
}
func TestSplitBufferMsgOp(t *testing.T) {
c := &client{subs: make(map[string]*subscription), typ: ROUTER}
c := &client{subs: make(map[string]*subscription), kind: ROUTER}
msg := []byte("RMSG $G foo.bar _INBOX.22 11\r\nhello world\r")
msg1 := msg[:2]
msg2 := msg[2:9]

View File

@@ -323,7 +323,7 @@ func (s *Sublist) reduceCacheCount() {
// Helper function for auto-expanding remote qsubs.
func isRemoteQSub(sub *subscription) bool {
return sub != nil && sub.queue != nil && sub.client != nil && sub.client.typ == ROUTER
return sub != nil && sub.queue != nil && sub.client != nil && sub.client.kind == ROUTER
}
// UpdateRemoteQSub should be called when we update the weight of an existing
@@ -922,7 +922,7 @@ func matchLiteral(literal, subject string) bool {
}
func addLocalSub(sub *subscription, subs *[]*subscription) {
if sub != nil && sub.client != nil && sub.client.typ == CLIENT && sub.im == nil {
if sub != nil && sub.client != nil && sub.client.kind == CLIENT && sub.im == nil {
*subs = append(*subs, sub)
}
}

View File

@@ -87,7 +87,7 @@ func verifyMember(r []*subscription, val *subscription, t *testing.T) {
// Helpers to generate test subscriptions.
func newSub(subject string) *subscription {
c := &client{typ: CLIENT}
c := &client{kind: CLIENT}
return &subscription{client: c, subject: []byte(subject)}
}
@@ -100,7 +100,7 @@ func newQSub(subject, queue string) *subscription {
func newRemoteQSub(subject, queue string, num int32) *subscription {
if queue != "" {
c := &client{typ: ROUTER}
c := &client{kind: ROUTER}
return &subscription{client: c, subject: []byte(subject), queue: []byte(queue), qw: num}
}
return newSub(subject)

View File

@@ -22,17 +22,8 @@ import (
"strconv"
"strings"
"time"
"github.com/nats-io/nkeys"
)
// Use nkeys and the public key.
func genID() string {
kp, _ := nkeys.CreateServer()
pub, _ := kp.PublicKey()
return string(pub)
}
// Ascii numbers 0-9
const (
asciiZero = 48