mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Queue Subscriber rework
This commit is contained in:
@@ -158,7 +158,7 @@ func (c *client) readLoop() {
|
||||
}
|
||||
|
||||
func (c *client) traceMsg(msg []byte) {
|
||||
pm := fmt.Sprintf("Processing msg: %d", c.inMsgs)
|
||||
pm := fmt.Sprintf("Processing %s msg: %d", c.typeString(), c.inMsgs)
|
||||
opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg)}
|
||||
Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid))
|
||||
}
|
||||
@@ -248,6 +248,9 @@ func (c *client) processPong() {
|
||||
}
|
||||
|
||||
func (c *client) processMsgArgs(arg []byte) error {
|
||||
if trace > 0 {
|
||||
c.traceOp("MSG", arg)
|
||||
}
|
||||
|
||||
// Unroll splitArgs to avoid runtime/heap issues
|
||||
a := [MAX_MSG_ARGS][]byte{}
|
||||
@@ -592,9 +595,45 @@ func (c *client) processMsg(msg []byte) {
|
||||
isRoute := c.typ == ROUTER
|
||||
var rmap map[string]struct{}
|
||||
|
||||
// If we are a route and we have a queue subscription, deliver direct
|
||||
// since they are sent direct via L2 semantics.
|
||||
if isRoute {
|
||||
if sub := c.srv.routeSidQueueSubscriber(c.pa.sid); sub != nil {
|
||||
mh := c.msgHeader(msgh[:si], sub)
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Loop over all subscriptions that match.
|
||||
|
||||
for _, v := range r {
|
||||
sub := v.(*subscription)
|
||||
|
||||
// Process queue group subscriptions by gathering them all up
|
||||
// here. We will pick the winners when we are done processing
|
||||
// all of the subscriptions.
|
||||
if sub.queue != nil {
|
||||
// Queue subscriptions handled from routes directly above.
|
||||
if isRoute {
|
||||
continue
|
||||
}
|
||||
// FIXME(dlc), this can be more efficient
|
||||
if qmap == nil {
|
||||
qmap = make(map[string][]*subscription)
|
||||
}
|
||||
qname := string(sub.queue)
|
||||
qsubs = qmap[qname]
|
||||
if qsubs == nil {
|
||||
qsubs = make([]*subscription, 0, 4)
|
||||
}
|
||||
qsubs = append(qsubs, sub)
|
||||
qmap[qname] = qsubs
|
||||
continue
|
||||
}
|
||||
|
||||
// Process normal, non-queue group subscriptions.
|
||||
|
||||
// If this is a send to a ROUTER, make sure we only send it
|
||||
// once. The other side will handle the appropriate re-processing.
|
||||
// Also enforce 1-Hop.
|
||||
@@ -619,23 +658,10 @@ func (c *client) processMsg(msg []byte) {
|
||||
rmap[sub.client.route.remoteId] = routeSeen
|
||||
}
|
||||
|
||||
if sub.queue != nil {
|
||||
// FIXME(dlc), this can be more efficient
|
||||
if qmap == nil {
|
||||
qmap = make(map[string][]*subscription)
|
||||
}
|
||||
qname := string(sub.queue)
|
||||
qsubs = qmap[qname]
|
||||
if qsubs == nil {
|
||||
qsubs = make([]*subscription, 0, 4)
|
||||
}
|
||||
qsubs = append(qsubs, sub)
|
||||
qmap[qname] = qsubs
|
||||
continue
|
||||
}
|
||||
mh := c.msgHeader(msgh[:si], sub)
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
}
|
||||
|
||||
if qmap != nil {
|
||||
for _, qsubs := range qmap {
|
||||
index := rand.Int() % len(qsubs)
|
||||
|
||||
@@ -3,10 +3,12 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -94,10 +96,46 @@ const (
|
||||
unsubProto = "UNSUB %s%s" + _CRLF_
|
||||
)
|
||||
|
||||
const RSID = "RSID"
|
||||
// FIXME(dlc) - Make these reserved and reject if they come in as a sid
|
||||
// from a client connection.
|
||||
|
||||
const (
|
||||
RSID = "RSID"
|
||||
QRSID = "QRSID"
|
||||
RSID_CID_INDEX = 1
|
||||
RSID_SID_INDEX = 2
|
||||
EXPECTED_MATCHES = 3
|
||||
)
|
||||
|
||||
// FIXME(dlc) - This may be too slow, check at later date.
|
||||
var qrsidRe = regexp.MustCompile(`QRSID:(\d+):([^\s]+)`)
|
||||
|
||||
func (s *Server) routeSidQueueSubscriber(rsid []byte) *subscription {
|
||||
if !bytes.HasPrefix(rsid, []byte(QRSID)) {
|
||||
return nil
|
||||
}
|
||||
matches := qrsidRe.FindSubmatch(rsid)
|
||||
if matches == nil || len(matches) != EXPECTED_MATCHES {
|
||||
return nil
|
||||
}
|
||||
cid := uint64(parseInt64(matches[RSID_CID_INDEX]))
|
||||
client := s.clients[cid]
|
||||
if client == nil {
|
||||
return nil
|
||||
}
|
||||
sid := matches[RSID_SID_INDEX]
|
||||
if sub, ok := (client.subs.Get(sid)).(*subscription); ok {
|
||||
return sub
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func routeSid(sub *subscription) string {
|
||||
return fmt.Sprintf("%s:%d:%s", RSID, sub.client.cid, sub.sid)
|
||||
var qi string
|
||||
if len(sub.queue) > 0 {
|
||||
qi = "Q"
|
||||
}
|
||||
return fmt.Sprintf("%s%s:%d:%s", qi, RSID, sub.client.cid, sub.sid)
|
||||
}
|
||||
|
||||
func (s *Server) broadcastToRoutes(proto string) {
|
||||
|
||||
@@ -36,6 +36,21 @@ func parseSize(d []byte) (n int) {
|
||||
return n
|
||||
}
|
||||
|
||||
// parseInt64 expects decimal positive numbers. We
|
||||
// return -1 to signal error
|
||||
func parseInt64(d []byte) (n int64) {
|
||||
if len(d) == 0 {
|
||||
return -1
|
||||
}
|
||||
for _, dec := range d {
|
||||
if dec < ascii_0 || dec > ascii_9 {
|
||||
return -1
|
||||
}
|
||||
n = n*10 + (int64(dec) - ascii_0)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func secondsToDuration(seconds float64) time.Duration {
|
||||
ttl := seconds * float64(time.Second)
|
||||
return time.Duration(ttl)
|
||||
|
||||
@@ -237,6 +237,8 @@ func TestRouteQueueSemantics(t *testing.T) {
|
||||
|
||||
client := createClientConn(t, opts.Host, opts.Port)
|
||||
clientSend, clientExpect := setupConn(t, client)
|
||||
clientExpectMsgs := expectMsgsCommand(t, clientExpect)
|
||||
|
||||
defer client.Close()
|
||||
|
||||
route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
|
||||
@@ -245,41 +247,78 @@ func TestRouteQueueSemantics(t *testing.T) {
|
||||
expectMsgs := expectMsgsCommand(t, routeExpect)
|
||||
|
||||
// Express multiple interest on this route for foo, queue group bar.
|
||||
routeSend("SUB foo bar RSID:2:1\r\n")
|
||||
routeSend("SUB foo bar RSID:2:2\r\n")
|
||||
qrsid1 := "RSID:2:1"
|
||||
routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid1))
|
||||
qrsid2 := "RSID:2:2"
|
||||
routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid2))
|
||||
|
||||
// Use ping roundtrip to make sure its processed.
|
||||
routeSend("PING\r\n")
|
||||
routeExpect(pongRe)
|
||||
|
||||
// Send PUB via client connection
|
||||
clientSend("PUB foo 2\r\nok\r\n")
|
||||
|
||||
// Only 1
|
||||
matches := expectMsgs(1)
|
||||
checkMsg(t, matches[0], "foo", "RSID:2:1", "", "2", "ok")
|
||||
|
||||
// Normal Interest as well.
|
||||
routeSend("SUB foo RSID:2:1\r\n")
|
||||
|
||||
// Send PUB via client connection
|
||||
clientSend("PUB foo 2\r\nok\r\n")
|
||||
|
||||
// Still only 1
|
||||
expectMsgs(1)
|
||||
|
||||
// Subscribe to foo on client
|
||||
clientSend("SUB foo bar 1\r\n")
|
||||
// Use ping roundtrip to make sure its processed.
|
||||
clientSend("PING\r\n")
|
||||
clientExpect(pongRe)
|
||||
|
||||
// Receive notification on route
|
||||
routeExpect(subRe)
|
||||
// Only 1
|
||||
matches := expectMsgs(1)
|
||||
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
|
||||
|
||||
// Add normal Interest as well to route interest.
|
||||
routeSend("SUB foo RSID:2:4\r\n")
|
||||
|
||||
// Use ping roundtrip to make sure its processed.
|
||||
routeSend("PING\r\n")
|
||||
routeExpect(pongRe)
|
||||
|
||||
// Send PUB via client connection
|
||||
clientSend("PUB foo 2\r\nok\r\n")
|
||||
// Use ping roundtrip to make sure its processed.
|
||||
clientSend("PING\r\n")
|
||||
clientExpect(pongRe)
|
||||
|
||||
// Still only 1 for route
|
||||
expectMsgs(1)
|
||||
// Should be 2 now, 1 for all normal, and one for specific queue subscriber.
|
||||
matches = expectMsgs(2)
|
||||
|
||||
// We could get one on client
|
||||
// Expect first to be the normal subscriber, next will be the queue one.
|
||||
checkMsg(t, matches[0], "foo", "RSID:2:4", "", "2", "ok")
|
||||
checkMsg(t, matches[1], "foo", "", "", "2", "ok")
|
||||
|
||||
// Check the rsid to verify it is one of the queue group subscribers.
|
||||
rsid := string(matches[1][SID_INDEX])
|
||||
if rsid != qrsid1 && rsid != qrsid2 {
|
||||
t.Fatalf("Expected a queue group rsid, got %s\n", rsid)
|
||||
}
|
||||
|
||||
// Now create a queue subscription for the client as well as a normal one.
|
||||
clientSend("SUB foo 1\r\n")
|
||||
// Use ping roundtrip to make sure its processed.
|
||||
clientSend("PING\r\n")
|
||||
clientExpect(pongRe)
|
||||
routeExpect(subRe)
|
||||
|
||||
clientSend("SUB foo bar 2\r\n")
|
||||
// Use ping roundtrip to make sure its processed.
|
||||
clientSend("PING\r\n")
|
||||
clientExpect(pongRe)
|
||||
routeExpect(subRe)
|
||||
|
||||
// Deliver a MSG from the route itself, make sure the client receives both.
|
||||
routeSend("MSG foo RSID:2:1 2\r\nok\r\n")
|
||||
// Queue group one.
|
||||
routeSend("MSG foo QRSID:2:2 2\r\nok\r\n")
|
||||
|
||||
// Use ping roundtrip to make sure its processed.
|
||||
routeSend("PING\r\n")
|
||||
routeExpect(pongRe)
|
||||
|
||||
// Should be 2 now, 1 for all normal, and one for specific queue subscriber.
|
||||
matches = clientExpectMsgs(2)
|
||||
// Expect first to be the normal subscriber, next will be the queue one.
|
||||
checkMsg(t, matches[0], "foo", "1", "", "2", "ok")
|
||||
checkMsg(t, matches[1], "foo", "2", "", "2", "ok")
|
||||
}
|
||||
|
||||
func TestSolicitRouteReconnect(t *testing.T) {
|
||||
|
||||
@@ -285,7 +285,7 @@ var expBuf = make([]byte, 32768)
|
||||
// Test result from server against regexp
|
||||
func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte {
|
||||
// Wait for commands to be processed and results queued for read
|
||||
c.SetReadDeadline(time.Now().Add(1 * time.Second))
|
||||
c.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
defer c.SetReadDeadline(time.Time{})
|
||||
|
||||
n, err := c.Read(expBuf)
|
||||
@@ -315,7 +315,7 @@ func checkMsg(t tLogger, m [][]byte, subject, sid, reply, len, msg string) {
|
||||
if string(m[SUB_INDEX]) != subject {
|
||||
stackFatalf(t, "Did not get correct subject: expected '%s' got '%s'\n", subject, m[SUB_INDEX])
|
||||
}
|
||||
if string(m[SID_INDEX]) != sid {
|
||||
if sid != "" && string(m[SID_INDEX]) != sid {
|
||||
stackFatalf(t, "Did not get correct sid: expected '%s' got '%s'\n", sid, m[SID_INDEX])
|
||||
}
|
||||
if string(m[REPLY_INDEX]) != reply {
|
||||
|
||||
Reference in New Issue
Block a user