From c61412a862d996815e1abdeddc9feee5b3b65b8a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 29 Jul 2013 18:13:32 -0700 Subject: [PATCH] Only send over route once --- server/client.go | 55 ++++++++++++++++++++++++++++++++------------- test/routes_test.go | 27 +++++++++++++++++++++- test/test.go | 16 ++++++++++--- 3 files changed, 79 insertions(+), 19 deletions(-) diff --git a/server/client.go b/server/client.go index 0ca5c806..541e8a88 100644 --- a/server/client.go +++ b/server/client.go @@ -181,17 +181,23 @@ func (c *client) processConnect(arg []byte) error { // so we can just clear it here. c.clearAuthTimer() - // FIXME, check err if err := json.Unmarshal(arg, &c.opts); err != nil { return err } - // Check for Auth + if c.srv != nil { + // Check for Auth if ok := c.srv.checkAuth(c); !ok { c.sendErr("Authorization is Required") return fmt.Errorf("Authorization Error") } } + + // Copy over name if router. + if c.typ == ROUTER && c.route != nil { + c.route.remoteId = c.opts.Name + } + if c.opts.Verbose { c.sendOK() } @@ -456,11 +462,9 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte { return mh } -// FIXME(dlc) extra type might negate.. -// Used to treat map as efficient set -type empty struct{} - -var needFlush = empty{} +// Used to treat maps as efficient set +var needFlush = struct{}{} +var routeSeen = struct{}{} func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { if sub.client == nil { @@ -549,9 +553,12 @@ func (c *client) processMsg(msg []byte) { c.inMsgs++ c.inBytes += int64(len(msg)) - if c.srv != nil { - atomic.AddInt64(&c.srv.inMsgs, 1) - atomic.AddInt64(&c.srv.inBytes, int64(len(msg))) + // Snapshot server. + srv := c.srv + + if srv != nil { + atomic.AddInt64(&srv.inMsgs, 1) + atomic.AddInt64(&srv.inBytes, int64(len(msg))) } if trace > 0 { @@ -583,14 +590,33 @@ func (c *client) processMsg(msg []byte) { var qsubs []*subscription isRoute := c.typ == ROUTER + var rmap map[string]struct{} for _, v := range r { sub := v.(*subscription) - // Skip if sourced from a ROUTER and going to another ROUYTER. - // This is one-hop semantics for ROUTERs. - if isRoute && sub.client.typ == ROUTER { - continue + // If this is a send to a ROUTER, make sure we only send it + // once. The other side will handle the appropriate re-processing. + // Also enforce 1-Hop. + if sub.client.typ == ROUTER { + // Skip if sourced from a ROUTER and going to another ROUTER. + // This is 1-Hop semantics for ROUTERs. + if isRoute { + continue + } + // Check to see if we have already sent it here. + if rmap == nil { + rmap = make(map[string]struct{}, len(srv.routes)) + } + + if sub.client == nil || sub.client.route == nil || sub.client.route.remoteId == "" { + Debug("Bad or Missing ROUTER Identity, not processing msg", clientConnStr(c.nc), c.cid) + continue + } + if _, ok := rmap[sub.client.route.remoteId]; ok { + continue + } + rmap[sub.client.route.remoteId] = routeSeen } if sub.queue != nil { @@ -598,7 +624,6 @@ func (c *client) processMsg(msg []byte) { if qmap == nil { qmap = make(map[string][]*subscription) } - //qname := *(*string)(unsafe.Pointer(&sub.queue)) qname := string(sub.queue) qsubs = qmap[qname] if qsubs == nil { diff --git a/test/routes_test.go b/test/routes_test.go index c16908ec..92e523b5 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -205,4 +205,29 @@ func TestRouteOneHopSemantics(t *testing.T) { // Make sure it does not come back! expectNothing(t, route) -} \ No newline at end of file +} + +func TestRouteOnlySendOnce(t *testing.T) { + s, opts := runRouteServer(t) + defer s.Shutdown() + + client := createClientConn(t, opts.Host, opts.Port) + defer client.Close() + + clientSend, _ := setupConn(t, client) + + route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + expectAuthRequired(t, route) + routeSend, routeExpect := setupRoute(t, route, opts) + expectMsgs := expectMsgsCommand(t, routeExpect) + + // Express multiple interest on this route for foo. + routeSend("SUB foo RSID:2:1\r\n") + routeSend("SUB foo RSID:2:2\r\n") + + // Send PUB via client connection + clientSend("PUB foo 2\r\nok\r\n") + + matches := expectMsgs(1) + checkMsg(t, matches[0], "foo", "RSID:2:1", "", "2", "ok") +} diff --git a/test/test.go b/test/test.go index 4c0065e6..1bc6ca13 100644 --- a/test/test.go +++ b/test/test.go @@ -3,8 +3,11 @@ package test import ( + "crypto/rand" + "encoding/hex" "encoding/json" "fmt" + "io" "net" "os/exec" "regexp" @@ -195,15 +198,22 @@ func checkSocket(t tLogger, addr string, wait time.Duration) { t.Fatalf("Failed to connect to the socket: %q", addr) } -func doRouteAuthConnect(t tLogger, c net.Conn, user, pass string) { - cs := fmt.Sprintf("CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\"}\r\n", user, pass) +const CONNECT_F = "CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\",\"name\":\"%s\"}\r\n" + +func doRouteAuthConnect(t tLogger, c net.Conn, user, pass, id string) { + cs := fmt.Sprintf(CONNECT_F, user, pass, id) sendProto(t, c, cs) } func setupRoute(t tLogger, c net.Conn, opts *server.Options) (sendFun, expectFun) { user := opts.ClusterUsername pass := opts.ClusterPassword - doRouteAuthConnect(t, c, user, pass) + + u := make([]byte, 16) + io.ReadFull(rand.Reader, u) + id := fmt.Sprintf("ROUTER:%s", hex.EncodeToString(u)) + + doRouteAuthConnect(t, c, user, pass, id) send := sendCommand(t, c) expect := expectCommand(t, c) return send, expect