diff --git a/server/client.go b/server/client.go index abf26d97..f5dc3923 100644 --- a/server/client.go +++ b/server/client.go @@ -158,7 +158,7 @@ func (c *client) readLoop() { } func (c *client) traceMsg(msg []byte) { - pm := fmt.Sprintf("Processing msg: %d", c.inMsgs) + pm := fmt.Sprintf("Processing %s msg: %d", c.typeString(), c.inMsgs) opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg)} Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) } @@ -248,6 +248,9 @@ func (c *client) processPong() { } func (c *client) processMsgArgs(arg []byte) error { + if trace > 0 { + c.traceOp("MSG", arg) + } // Unroll splitArgs to avoid runtime/heap issues a := [MAX_MSG_ARGS][]byte{} @@ -592,9 +595,45 @@ func (c *client) processMsg(msg []byte) { isRoute := c.typ == ROUTER var rmap map[string]struct{} + // If we are a route and we have a queue subscription, deliver direct + // since they are sent direct via L2 semantics. + if isRoute { + if sub := c.srv.routeSidQueueSubscriber(c.pa.sid); sub != nil { + mh := c.msgHeader(msgh[:si], sub) + c.deliverMsg(sub, mh, msg) + return + } + } + + // Loop over all subscriptions that match. + for _, v := range r { sub := v.(*subscription) + // Process queue group subscriptions by gathering them all up + // here. We will pick the winners when we are done processing + // all of the subscriptions. + if sub.queue != nil { + // Queue subscriptions handled from routes directly above. + if isRoute { + continue + } + // FIXME(dlc), this can be more efficient + if qmap == nil { + qmap = make(map[string][]*subscription) + } + qname := string(sub.queue) + qsubs = qmap[qname] + if qsubs == nil { + qsubs = make([]*subscription, 0, 4) + } + qsubs = append(qsubs, sub) + qmap[qname] = qsubs + continue + } + + // Process normal, non-queue group subscriptions. + // If this is a send to a ROUTER, make sure we only send it // once. The other side will handle the appropriate re-processing. // Also enforce 1-Hop. @@ -619,23 +658,10 @@ func (c *client) processMsg(msg []byte) { rmap[sub.client.route.remoteId] = routeSeen } - if sub.queue != nil { - // FIXME(dlc), this can be more efficient - if qmap == nil { - qmap = make(map[string][]*subscription) - } - qname := string(sub.queue) - qsubs = qmap[qname] - if qsubs == nil { - qsubs = make([]*subscription, 0, 4) - } - qsubs = append(qsubs, sub) - qmap[qname] = qsubs - continue - } mh := c.msgHeader(msgh[:si], sub) c.deliverMsg(sub, mh, msg) } + if qmap != nil { for _, qsubs := range qmap { index := rand.Int() % len(qsubs) diff --git a/server/route.go b/server/route.go index 74a808e7..8e5784c0 100644 --- a/server/route.go +++ b/server/route.go @@ -3,10 +3,12 @@ package server import ( + "bytes" "encoding/json" "fmt" "net" "net/url" + "regexp" "time" ) @@ -94,10 +96,46 @@ const ( unsubProto = "UNSUB %s%s" + _CRLF_ ) -const RSID = "RSID" +// FIXME(dlc) - Make these reserved and reject if they come in as a sid +// from a client connection. + +const ( + RSID = "RSID" + QRSID = "QRSID" + RSID_CID_INDEX = 1 + RSID_SID_INDEX = 2 + EXPECTED_MATCHES = 3 +) + +// FIXME(dlc) - This may be too slow, check at later date. +var qrsidRe = regexp.MustCompile(`QRSID:(\d+):([^\s]+)`) + +func (s *Server) routeSidQueueSubscriber(rsid []byte) *subscription { + if !bytes.HasPrefix(rsid, []byte(QRSID)) { + return nil + } + matches := qrsidRe.FindSubmatch(rsid) + if matches == nil || len(matches) != EXPECTED_MATCHES { + return nil + } + cid := uint64(parseInt64(matches[RSID_CID_INDEX])) + client := s.clients[cid] + if client == nil { + return nil + } + sid := matches[RSID_SID_INDEX] + if sub, ok := (client.subs.Get(sid)).(*subscription); ok { + return sub + } + return nil +} func routeSid(sub *subscription) string { - return fmt.Sprintf("%s:%d:%s", RSID, sub.client.cid, sub.sid) + var qi string + if len(sub.queue) > 0 { + qi = "Q" + } + return fmt.Sprintf("%s%s:%d:%s", qi, RSID, sub.client.cid, sub.sid) } func (s *Server) broadcastToRoutes(proto string) { diff --git a/server/util.go b/server/util.go index edf56d9d..840359b2 100644 --- a/server/util.go +++ b/server/util.go @@ -36,6 +36,21 @@ func parseSize(d []byte) (n int) { return n } +// parseInt64 expects decimal positive numbers. We +// return -1 to signal error +func parseInt64(d []byte) (n int64) { + if len(d) == 0 { + return -1 + } + for _, dec := range d { + if dec < ascii_0 || dec > ascii_9 { + return -1 + } + n = n*10 + (int64(dec) - ascii_0) + } + return n +} + func secondsToDuration(seconds float64) time.Duration { ttl := seconds * float64(time.Second) return time.Duration(ttl) diff --git a/test/routes_test.go b/test/routes_test.go index aef9ec90..8737f4e5 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -237,6 +237,8 @@ func TestRouteQueueSemantics(t *testing.T) { client := createClientConn(t, opts.Host, opts.Port) clientSend, clientExpect := setupConn(t, client) + clientExpectMsgs := expectMsgsCommand(t, clientExpect) + defer client.Close() route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) @@ -245,41 +247,78 @@ func TestRouteQueueSemantics(t *testing.T) { expectMsgs := expectMsgsCommand(t, routeExpect) // Express multiple interest on this route for foo, queue group bar. - routeSend("SUB foo bar RSID:2:1\r\n") - routeSend("SUB foo bar RSID:2:2\r\n") + qrsid1 := "RSID:2:1" + routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid1)) + qrsid2 := "RSID:2:2" + routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid2)) + + // Use ping roundtrip to make sure its processed. + routeSend("PING\r\n") + routeExpect(pongRe) // Send PUB via client connection clientSend("PUB foo 2\r\nok\r\n") - - // Only 1 - matches := expectMsgs(1) - checkMsg(t, matches[0], "foo", "RSID:2:1", "", "2", "ok") - - // Normal Interest as well. - routeSend("SUB foo RSID:2:1\r\n") - - // Send PUB via client connection - clientSend("PUB foo 2\r\nok\r\n") - - // Still only 1 - expectMsgs(1) - - // Subscribe to foo on client - clientSend("SUB foo bar 1\r\n") // Use ping roundtrip to make sure its processed. clientSend("PING\r\n") clientExpect(pongRe) - // Receive notification on route - routeExpect(subRe) + // Only 1 + matches := expectMsgs(1) + checkMsg(t, matches[0], "foo", "", "", "2", "ok") + + // Add normal Interest as well to route interest. + routeSend("SUB foo RSID:2:4\r\n") + + // Use ping roundtrip to make sure its processed. + routeSend("PING\r\n") + routeExpect(pongRe) // Send PUB via client connection clientSend("PUB foo 2\r\nok\r\n") + // Use ping roundtrip to make sure its processed. + clientSend("PING\r\n") + clientExpect(pongRe) - // Still only 1 for route - expectMsgs(1) + // Should be 2 now, 1 for all normal, and one for specific queue subscriber. + matches = expectMsgs(2) - // We could get one on client + // Expect first to be the normal subscriber, next will be the queue one. + checkMsg(t, matches[0], "foo", "RSID:2:4", "", "2", "ok") + checkMsg(t, matches[1], "foo", "", "", "2", "ok") + + // Check the rsid to verify it is one of the queue group subscribers. + rsid := string(matches[1][SID_INDEX]) + if rsid != qrsid1 && rsid != qrsid2 { + t.Fatalf("Expected a queue group rsid, got %s\n", rsid) + } + + // Now create a queue subscription for the client as well as a normal one. + clientSend("SUB foo 1\r\n") + // Use ping roundtrip to make sure its processed. + clientSend("PING\r\n") + clientExpect(pongRe) + routeExpect(subRe) + + clientSend("SUB foo bar 2\r\n") + // Use ping roundtrip to make sure its processed. + clientSend("PING\r\n") + clientExpect(pongRe) + routeExpect(subRe) + + // Deliver a MSG from the route itself, make sure the client receives both. + routeSend("MSG foo RSID:2:1 2\r\nok\r\n") + // Queue group one. + routeSend("MSG foo QRSID:2:2 2\r\nok\r\n") + + // Use ping roundtrip to make sure its processed. + routeSend("PING\r\n") + routeExpect(pongRe) + + // Should be 2 now, 1 for all normal, and one for specific queue subscriber. + matches = clientExpectMsgs(2) + // Expect first to be the normal subscriber, next will be the queue one. + checkMsg(t, matches[0], "foo", "1", "", "2", "ok") + checkMsg(t, matches[1], "foo", "2", "", "2", "ok") } func TestSolicitRouteReconnect(t *testing.T) { diff --git a/test/test.go b/test/test.go index dfe7be57..8e6a6f12 100644 --- a/test/test.go +++ b/test/test.go @@ -285,7 +285,7 @@ var expBuf = make([]byte, 32768) // Test result from server against regexp func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte { // Wait for commands to be processed and results queued for read - c.SetReadDeadline(time.Now().Add(1 * time.Second)) + c.SetReadDeadline(time.Now().Add(2 * time.Second)) defer c.SetReadDeadline(time.Time{}) n, err := c.Read(expBuf) @@ -315,7 +315,7 @@ func checkMsg(t tLogger, m [][]byte, subject, sid, reply, len, msg string) { if string(m[SUB_INDEX]) != subject { stackFatalf(t, "Did not get correct subject: expected '%s' got '%s'\n", subject, m[SUB_INDEX]) } - if string(m[SID_INDEX]) != sid { + if sid != "" && string(m[SID_INDEX]) != sid { stackFatalf(t, "Did not get correct sid: expected '%s' got '%s'\n", sid, m[SID_INDEX]) } if string(m[REPLY_INDEX]) != reply {