diff --git a/server/auth.go b/server/auth.go index 27107822..f6fb7a90 100644 --- a/server/auth.go +++ b/server/auth.go @@ -229,7 +229,7 @@ func (s *Server) configureAuthorization() { // checkAuthentication will check based on client type and // return boolean indicating if client is authorized. func (s *Server) checkAuthentication(c *client) bool { - switch c.typ { + switch c.kind { case CLIENT: return s.isClientAuthorized(c) case ROUTER: @@ -353,6 +353,9 @@ func (s *Server) isClientAuthorized(c *client) bool { nkey = buildInternalNkeyUser(juc, acc) c.RegisterNkeyUser(nkey) + // Generate an event if we have a system account. + s.accountConnectEvent(c) + // Check if we need to set an auth timer if the user jwt expires. c.checkExpiration(juc.Claims()) return true @@ -360,17 +363,21 @@ func (s *Server) isClientAuthorized(c *client) bool { if nkey != nil { if c.opts.Sig == "" { + c.Debugf("Signature missing") return false } sig, err := base64.StdEncoding.DecodeString(c.opts.Sig) if err != nil { + c.Debugf("Signature not valid base64") return false } pub, err := nkeys.FromPublicKey(c.opts.Nkey) if err != nil { + c.Debugf("User nkey not valid: %v", err) return false } if err := pub.Verify(c.nonce, sig); err != nil { + c.Debugf("Signature not verified") return false } c.RegisterNkeyUser(nkey) diff --git a/server/client.go b/server/client.go index f5cd692e..a148d336 100644 --- a/server/client.go +++ b/server/client.go @@ -38,6 +38,8 @@ const ( ROUTER // GATEWAY is a link between 2 clusters. GATEWAY + // SYSTEM is an internal system client. + SYSTEM ) const ( @@ -145,7 +147,7 @@ type client struct { mpay int32 msubs int mu sync.Mutex - typ int + kind int cid uint64 opts clientOpts start time.Time @@ -155,6 +157,9 @@ type client struct { out outbound srv *Server acc *Account + user *NkeyUser + host string + port int subs map[string]*subscription perms *permissions mperms *msgDeny @@ -365,19 +370,23 @@ func (c *client) initClient() { c.pcd = make(map[*client]struct{}) // snapshot the string version of the connection - conn := "-" + var conn string if ip, ok := c.nc.(*net.TCPConn); ok { addr := ip.RemoteAddr().(*net.TCPAddr) + c.host = addr.IP.String() + c.port = addr.Port conn = fmt.Sprintf("%s:%d", addr.IP, addr.Port) } - switch c.typ { + switch c.kind { case CLIENT: c.ncs = fmt.Sprintf("%s - cid:%d", conn, c.cid) case ROUTER: c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid) case GATEWAY: c.ncs = fmt.Sprintf("%s - gid:%d", conn, c.cid) + case SYSTEM: + c.ncs = "SYSTEM" } } @@ -499,6 +508,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) { c.mu.Lock() defer c.mu.Unlock() + c.user = user // Assign permissions. if user.Permissions == nil { @@ -673,7 +683,7 @@ func (c *client) readLoop() { // Client will be checked on several fronts to see // if applicable. Routes will never wait in place. budget := 500 * time.Microsecond - if c.typ == ROUTER { + if c.kind == ROUTER { budget = 0 } @@ -862,8 +872,8 @@ func (c *client) traceMsg(msg []byte) { if !c.trace { return } - // FIXME(dlc), allow limits to printable payload - c.Tracef("<<- MSG_PAYLOAD: [%s]", string(msg[:len(msg)-LEN_CR_LF])) + // FIXME(dlc), allow limits to printable payload. + c.Tracef("<<- MSG_PAYLOAD: [%q]", msg[:len(msg)-LEN_CR_LF]) } func (c *client) traceInOp(op string, arg []byte) { @@ -895,7 +905,7 @@ func (c *client) processInfo(arg []byte) error { if err := json.Unmarshal(arg, &info); err != nil { return err } - switch c.typ { + switch c.kind { case ROUTER: c.processRouteInfo(&info) case GATEWAY: @@ -905,7 +915,7 @@ func (c *client) processInfo(arg []byte) error { } func (c *client) processErr(errStr string) { - switch c.typ { + switch c.kind { case CLIENT: c.Errorf("Client Error %s", errStr) case ROUTER: @@ -968,8 +978,10 @@ func (c *client) processConnect(arg []byte) error { return nil } c.last = time.Now() - typ := c.typ + + kind := c.kind srv := c.srv + // Moved unmarshalling of clients' Options under the lock. // The client has already been added to the server map, so it is possible // that other routines lookup the client, and access its options under @@ -997,7 +1009,7 @@ func (c *client) processConnect(arg []byte) error { // least ClientProtoInfo, we need to increment the following counter. // This is decremented when client is removed from the server's // clients map. - if typ == CLIENT && proto >= ClientProtoInfo { + if kind == CLIENT && proto >= ClientProtoInfo { srv.mu.Lock() srv.cproto++ srv.mu.Unlock() @@ -1045,7 +1057,7 @@ func (c *client) processConnect(arg []byte) error { } - switch typ { + switch kind { case CLIENT: // Check client protocol request if it exists. if proto < ClientProtoZero || proto > ClientProtoInfo { @@ -1281,7 +1293,7 @@ func (c *client) processPing() { c.sendPong() // If not a CLIENT, we are done - if c.typ != CLIENT { + if c.kind != CLIENT { c.mu.Unlock() return } @@ -1333,7 +1345,7 @@ func (c *client) processPong() { c.ping.out = 0 c.rtt = time.Since(c.rttStart) srv := c.srv - reorderGWs := c.typ == GATEWAY && c.gw.outbound + reorderGWs := c.kind == GATEWAY && c.gw.outbound c.mu.Unlock() if reorderGWs { srv.gateway.orderOutboundConnections() @@ -1445,21 +1457,22 @@ func (c *client) processSub(argo []byte) (err error) { } c.mu.Lock() - if c.nc == nil { + + // Grab connection type. + kind := c.kind + + if c.nc == nil && kind != SYSTEM { c.mu.Unlock() return nil } - // Grab connection type. - ctype := c.typ - // Check permissions if applicable. - if ctype == ROUTER { + if kind == ROUTER { if !c.canExport(string(sub.subject)) { c.mu.Unlock() return nil } - } else if !c.canSubscribe(string(sub.subject)) { + } else if kind == CLIENT && !c.canSubscribe(string(sub.subject)) { c.mu.Unlock() c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) c.Errorf("Subscription Violation - User %q, Subject %q, SID %s", @@ -1492,7 +1505,7 @@ func (c *client) processSub(argo []byte) (err error) { if err != nil { c.sendErr("Invalid Subject") return nil - } else if c.opts.Verbose { + } else if c.opts.Verbose && kind != SYSTEM { c.sendOK() } @@ -1501,7 +1514,7 @@ func (c *client) processSub(argo []byte) (err error) { c.Errorf(err.Error()) } // If we are routing and this is a local sub, add to the route map for the associated account. - if ctype == CLIENT { + if kind == CLIENT { c.srv.updateRouteSubscriptionMap(acc, sub, 1) } } @@ -1679,7 +1692,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) { c.traceOp("<-> %s", "DELSUB", sub.sid) delete(c.subs, string(sub.sid)) - if c.typ != CLIENT { + if c.kind != CLIENT && c.kind != SYSTEM { c.removeReplySubTimeout(sub) } @@ -1691,7 +1704,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) { for _, nsub := range sub.shadow { if err := nsub.im.acc.sl.Remove(nsub); err != nil { c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name) - } else if c.typ == CLIENT && c.srv != nil { + } else if c.kind == CLIENT && c.srv != nil { c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1) } } @@ -1724,7 +1737,7 @@ func (c *client) processUnsub(arg []byte) error { c.mu.Lock() // Grab connection type. - ctype := c.typ + kind := c.kind var acc *Account if sub, ok = c.subs[string(sid)]; ok { @@ -1745,7 +1758,7 @@ func (c *client) processUnsub(arg []byte) error { if unsub { c.unsubscribe(acc, sub, false) - if acc != nil && ctype == CLIENT { + if acc != nil && kind == CLIENT { c.srv.updateRouteSubscriptionMap(acc, sub, -1) } } @@ -1814,13 +1827,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { sub.nm++ // Check if we should auto-unsubscribe. if sub.max > 0 { - if client.typ == ROUTER && sub.nm >= sub.max { + if client.kind == ROUTER && sub.nm >= sub.max { // The only router based messages that we will see here are remoteReplies. // We handle these slightly differently. defer client.removeReplySub(sub) } else { // For routing.. - shouldForward := client.typ == CLIENT && client.srv != nil + shouldForward := client.kind == CLIENT && client.srv != nil // If we are at the exact number, unsubscribe but // still process the message in hand, otherwise // unsubscribe and drop message on the floor. @@ -1844,12 +1857,6 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { } } - // Check for closed connection - if client.nc == nil { - client.mu.Unlock() - return false - } - // Update statistics // The msg includes the CR_LF, so pull back out for accounting. @@ -1863,6 +1870,20 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { atomic.AddInt64(&srv.outMsgs, 1) atomic.AddInt64(&srv.outBytes, msgSize) + // Check for internal subscription. + if client.kind == SYSTEM { + s := client.srv + client.mu.Unlock() + s.deliverInternalMsg(sub, c.pa.subject, c.pa.reply, msg[:msgSize]) + return true + } + + // Check for closed connection + if client.nc == nil { + client.mu.Unlock() + return false + } + // Queue to outbound buffer client.queueOutbound(mh) client.queueOutbound(msg) @@ -1981,7 +2002,7 @@ func isServiceReply(reply []byte) bool { // This will decide to call the client code or router code. func (c *client) processInboundMsg(msg []byte) { - switch c.typ { + switch c.kind { case CLIENT: c.processInboundClientMsg(msg) case ROUTER: @@ -2162,13 +2183,13 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, for _, sub := range r.psubs { // Check if this is a send to a ROUTER. We now process // these after everything else. - if sub.client.typ == ROUTER { - if c.typ == ROUTER { + if sub.client.kind == ROUTER { + if c.kind == ROUTER { continue } c.addSubToRouteTargets(sub) continue - } else if sub.client.typ == GATEWAY { + } else if sub.client.kind == GATEWAY { // Never send to gateway from here. continue } @@ -2177,7 +2198,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // Redo the subject here on the fly. msgh = c.msgb[1:msgHeadProtoLen] msgh = append(msgh, sub.im.prefix...) - msgh = append(msgh, c.pa.subject...) + msgh = append(msgh, subject...) msgh = append(msgh, ' ') si = len(msgh) } @@ -2187,7 +2208,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, } // If we are sourced from a route we need to have direct filtered queues. - if c.typ == ROUTER && c.pa.queues == nil { + if c.kind == ROUTER && c.pa.queues == nil { return } @@ -2198,7 +2219,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // For gateway connections, we still want to send messages to routes // even if there is no queue filters. - if c.typ == GATEWAY && qf == nil { + if c.kind == GATEWAY && qf == nil { goto sendToRoutes } @@ -2241,8 +2262,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, continue } // Potentially sending to a remote sub across a route. - if sub.client.typ == ROUTER { - if c.typ == ROUTER { + if sub.client.kind == ROUTER { + if c.kind == ROUTER { // We just came from a route, so skip and prefer local subs. // Keep our first rsub in case all else fails. if rsub == nil { @@ -2262,7 +2283,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // Redo the subject here on the fly. msgh = c.msgb[1:msgHeadProtoLen] msgh = append(msgh, sub.im.prefix...) - msgh = append(msgh, c.pa.subject...) + msgh = append(msgh, subject...) msgh = append(msgh, ' ') si = len(msgh) } @@ -2295,8 +2316,8 @@ sendToRoutes: return } - // We address by index to avoid struct copy. We have inline structs for memory - // layout and cache coherency. + // We address by index to avoid struct copy. + // We have inline structs for memory layout and cache coherency. for i := range c.in.rts { rt := &c.in.rts[i] @@ -2444,13 +2465,13 @@ func (c *client) clearConnection(reason ClosedState) { nc.SetWriteDeadline(time.Time{}) // Save off the connection if its a client. - if c.typ == CLIENT && c.srv != nil { + if c.kind == CLIENT && c.srv != nil { go c.srv.saveClosedClient(c, nc, reason) } } func (c *client) typeString() string { - switch c.typ { + switch c.kind { case CLIENT: return "Client" case ROUTER: @@ -2546,7 +2567,7 @@ func (c *client) closeConnection(reason ClosedState) { // Be consistent with the creation: for routes and gateways, // we use Noticef on create, so use that too for delete. - if c.typ == ROUTER || c.typ == GATEWAY { + if c.kind == ROUTER || c.kind == GATEWAY { c.Noticef("%s connection closed", c.typeString()) } else { c.Debugf("%s connection closed", c.typeString()) @@ -2563,7 +2584,7 @@ func (c *client) closeConnection(reason ClosedState) { gwName string gwIsOutbound bool gwCfg *gatewayCfg - ctype = c.typ + kind = c.kind srv = c.srv noReconnect = c.flags.isSet(noReconnect) acc = c.acc @@ -2573,7 +2594,7 @@ func (c *client) closeConnection(reason ClosedState) { // FIXME(dlc) - we can just stub in a new one for client // and reference existing one. var subs []*subscription - if ctype == CLIENT { + if kind == CLIENT { subs = make([]*subscription, 0, len(c.subs)) for _, sub := range c.subs { // Auto-unsubscribe subscriptions must be unsubscribed forcibly. @@ -2588,7 +2609,7 @@ func (c *client) closeConnection(reason ClosedState) { } connectURLs = c.route.connectURLs } - if ctype == GATEWAY { + if kind == GATEWAY { gwName = c.gw.name gwIsOutbound = c.gw.outbound gwCfg = c.gw.cfg @@ -2597,7 +2618,7 @@ func (c *client) closeConnection(reason ClosedState) { c.mu.Unlock() // Remove clients subscriptions. - if ctype == CLIENT { + if kind == CLIENT { acc.sl.RemoveBatch(subs) } else { go c.removeRemoteSubs() @@ -2617,7 +2638,7 @@ func (c *client) closeConnection(reason ClosedState) { srv.removeClient(c) // Update remote subscriptions. - if acc != nil && ctype == CLIENT { + if acc != nil && kind == CLIENT { qsubs := map[string]*qsub{} for _, sub := range subs { if sub.queue == nil { @@ -2683,7 +2704,7 @@ func (c *client) closeConnection(reason ClosedState) { // server shutdown. srv.startGoRoutine(func() { srv.reConnectToRoute(rurl, rtype) }) } - } else if srv != nil && ctype == GATEWAY && gwIsOutbound { + } else if srv != nil && kind == GATEWAY && gwIsOutbound { if gwCfg != nil { srv.Debugf("Attempting reconnect for gateway %q", gwName) // Run this as a go routine since we may be called within diff --git a/server/errors.go b/server/errors.go index f40d9440..ec0b812b 100644 --- a/server/errors.go +++ b/server/errors.go @@ -92,6 +92,10 @@ var ( // request from a remote Gateway with a destination name that does not match the server's // Gateway's name. ErrWrongGateway = errors.New("Wrong Gateway") + + // ErrNoSysAccount is returned when an attempt to publish or subscribe is made + // when there is no internal system account defined. + ErrNoSysAccount = errors.New("System Account Not Setup") ) // configErr is a configuration error. diff --git a/server/events.go b/server/events.go new file mode 100644 index 00000000..3ffa027a --- /dev/null +++ b/server/events.go @@ -0,0 +1,298 @@ +// Copyright 2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "encoding/json" + "fmt" + "strconv" + "time" +) + +const ( + connectEventSubj = "$SYS.%s.CLIENT.CONNECT" + disconnectEventSubj = "$SYS.%s.CLIENT.DISCONNECT" +) + +// ConnectEventMsg is sent when a new connection is made that is part of an account. +type ConnectEventMsg struct { + Server ServerInfo `json:"server"` + Client ClientInfo `json:"client"` +} + +// DisconnectEventMsg is sent when a new connection previously defined from a +// ConnectEventMsg is closed. +type DisconnectEventMsg struct { + Server ServerInfo `json:"server"` + Client ClientInfo `json:"client"` + Sent DataStats `json:"sent"` + Received DataStats `json:"received"` + Reason string `json:"reason"` +} + +type ServerInfo struct { + Host string `json:"host"` + ID string `json:"id"` + Seq uint64 `json:"seq"` +} + +// ClientInfo is detailed information about the client forming a connection. +type ClientInfo struct { + Start time.Time `json:"start,omitempty"` + Host string `json:"host,omitempty"` + ID uint64 `json:"id"` + Account string `json:"acc"` + User string `json:"user,omitempty"` + Name string `json:"name,omitempty"` + Lang string `json:"lang,omitempty"` + Version string `json:"ver,omitempty"` + Stop *time.Time `json:"stop,omitempty"` +} + +// DataStats reports how may msg and bytes. Applicable for both sent and received. +type DataStats struct { + Msgs int64 `json:"msgs"` + Bytes int64 `json:"bytes"` +} + +// Used for internally queueing up messages that the server wants to send. +type pubMsg struct { + r *SublistResult + sub string + si *ServerInfo + msg interface{} +} + +func (s *Server) internalSendLoop() { + defer s.grWG.Done() + s.mu.Lock() + if s.sys == nil { + s.mu.Unlock() + return + } + c := s.sys.client + acc := s.sys.account + sendq := s.sys.sendq + s.mu.Unlock() + + for s.isRunning() { + select { + case pm := <-sendq: + s.stampServerInfo(pm.si) + b, _ := json.MarshalIndent(pm.msg, "", " ") + + // Prep internal structures needed to send message. + c.pa.subject = []byte(pm.sub) + c.pa.size = len(b) + c.pa.szb = []byte(strconv.FormatInt(int64(len(b)), 10)) + // Add in NL + b = append(b, _CRLF_...) + // Check to see if we need to map/route to another account. + if acc.imports.services != nil { + c.checkForImportServices(acc, b) + } + c.processMsgResults(acc, pm.r, b, []byte(pm.sub), nil, nil) + c.flushClients() + case <-s.quitCh: + return + } + } +} + +// This will queue up a message to be sent. +func (s *Server) sendInternalMsg(r *SublistResult, sub string, si *ServerInfo, msg interface{}) { + if s.sys == nil { + return + } + s.sys.sendq <- &pubMsg{r, sub, si, msg} +} + +// accountConnectEvent will send an account client connect event if there is interest. +func (s *Server) accountConnectEvent(c *client) { + if s.sys == nil || s.sys.client == nil || s.sys.account == nil { + return + } + acc := s.sys.account + + subj := fmt.Sprintf(connectEventSubj, c.acc.Name) + r := acc.sl.Match(subj) + if s.noOutSideInterest(r) { + return + } + + c.mu.Lock() + m := ConnectEventMsg{ + Client: ClientInfo{ + Start: c.start, + Host: c.host, + ID: c.cid, + Account: c.acc.Name, + User: nameForClient(c), + Name: c.opts.Name, + Lang: c.opts.Lang, + Version: c.opts.Version, + }, + } + c.mu.Unlock() + + s.sendInternalMsg(r, subj, &m.Server, &m) +} + +// accountDisconnectEvent will send an account client disconnect event if there is interest. +func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string) { + if s.sys == nil || s.sys.client == nil || s.sys.account == nil { + return + } + acc := s.sys.account + + subj := fmt.Sprintf(disconnectEventSubj, c.acc.Name) + r := acc.sl.Match(subj) + if s.noOutSideInterest(r) { + return + } + + c.mu.Lock() + m := DisconnectEventMsg{ + Client: ClientInfo{ + Start: c.start, + Stop: &now, + Host: c.host, + ID: c.cid, + Account: c.acc.Name, + User: nameForClient(c), + Name: c.opts.Name, + Lang: c.opts.Lang, + Version: c.opts.Version, + }, + Sent: DataStats{ + Msgs: c.inMsgs, + Bytes: c.inBytes, + }, + Received: DataStats{ + Msgs: c.outMsgs, + Bytes: c.outBytes, + }, + Reason: reason, + } + c.mu.Unlock() + + s.sendInternalMsg(r, subj, &m.Server, &m) +} + +// Internal message callback. If the msg is needed past the callback it is +// required to be copied. +type msgHandler func(sub *subscription, subject, reply string, msg []byte) + +func (s *Server) deliverInternalMsg(sub *subscription, subject, reply, msg []byte) { + if s.sys == nil { + return + } + s.mu.Lock() + cb := s.sys.subs[string(sub.sid)] + s.mu.Unlock() + if cb != nil { + cb(sub, string(subject), string(reply), msg) + } +} + +// Create an internal subscription. No support for queue groups atm. +func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) { + if s.sys == nil { + return nil, ErrNoSysAccount + } + if cb == nil { + return nil, fmt.Errorf("Undefined message handler") + } + s.mu.Lock() + sid := strconv.FormatInt(int64(s.sys.sid), 10) + s.sys.subs[sid] = cb + s.sys.sid++ + c := s.sys.client + s.mu.Unlock() + + // Now create the subscription + if err := c.processSub([]byte(subject + " " + sid)); err != nil { + return nil, err + } + c.mu.Lock() + sub := c.subs[sid] + c.mu.Unlock() + return sub, nil +} + +func (s *Server) sysUnsubscribe(sub *subscription) { + if sub == nil || s.sys == nil { + return + } + s.mu.Lock() + acc := s.sys.account + c := s.sys.client + delete(s.sys.subs, string(sub.sid)) + s.mu.Unlock() + c.unsubscribe(acc, sub, true) +} + +func (s *Server) noOutSideInterest(r *SublistResult) bool { + sc := s.sys.client + if sc == nil || r == nil { + return true + } + nsubs := len(r.psubs) + len(r.qsubs) + if nsubs == 0 { + return true + } + // We will always be no-echo but will determine that on delivery. + // Here we try to avoid generating the payload if there is only us. + // We only check normal subs. If we introduce queue subs into the + // internal subscribers we should add in the check. + for _, sub := range r.psubs { + if sub.client != sc { + return false + } + } + return true +} + +func (s *Server) stampServerInfo(si *ServerInfo) { + if si == nil { + return + } + s.mu.Lock() + si.ID = s.info.ID + si.Seq = s.sys.seq + si.Host = s.info.Host + s.sys.seq++ + s.mu.Unlock() +} + +func (c *client) flushClients() { + last := time.Now() + for cp := range c.pcd { + // Queue up a flush for those in the set + cp.mu.Lock() + // Update last activity for message delivery + cp.last = last + cp.out.fsp-- + cp.flushSignal() + cp.mu.Unlock() + delete(c.pcd, cp) + } +} + +func nameForClient(c *client) string { + if c.user != nil { + return c.user.Nkey + } + return "N/A" +} diff --git a/server/events_test.go b/server/events_test.go new file mode 100644 index 00000000..5b63f9ac --- /dev/null +++ b/server/events_test.go @@ -0,0 +1,300 @@ +// Copyright 2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" + "testing" + "time" + + "github.com/nats-io/go-nats" + "github.com/nats-io/jwt" + "github.com/nats-io/nkeys" +) + +func createAccount(s *Server) (*Account, nkeys.KeyPair) { + okp, _ := nkeys.FromSeed(oSeed) + akp, _ := nkeys.CreateAccount() + pub, _ := akp.PublicKey() + nac := jwt.NewAccountClaims(pub) + jwt, _ := nac.Encode(okp) + addAccountToMemResolver(s, pub, jwt) + return s.LookupAccount(pub), akp +} + +func TestSystemAccount(t *testing.T) { + s := opTrustBasicSetup() + defer s.Shutdown() + buildMemAccResolver(s) + + acc, _ := createAccount(s) + s.setSystemAccount(acc) + + s.mu.Lock() + defer s.mu.Unlock() + if s.sys == nil || s.sys.account == nil { + t.Fatalf("Expected sys.account to be non-nil") + } + if s.sys.client == nil { + t.Fatalf("Expected sys.client to be non-nil") + } + if s.sys.client.echo { + t.Fatalf("Internal clients should always have echo false") + } +} + +func createUserCreds(t *testing.T, s *Server, akp nkeys.KeyPair) nats.Option { + t.Helper() + kp, _ := nkeys.CreateUser() + pub, _ := kp.PublicKey() + nuc := jwt.NewUserClaims(pub) + ujwt, err := nuc.Encode(akp) + if err != nil { + t.Fatalf("Error generating user JWT: %v", err) + } + userCB := func() (string, error) { + return ujwt, nil + } + sigCB := func(nonce []byte) ([]byte, error) { + sig, _ := kp.Sign(nonce) + return sig, nil + } + return nats.UserJWT(userCB, sigCB) +} + +func runTrustedServer(t *testing.T) (*Server, *Options) { + t.Helper() + opts := DefaultOptions() + kp, _ := nkeys.FromSeed(oSeed) + pub, _ := kp.PublicKey() + opts.TrustedNkeys = []string{pub} + s := RunServer(opts) + buildMemAccResolver(s) + return s, opts +} + +func TestSystemAccountNewConnection(t *testing.T) { + s, opts := runTrustedServer(t) + defer s.Shutdown() + + acc, akp := createAccount(s) + s.setSystemAccount(acc) + + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + + ncs, err := nats.Connect(url, createUserCreds(t, s, akp)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncs.Close() + + sub, _ := ncs.SubscribeSync(">") + defer sub.Unsubscribe() + ncs.Flush() + + // We can't hear ourselves, so we need to create a second client to + // trigger the connect/disconnect events. + acc2, akp2 := createAccount(s) + + nc, err := nats.Connect(url, createUserCreds(t, s, akp2), nats.Name("TEST EVENTS")) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + + if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.%s.CLIENT.CONNECT", acc2.Name)) { + t.Fatalf("Expected subject to start with %q, got %q", "$SYS..CLIENT.CONNECT", msg.Subject) + } + tokens := strings.Split(msg.Subject, ".") + if len(tokens) < 4 { + t.Fatalf("Expected 4 tokens, got %d", len(tokens)) + } + account := tokens[1] + if account != acc2.Name { + t.Fatalf("Expected %q for account, got %q", acc2.Name, account) + } + + cem := ConnectEventMsg{} + if err := json.Unmarshal(msg.Data, &cem); err != nil { + t.Fatalf("Error unmarshalling connect event message: %v", err) + } + if cem.Server.ID != s.ID() { + t.Fatalf("Expected server to be %q, got %q", s.ID(), cem.Server) + } + if cem.Server.Seq == 0 { + t.Fatalf("Expected sequence to be non-zero") + } + if cem.Client.Name != "TEST EVENTS" { + t.Fatalf("Expected client name to be %q, got %q", "TEST EVENTS", cem.Client.Name) + } + if cem.Client.Lang != "go" { + t.Fatalf("Expected client lang to be \"go\", got %q", cem.Client.Lang) + } + + // Now close the other client. Should fire a disconnect event. + // First send and receive some messages. + sub2, _ := nc.SubscribeSync("foo") + defer sub2.Unsubscribe() + sub3, _ := nc.SubscribeSync("*") + defer sub3.Unsubscribe() + + for i := 0; i < 10; i++ { + nc.Publish("foo", []byte("HELLO WORLD")) + } + nc.Flush() + nc.Close() + + msg, err = sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + + if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.%s.CLIENT.DISCONNECT", acc2.Name)) { + t.Fatalf("Expected subject to start with %q, got %q", "$SYS..CLIENT.DISCONNECT", msg.Subject) + } + tokens = strings.Split(msg.Subject, ".") + if len(tokens) < 4 { + t.Fatalf("Expected 4 tokens, got %d", len(tokens)) + } + account = tokens[1] + if account != acc2.Name { + t.Fatalf("Expected %q for account, got %q", acc2.Name, account) + } + + dem := DisconnectEventMsg{} + if err := json.Unmarshal(msg.Data, &dem); err != nil { + t.Fatalf("Error unmarshalling disconnect event message: %v", err) + } + + if dem.Server.ID != s.ID() { + t.Fatalf("Expected server to be %q, got %q", s.ID(), dem.Server) + } + if dem.Server.Seq == 0 { + t.Fatalf("Expected sequence to be non-zero") + } + if dem.Server.Seq <= cem.Server.Seq { + t.Fatalf("Expected sequence to be increasing") + } + + if cem.Client.Name != "TEST EVENTS" { + t.Fatalf("Expected client name to be %q, got %q", "TEST EVENTS", dem.Client.Name) + } + if dem.Client.Lang != "go" { + t.Fatalf("Expected client lang to be \"go\", got %q", dem.Client.Lang) + } + + if dem.Sent.Msgs != 10 { + t.Fatalf("Expected 10 msgs sent, got %d", dem.Sent.Msgs) + } + if dem.Sent.Bytes != 110 { + t.Fatalf("Expected 110 bytes sent, got %d", dem.Sent.Bytes) + } + if dem.Received.Msgs != 20 { + t.Fatalf("Expected 20 msgs received, got %d", dem.Sent.Msgs) + } + if dem.Received.Bytes != 220 { + t.Fatalf("Expected 220 bytes sent, got %d", dem.Sent.Bytes) + } +} + +func TestSystemInternalSubscriptions(t *testing.T) { + s, opts := runTrustedServer(t) + defer s.Shutdown() + + sub, err := s.sysSubscribe("foo", nil) + if sub != nil || err != ErrNoSysAccount { + t.Fatalf("Expected to get proper error, got %v", err) + } + + acc, akp := createAccount(s) + s.setSystemAccount(acc) + + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + + nc, err := nats.Connect(url, createUserCreds(t, s, akp)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + sub, err = s.sysSubscribe("foo", nil) + if sub != nil || err == nil { + t.Fatalf("Expected to get error for no handler, got %v", err) + } + + received := make(chan *nats.Msg) + // Create message callback handler. + cb := func(sub *subscription, subject, reply string, msg []byte) { + copy := append([]byte(nil), msg...) + received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy} + } + + // Now create an internal subscription + sub, err = s.sysSubscribe("foo", cb) + if sub == nil || err != nil { + t.Fatalf("Expected to subscribe, got %v", err) + } + // Now send out a message from our normal client. + nc.Publish("foo", []byte("HELLO WORLD")) + + var msg *nats.Msg + + select { + case msg = <-received: + if msg.Subject != "foo" { + t.Fatalf("Expected \"foo\" as subject, got %q", msg.Subject) + } + if msg.Reply != "" { + t.Fatalf("Expected no reply, got %q", msg.Reply) + } + if !bytes.Equal(msg.Data, []byte("HELLO WORLD")) { + t.Fatalf("Got the wrong msg payload: %q", msg.Data) + } + break + case <-time.After(time.Second): + t.Fatalf("Did not receive the message") + } + s.sysUnsubscribe(sub) + + // Now send out a message from our normal client. + // We should not see this one. + nc.Publish("foo", []byte("You There?")) + + select { + case <-received: + t.Fatalf("Received a message when we should not have") + case <-time.After(100 * time.Millisecond): + break + } + + // Now make sure we do not hear ourselves. We optimize this for internally + // generated messages. + r := SublistResult{psubs: []*subscription{sub}} + s.sendInternalMsg(&r, "foo", nil, msg.Data) + + select { + case <-received: + t.Fatalf("Received a message when we should not have") + case <-time.After(100 * time.Millisecond): + break + } +} diff --git a/server/gateway.go b/server/gateway.go index 6a6291ae..6ab1b962 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -456,7 +456,7 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) { // Snapshot server options. opts := s.getOpts() - c := &client{srv: s, nc: conn, typ: GATEWAY} + c := &client{srv: s, nc: conn, kind: GATEWAY} // Are we creating the gateway based on the configuration solicit := cfg != nil diff --git a/server/monitor.go b/server/monitor.go index c5bb4d71..3c62ad22 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -383,11 +383,9 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time) { ci.TLSCipher = tlsCipher(cs.CipherSuite) } - switch conn := nc.(type) { - case *net.TCPConn, *tls.Conn: - addr := conn.RemoteAddr().(*net.TCPAddr) - ci.Port = addr.Port - ci.IP = addr.IP.String() + if client.port != 0 { + ci.Port = client.port + ci.IP = client.host } } @@ -1002,7 +1000,7 @@ func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte) { func (reason ClosedState) String() string { switch reason { case ClientClosed: - return "Client" + return "Client Closed" case AuthenticationTimeout: return "Authentication Timeout" case AuthenticationViolation: diff --git a/server/parser.go b/server/parser.go index a80b5dd3..d841ec09 100644 --- a/server/parser.go +++ b/server/parser.go @@ -136,13 +136,13 @@ func (c *client) parse(buf []byte) error { case 'U', 'u': c.state = OP_U case 'R', 'r': - if c.typ == CLIENT { + if c.kind == CLIENT { goto parseErr } else { c.state = OP_R } case 'A', 'a': - if c.typ == CLIENT { + if c.kind == CLIENT { goto parseErr } else { c.state = OP_A @@ -388,7 +388,8 @@ func (c *client) parse(buf []byte) error { arg = buf[c.as : i-c.drop] } var err error - switch c.typ { + + switch c.kind { case CLIENT: err = c.processSub(arg) case ROUTER: @@ -479,7 +480,8 @@ func (c *client) parse(buf []byte) error { arg = buf[c.as : i-c.drop] } var err error - switch c.typ { + + switch c.kind { case CLIENT: err = c.processUnsub(arg) case ROUTER: diff --git a/server/parser_test.go b/server/parser_test.go index 5773e06a..2bb01623 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -23,7 +23,7 @@ func dummyClient() *client { } func dummyRouteClient() *client { - return &client{srv: New(&defaultServerOptions), typ: ROUTER} + return &client{srv: New(&defaultServerOptions), kind: ROUTER} } func TestParsePing(t *testing.T) { @@ -456,7 +456,7 @@ func TestShouldFail(t *testing.T) { wrongProtos = []string{"Mx", "MSx", "MSGx", "MSG \r\n"} for _, proto := range wrongProtos { c := dummyClient() - c.typ = ROUTER + c.kind = ROUTER if err := c.parse([]byte(proto)); err == nil { t.Fatalf("Should have received a parse error for: %v", proto) } diff --git a/server/route.go b/server/route.go index 34944901..a8372ba2 100644 --- a/server/route.go +++ b/server/route.go @@ -141,7 +141,7 @@ func (c *client) removeReplySubTimeout(sub *subscription) { func (c *client) processAccountSub(arg []byte) error { c.traceInOp("A+", arg) accName := string(arg) - if c.typ == GATEWAY { + if c.kind == GATEWAY { return c.processGatewayAccountSub(accName) } return nil @@ -150,7 +150,7 @@ func (c *client) processAccountSub(arg []byte) error { func (c *client) processAccountUnsub(arg []byte) { c.traceInOp("A-", arg) accName := string(arg) - if c.typ == GATEWAY { + if c.kind == GATEWAY { c.processGatewayAccountUnsub(accName) } } @@ -1025,7 +1025,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { } } - c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r} + c := &client{srv: s, nc: conn, opts: clientOpts{}, kind: ROUTER, route: r} // Grab server variables s.mu.Lock() @@ -1232,7 +1232,7 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del } // We only store state on local subs for transmission across routes. - if sub.client == nil || sub.client.typ != CLIENT { + if sub.client == nil || sub.client.kind != CLIENT { return } @@ -1568,7 +1568,7 @@ func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) { func (c *client) isSolicitedRoute() bool { c.mu.Lock() defer c.mu.Unlock() - return c.typ == ROUTER && c.route != nil && c.route.didSolicit + return c.kind == ROUTER && c.route != nil && c.route.didSolicit } func (s *Server) solicitRoutes(routes []*url.URL) { diff --git a/server/server.go b/server/server.go index 70df4091..0370b8e8 100644 --- a/server/server.go +++ b/server/server.go @@ -77,11 +77,22 @@ type Info struct { GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do } +// Used to send and receive messages from inside the server. +type internal struct { + account *Account + client *client + seq uint64 + sid uint64 + subs map[string]msgHandler + sendq chan *pubMsg +} + // Server is our main struct. type Server struct { gcid uint64 stats mu sync.Mutex + kp nkeys.KeyPair prand *rand.Rand info Info configFile string @@ -91,6 +102,7 @@ type Server struct { shutdown bool listener net.Listener gacc *Account + sys *internal accounts map[string]*Account activeAccounts int accResolver AccountResolver @@ -178,6 +190,8 @@ func New(opts *Options) *Server { // Process TLS options, including whether we require client certificates. tlsReq := opts.TLSConfig != nil verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert) + kp, _ := nkeys.CreateServer() + pub, _ := kp.PublicKey() // Validate some options. This is here because we cannot assume that // server will always be started with configuration parsing (that could @@ -190,7 +204,7 @@ func New(opts *Options) *Server { } info := Info{ - ID: genID(), + ID: pub, Version: VERSION, Proto: PROTO, GitCommit: gitCommit, @@ -205,6 +219,7 @@ func New(opts *Options) *Server { now := time.Now() s := &Server{ + kp: kp, configFile: opts.ConfigFile, info: info, prand: rand.New(rand.NewSource(time.Now().UnixNano())), @@ -482,6 +497,40 @@ func (s *Server) RegisterAccount(name string) (*Account, error) { return acc, nil } +// Assign an system account. Should only be called once. +// This sets up a server to send and receive messages from inside +// the server itself. +func (s *Server) setSystemAccount(acc *Account) error { + if !s.isTrustedIssuer(acc.Issuer) { + return fmt.Errorf("system account not a trusted account") + } + s.mu.Lock() + if s.sys != nil { + s.mu.Unlock() + return fmt.Errorf("system account already exists") + } + s.sys = &internal{ + account: acc, + client: &client{srv: s, kind: SYSTEM, opts: defaultOpts, start: time.Now(), last: time.Now()}, + seq: 1, + sid: 1, + subs: make(map[string]msgHandler, 8), + sendq: make(chan *pubMsg, 128), + } + s.sys.client.initClient() + s.sys.client.echo = false + s.mu.Unlock() + // Register with the account. + s.sys.client.registerWithAccount(acc) + + // Start our internal loop to serialize outbound messages. + s.startGoRoutine(func() { + s.internalSendLoop() + }) + + return nil +} + // Place common account setup here. func (s *Server) registerAccount(acc *Account) { if acc.sl == nil { @@ -1235,6 +1284,8 @@ func (s *Server) createClient(conn net.Conn) *client { func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) { now := time.Now() + s.accountDisconnectEvent(c, now, reason.String()) + c.mu.Lock() cc := &closedClient{} @@ -1357,12 +1408,12 @@ func tlsCipher(cs uint16) string { // Remove a client or route from our internal accounting. func (s *Server) removeClient(c *client) { // type is immutable, so can check without lock - switch c.typ { + switch c.kind { case CLIENT: c.mu.Lock() cid := c.cid updateProtoInfoCount := false - if c.typ == CLIENT && c.opts.Protocol >= ClientProtoInfo { + if c.kind == CLIENT && c.opts.Protocol >= ClientProtoInfo { updateProtoInfoCount = true } c.mu.Unlock() diff --git a/server/split_test.go b/server/split_test.go index c610f54b..9abf0744 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -365,7 +365,7 @@ func TestSplitDanglingArgBuf(t *testing.T) { } // MSG (the client has to be a ROUTE) - c = &client{subs: make(map[string]*subscription), typ: ROUTER} + c = &client{subs: make(map[string]*subscription), kind: ROUTER} msgop := []byte("RMSG $foo foo 5\r\nhello\r\n") c.parse(msgop[:5]) c.parse(msgop[5:10]) @@ -419,7 +419,7 @@ func TestSplitDanglingArgBuf(t *testing.T) { func TestSplitRoutedMsgArg(t *testing.T) { _, c, _ := setupClient() // Allow parser to process RMSG - c.typ = ROUTER + c.kind = ROUTER b := make([]byte, 1024) @@ -445,7 +445,7 @@ func TestSplitRoutedMsgArg(t *testing.T) { } func TestSplitBufferMsgOp(t *testing.T) { - c := &client{subs: make(map[string]*subscription), typ: ROUTER} + c := &client{subs: make(map[string]*subscription), kind: ROUTER} msg := []byte("RMSG $G foo.bar _INBOX.22 11\r\nhello world\r") msg1 := msg[:2] msg2 := msg[2:9] diff --git a/server/sublist.go b/server/sublist.go index 2389b586..37ff2ccb 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -323,7 +323,7 @@ func (s *Sublist) reduceCacheCount() { // Helper function for auto-expanding remote qsubs. func isRemoteQSub(sub *subscription) bool { - return sub != nil && sub.queue != nil && sub.client != nil && sub.client.typ == ROUTER + return sub != nil && sub.queue != nil && sub.client != nil && sub.client.kind == ROUTER } // UpdateRemoteQSub should be called when we update the weight of an existing @@ -922,7 +922,7 @@ func matchLiteral(literal, subject string) bool { } func addLocalSub(sub *subscription, subs *[]*subscription) { - if sub != nil && sub.client != nil && sub.client.typ == CLIENT && sub.im == nil { + if sub != nil && sub.client != nil && sub.client.kind == CLIENT && sub.im == nil { *subs = append(*subs, sub) } } diff --git a/server/sublist_test.go b/server/sublist_test.go index 83e613a8..d8abaece 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -87,7 +87,7 @@ func verifyMember(r []*subscription, val *subscription, t *testing.T) { // Helpers to generate test subscriptions. func newSub(subject string) *subscription { - c := &client{typ: CLIENT} + c := &client{kind: CLIENT} return &subscription{client: c, subject: []byte(subject)} } @@ -100,7 +100,7 @@ func newQSub(subject, queue string) *subscription { func newRemoteQSub(subject, queue string, num int32) *subscription { if queue != "" { - c := &client{typ: ROUTER} + c := &client{kind: ROUTER} return &subscription{client: c, subject: []byte(subject), queue: []byte(queue), qw: num} } return newSub(subject) diff --git a/server/util.go b/server/util.go index 241e9345..da95fbd0 100644 --- a/server/util.go +++ b/server/util.go @@ -22,17 +22,8 @@ import ( "strconv" "strings" "time" - - "github.com/nats-io/nkeys" ) -// Use nkeys and the public key. -func genID() string { - kp, _ := nkeys.CreateServer() - pub, _ := kp.PublicKey() - return string(pub) -} - // Ascii numbers 0-9 const ( asciiZero = 48