mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 02:07:59 -07:00
Revise queue msg action
We think it marks a queue subscription via QRSID prefix.
This commit is contained in:
@@ -446,8 +446,7 @@ const (
|
||||
RSID = "RSID"
|
||||
QRSID = "QRSID"
|
||||
|
||||
QRSID_PREFIX = QRSID + ":"
|
||||
QRSID_PREFIX_LEN = len(QRSID_PREFIX)
|
||||
QRSID_LEN = len(QRSID)
|
||||
)
|
||||
|
||||
func (s *Server) routeSidQueueSubscriber(rsid []byte) (*subscription, bool) {
|
||||
@@ -482,17 +481,18 @@ func routeSid(sub *subscription) string {
|
||||
}
|
||||
|
||||
func parseRouteSid(rsid []byte) (uint64, []byte, bool) {
|
||||
if !bytes.HasPrefix(rsid, []byte(QRSID_PREFIX)) {
|
||||
if !bytes.HasPrefix(rsid, []byte(QRSID)) {
|
||||
return 0, nil, false
|
||||
}
|
||||
|
||||
for i := QRSID_PREFIX_LEN; i < len(rsid); i++ {
|
||||
// We don't care what's char of rsid[QRSID_LEN+1], it should be ':'
|
||||
for i, count := QRSID_LEN+1, len(rsid); i < count; i++ {
|
||||
switch rsid[i] {
|
||||
case ':':
|
||||
return uint64(parseInt64(rsid[QRSID_PREFIX_LEN:i])), rsid[i+1:], len(rsid[i+1:]) > 0
|
||||
return uint64(parseInt64(rsid[QRSID_LEN+1 : i])), rsid[i+1:], true
|
||||
}
|
||||
}
|
||||
return 0, nil, false
|
||||
return 0, nil, true
|
||||
}
|
||||
|
||||
func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
|
||||
|
||||
@@ -399,17 +399,12 @@ func TestRouteQueueSemantics(t *testing.T) {
|
||||
routeSend("PING\r\n")
|
||||
routeExpect(pongRe)
|
||||
|
||||
// Should be 3 now, 1 for all normal, and one for specific queue subscriber,
|
||||
// others for normal
|
||||
matches = clientExpectMsgs(5)
|
||||
// Should be 2 now, 1 for all normal, and one for specific queue subscriber.
|
||||
matches = clientExpectMsgs(2)
|
||||
|
||||
// Expect first to be the normal subscriber, next will be the queue one,
|
||||
// others to be the normal subscriber
|
||||
// Expect first to be the normal subscriber, next will be the queue one.
|
||||
checkMsg(t, matches[0], "foo", "1", "", "2", "ok")
|
||||
checkMsg(t, matches[1], "foo", "2", "", "2", "ok")
|
||||
checkMsg(t, matches[2], "foo", "1", "", "2", "ok")
|
||||
checkMsg(t, matches[3], "foo", "1", "", "2", "ok")
|
||||
checkMsg(t, matches[4], "foo", "1", "", "2", "ok")
|
||||
}
|
||||
|
||||
func TestSolicitRouteReconnect(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user