mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Only send over route once
This commit is contained in:
@@ -181,17 +181,23 @@ func (c *client) processConnect(arg []byte) error {
|
||||
// so we can just clear it here.
|
||||
c.clearAuthTimer()
|
||||
|
||||
// FIXME, check err
|
||||
if err := json.Unmarshal(arg, &c.opts); err != nil {
|
||||
return err
|
||||
}
|
||||
// Check for Auth
|
||||
|
||||
if c.srv != nil {
|
||||
// Check for Auth
|
||||
if ok := c.srv.checkAuth(c); !ok {
|
||||
c.sendErr("Authorization is Required")
|
||||
return fmt.Errorf("Authorization Error")
|
||||
}
|
||||
}
|
||||
|
||||
// Copy over name if router.
|
||||
if c.typ == ROUTER && c.route != nil {
|
||||
c.route.remoteId = c.opts.Name
|
||||
}
|
||||
|
||||
if c.opts.Verbose {
|
||||
c.sendOK()
|
||||
}
|
||||
@@ -456,11 +462,9 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte {
|
||||
return mh
|
||||
}
|
||||
|
||||
// FIXME(dlc) extra type might negate..
|
||||
// Used to treat map as efficient set
|
||||
type empty struct{}
|
||||
|
||||
var needFlush = empty{}
|
||||
// Used to treat maps as efficient set
|
||||
var needFlush = struct{}{}
|
||||
var routeSeen = struct{}{}
|
||||
|
||||
func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
|
||||
if sub.client == nil {
|
||||
@@ -549,9 +553,12 @@ func (c *client) processMsg(msg []byte) {
|
||||
c.inMsgs++
|
||||
c.inBytes += int64(len(msg))
|
||||
|
||||
if c.srv != nil {
|
||||
atomic.AddInt64(&c.srv.inMsgs, 1)
|
||||
atomic.AddInt64(&c.srv.inBytes, int64(len(msg)))
|
||||
// Snapshot server.
|
||||
srv := c.srv
|
||||
|
||||
if srv != nil {
|
||||
atomic.AddInt64(&srv.inMsgs, 1)
|
||||
atomic.AddInt64(&srv.inBytes, int64(len(msg)))
|
||||
}
|
||||
|
||||
if trace > 0 {
|
||||
@@ -583,14 +590,33 @@ func (c *client) processMsg(msg []byte) {
|
||||
var qsubs []*subscription
|
||||
|
||||
isRoute := c.typ == ROUTER
|
||||
var rmap map[string]struct{}
|
||||
|
||||
for _, v := range r {
|
||||
sub := v.(*subscription)
|
||||
|
||||
// Skip if sourced from a ROUTER and going to another ROUYTER.
|
||||
// This is one-hop semantics for ROUTERs.
|
||||
if isRoute && sub.client.typ == ROUTER {
|
||||
continue
|
||||
// If this is a send to a ROUTER, make sure we only send it
|
||||
// once. The other side will handle the appropriate re-processing.
|
||||
// Also enforce 1-Hop.
|
||||
if sub.client.typ == ROUTER {
|
||||
// Skip if sourced from a ROUTER and going to another ROUTER.
|
||||
// This is 1-Hop semantics for ROUTERs.
|
||||
if isRoute {
|
||||
continue
|
||||
}
|
||||
// Check to see if we have already sent it here.
|
||||
if rmap == nil {
|
||||
rmap = make(map[string]struct{}, len(srv.routes))
|
||||
}
|
||||
|
||||
if sub.client == nil || sub.client.route == nil || sub.client.route.remoteId == "" {
|
||||
Debug("Bad or Missing ROUTER Identity, not processing msg", clientConnStr(c.nc), c.cid)
|
||||
continue
|
||||
}
|
||||
if _, ok := rmap[sub.client.route.remoteId]; ok {
|
||||
continue
|
||||
}
|
||||
rmap[sub.client.route.remoteId] = routeSeen
|
||||
}
|
||||
|
||||
if sub.queue != nil {
|
||||
@@ -598,7 +624,6 @@ func (c *client) processMsg(msg []byte) {
|
||||
if qmap == nil {
|
||||
qmap = make(map[string][]*subscription)
|
||||
}
|
||||
//qname := *(*string)(unsafe.Pointer(&sub.queue))
|
||||
qname := string(sub.queue)
|
||||
qsubs = qmap[qname]
|
||||
if qsubs == nil {
|
||||
|
||||
@@ -205,4 +205,29 @@ func TestRouteOneHopSemantics(t *testing.T) {
|
||||
|
||||
// Make sure it does not come back!
|
||||
expectNothing(t, route)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteOnlySendOnce(t *testing.T) {
|
||||
s, opts := runRouteServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
client := createClientConn(t, opts.Host, opts.Port)
|
||||
defer client.Close()
|
||||
|
||||
clientSend, _ := setupConn(t, client)
|
||||
|
||||
route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
|
||||
expectAuthRequired(t, route)
|
||||
routeSend, routeExpect := setupRoute(t, route, opts)
|
||||
expectMsgs := expectMsgsCommand(t, routeExpect)
|
||||
|
||||
// Express multiple interest on this route for foo.
|
||||
routeSend("SUB foo RSID:2:1\r\n")
|
||||
routeSend("SUB foo RSID:2:2\r\n")
|
||||
|
||||
// Send PUB via client connection
|
||||
clientSend("PUB foo 2\r\nok\r\n")
|
||||
|
||||
matches := expectMsgs(1)
|
||||
checkMsg(t, matches[0], "foo", "RSID:2:1", "", "2", "ok")
|
||||
}
|
||||
|
||||
16
test/test.go
16
test/test.go
@@ -3,8 +3,11 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
@@ -195,15 +198,22 @@ func checkSocket(t tLogger, addr string, wait time.Duration) {
|
||||
t.Fatalf("Failed to connect to the socket: %q", addr)
|
||||
}
|
||||
|
||||
func doRouteAuthConnect(t tLogger, c net.Conn, user, pass string) {
|
||||
cs := fmt.Sprintf("CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\"}\r\n", user, pass)
|
||||
const CONNECT_F = "CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\",\"name\":\"%s\"}\r\n"
|
||||
|
||||
func doRouteAuthConnect(t tLogger, c net.Conn, user, pass, id string) {
|
||||
cs := fmt.Sprintf(CONNECT_F, user, pass, id)
|
||||
sendProto(t, c, cs)
|
||||
}
|
||||
|
||||
func setupRoute(t tLogger, c net.Conn, opts *server.Options) (sendFun, expectFun) {
|
||||
user := opts.ClusterUsername
|
||||
pass := opts.ClusterPassword
|
||||
doRouteAuthConnect(t, c, user, pass)
|
||||
|
||||
u := make([]byte, 16)
|
||||
io.ReadFull(rand.Reader, u)
|
||||
id := fmt.Sprintf("ROUTER:%s", hex.EncodeToString(u))
|
||||
|
||||
doRouteAuthConnect(t, c, user, pass, id)
|
||||
send := sendCommand(t, c)
|
||||
expect := expectCommand(t, c)
|
||||
return send, expect
|
||||
|
||||
Reference in New Issue
Block a user