mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Allow service import to work with Gateways
This is not complete solution and is a bit hacky but is a start to be able to have service import work at least in some basic cases. Also fixed a bug where replySub would not be removed from connection's list of subs after delivery. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -640,9 +640,6 @@ func (c *client) readLoop() {
|
||||
s := c.srv
|
||||
c.in.rsz = startBufSize
|
||||
defer s.grWG.Done()
|
||||
if c.gw != nil && c.gw.outbound {
|
||||
defer c.gatewayOutboundConnectionReadLoopExited()
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
if nc == nil {
|
||||
@@ -2121,6 +2118,8 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
var qa [16][]byte
|
||||
queues := qa[:0]
|
||||
|
||||
var replySub *subscription
|
||||
|
||||
// Check for no interest, short circuit if so.
|
||||
// This is the fanout scale.
|
||||
if len(r.psubs)+len(r.qsubs) > 0 {
|
||||
@@ -2133,12 +2132,20 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
|
||||
qnames = &queues
|
||||
}
|
||||
c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, qnames)
|
||||
replySub = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, qnames)
|
||||
}
|
||||
|
||||
// Now deal with gateways
|
||||
if c.srv.gateway.enabled {
|
||||
c.sendMsgToGateways(msg, queues)
|
||||
// TODO(ik): Need to revisit all that.
|
||||
// If replySub is not nil it means that this is
|
||||
// a reply sent on the _R_ subject and associated with
|
||||
// an outbound connection. Send direct here.
|
||||
if replySub != nil {
|
||||
c.sendReplyMsgDirectToGateway(c.acc, replySub, msg)
|
||||
} else {
|
||||
c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, queues)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2167,6 +2174,11 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
|
||||
// FIXME(dlc) - Do L1 cache trick from above.
|
||||
rr := rm.acc.sl.Match(rm.to)
|
||||
c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, nil)
|
||||
// If this is not a gateway connection but gateway is enabled,
|
||||
// try to send this converted message to all gateways.
|
||||
if c.kind != GATEWAY && c.srv.gateway.enabled {
|
||||
c.sendMsgToGateways(rm.acc, msg, []byte(rm.to), nrr, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2206,7 +2218,10 @@ func (c *client) addSubToRouteTargets(sub *subscription) {
|
||||
}
|
||||
|
||||
// This processes the sublist results for a given message.
|
||||
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, queues *[][]byte) {
|
||||
// If the incoming message was for a service reply (subject starts with `_R_.`)
|
||||
// and the sub's bound client is a gateway (will only be 1), then return
|
||||
// this subscription so that it can be sent direct to GW.
|
||||
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, queues *[][]byte) (replySub *subscription) {
|
||||
// msg header for clients.
|
||||
msgh := c.msgb[1:msgHeadProtoLen]
|
||||
msgh = append(msgh, subject...)
|
||||
@@ -2231,6 +2246,14 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
continue
|
||||
} else if sub.client.kind == GATEWAY {
|
||||
// Never send to gateway from here.
|
||||
if c.kind == GATEWAY {
|
||||
continue
|
||||
}
|
||||
// The only case we can be here is if this message
|
||||
// is a reply sent on the _R_.xxx subject. We don't
|
||||
// send here, return this sub so that processInboundClientMsg
|
||||
// can send direct to GW.
|
||||
replySub = sub
|
||||
continue
|
||||
}
|
||||
// Check for stream import mapped subs. These apply to local subs only.
|
||||
@@ -2384,6 +2407,7 @@ sendToRoutes:
|
||||
mh = append(mh, _CRLF_...)
|
||||
c.deliverMsg(rt.sub, mh, msg)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *client) pubPermissionViolation(subject []byte) {
|
||||
|
||||
@@ -121,10 +121,11 @@ type gateway struct {
|
||||
name string
|
||||
outbound bool
|
||||
cfg *gatewayCfg
|
||||
connectURL *url.URL // Needed when sending CONNECT after receiving INFO from remote
|
||||
infoJSON []byte // Needed when sending INFO after receiving INFO from remote
|
||||
outsim *sync.Map // Per-account subject interest (or no-interest) (outbound conn)
|
||||
insim map[string]*insie // Per-account subject no-interest sent or send-all-subs mode (inbound conn)
|
||||
connectURL *url.URL // Needed when sending CONNECT after receiving INFO from remote
|
||||
infoJSON []byte // Needed when sending INFO after receiving INFO from remote
|
||||
outsim *sync.Map // Per-account subject interest (or no-interest) (outbound conn)
|
||||
insim map[string]*insie // Per-account subject no-interest sent or send-all-subs mode (inbound conn)
|
||||
replySubs map[*subscription]*time.Timer // Same than replySubs in client.route
|
||||
}
|
||||
|
||||
// Outbound subject interest entry.
|
||||
@@ -1320,7 +1321,7 @@ func (s *Server) getAllGatewayConnections(conns map[uint64]*client) {
|
||||
}
|
||||
|
||||
// Register the given gateway connection (*client) in the inbound gateways
|
||||
// map with the given name as the key.
|
||||
// map. The key is the connection ID (like for clients and routes).
|
||||
func (s *Server) registerInboundGatewayConnection(cid uint64, gwc *client) {
|
||||
s.gateway.Lock()
|
||||
s.gateway.in[cid] = gwc
|
||||
@@ -1384,25 +1385,6 @@ func (s *Server) getInboundGatewayConnections(a *[]*client) {
|
||||
s.gateway.RUnlock()
|
||||
}
|
||||
|
||||
// Returns the inbound gateway connection for the given gateway name,
|
||||
// or nil if it does not exist.
|
||||
func (s *Server) getInboundGatewayConnection(name string) *client {
|
||||
var c *client
|
||||
s.gateway.RLock()
|
||||
defer s.gateway.RUnlock()
|
||||
for _, gwc := range s.gateway.in {
|
||||
gwc.mu.Lock()
|
||||
if gwc.gw.name == name {
|
||||
c = gwc
|
||||
}
|
||||
gwc.mu.Unlock()
|
||||
if c != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// This is invoked when a gateway connection is closed and the server
|
||||
// is removing this connection from its state.
|
||||
func (s *Server) removeRemoteGatewayConnection(c *client) {
|
||||
@@ -1436,21 +1418,30 @@ func (s *Server) removeRemoteGatewayConnection(c *client) {
|
||||
}
|
||||
gw.Unlock()
|
||||
s.removeFromTempClients(cid)
|
||||
}
|
||||
|
||||
// This allows some cleanup with guarantee that readloop has
|
||||
// exited and so no protocol message is being processed.
|
||||
func (c *client) gatewayOutboundConnectionReadLoopExited() {
|
||||
qSubsRemoved := int64(0)
|
||||
c.mu.Lock()
|
||||
for _, sub := range c.subs {
|
||||
if sub.queue != nil {
|
||||
qSubsRemoved++
|
||||
if isOutbound {
|
||||
var subsa [1024]*subscription
|
||||
var subs = subsa[:0]
|
||||
|
||||
// Update number of totalQSubs for this gateway
|
||||
qSubsRemoved := int64(0)
|
||||
c.mu.Lock()
|
||||
for _, sub := range c.subs {
|
||||
if sub.queue != nil {
|
||||
qSubsRemoved++
|
||||
} else if sub.qw > 0 {
|
||||
// Hack to track _R_ reply subs that need
|
||||
// removal.
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
for _, sub := range subs {
|
||||
c.removeReplySub(sub)
|
||||
}
|
||||
// Update total count of qsubs in remote gateways.
|
||||
atomic.AddInt64(&c.srv.gateway.totalQSubs, qSubsRemoved*-1)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
// Update total count of qsubs in remote gateways.
|
||||
atomic.AddInt64(&c.srv.gateway.totalQSubs, qSubsRemoved*-1)
|
||||
}
|
||||
|
||||
// GatewayAddr returns the net.Addr object for the gateway listener.
|
||||
@@ -1520,6 +1511,8 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
|
||||
// the sublist. Look for it and remove.
|
||||
if useSl {
|
||||
key := arg
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
// m[string()] does not cause mem allocation
|
||||
sub, ok := c.subs[string(key)]
|
||||
// if RS- for a sub that we don't have, just ignore.
|
||||
@@ -1605,6 +1598,8 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
} else {
|
||||
key = arg
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
// If RS+ for a sub that we already have, ignore.
|
||||
// (m[string()] does not allocate memory)
|
||||
if _, ok := c.subs[string(key)]; ok {
|
||||
@@ -1913,12 +1908,28 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
|
||||
}
|
||||
}
|
||||
|
||||
// Invoked by a PUB's connection to send a reply message on _R_ directly
|
||||
// to the outbound gateway connection (that is referenced in sub.client)
|
||||
func (c *client) sendReplyMsgDirectToGateway(acc *Account, sub *subscription, msg []byte) {
|
||||
mh := c.msgb[:msgHeadProtoLen]
|
||||
mh = append(mh, acc.Name...)
|
||||
mh = append(mh, ' ')
|
||||
mh = append(mh, sub.subject...)
|
||||
mh = append(mh, ' ')
|
||||
mh = append(mh, c.pa.szb...)
|
||||
mh = append(mh, CR_LF...)
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
// cleanup, use sub.client here, not `c` which is not the
|
||||
// gateway connection but instead a PUB's connection.
|
||||
sub.client.removeReplySub(sub)
|
||||
}
|
||||
|
||||
// May send a message to all outbound gateways. It is possible
|
||||
// that message is not sent to a given gateway if for instance
|
||||
// it is known that this gateway has no interest in account or
|
||||
// subject, etc..
|
||||
// <Invoked from any client connection's readLoop>
|
||||
func (c *client) sendMsgToGateways(msg []byte, qgroups [][]byte) {
|
||||
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) {
|
||||
gwsa := [4]*client{}
|
||||
gws := gwsa[:0]
|
||||
// This is in fast path, so avoid calling function when possible.
|
||||
@@ -1934,10 +1945,10 @@ func (c *client) sendMsgToGateways(msg []byte, qgroups [][]byte) {
|
||||
return
|
||||
}
|
||||
var (
|
||||
subj = string(c.pa.subject)
|
||||
subj = string(subject)
|
||||
queuesa = [512]byte{}
|
||||
queues = queuesa[:0]
|
||||
accName = c.acc.Name
|
||||
accName = acc.Name
|
||||
)
|
||||
for i := 0; i < len(gws); i++ {
|
||||
gwc := gws[i]
|
||||
@@ -1973,24 +1984,24 @@ func (c *client) sendMsgToGateways(msg []byte, qgroups [][]byte) {
|
||||
mh := c.msgb[:msgHeadProtoLen]
|
||||
mh = append(mh, accName...)
|
||||
mh = append(mh, ' ')
|
||||
mh = append(mh, c.pa.subject...)
|
||||
mh = append(mh, subject...)
|
||||
mh = append(mh, ' ')
|
||||
if len(queues) > 0 {
|
||||
if c.pa.reply != nil {
|
||||
if reply != nil {
|
||||
mh = append(mh, "+ "...) // Signal that there is a reply.
|
||||
mh = append(mh, c.pa.reply...)
|
||||
mh = append(mh, reply...)
|
||||
mh = append(mh, ' ')
|
||||
} else {
|
||||
mh = append(mh, "| "...) // Only queues
|
||||
}
|
||||
mh = append(mh, queues...)
|
||||
} else if c.pa.reply != nil {
|
||||
mh = append(mh, c.pa.reply...)
|
||||
} else if reply != nil {
|
||||
mh = append(mh, reply...)
|
||||
mh = append(mh, ' ')
|
||||
}
|
||||
mh = append(mh, c.pa.szb...)
|
||||
mh = append(mh, CR_LF...)
|
||||
sub := subscription{client: gwc}
|
||||
sub := subscription{client: gwc, subject: c.pa.subject}
|
||||
c.deliverMsg(&sub, mh, msg)
|
||||
}
|
||||
}
|
||||
@@ -2107,15 +2118,30 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
|
||||
// Copy off the reply since otherwise we are referencing a buffer that will be reused.
|
||||
reply := make([]byte, len(c.pa.reply))
|
||||
copy(reply, c.pa.reply)
|
||||
sub := &subscription{client: c, subject: reply, sid: sid, max: 1}
|
||||
// If the connection is a GW connection, it is necessarily an
|
||||
// inbound connection. We will switch the the outbound GW connectiom
|
||||
// instead.
|
||||
conn := c
|
||||
sub := &subscription{client: conn, subject: reply, sid: sid, max: 1}
|
||||
if c.srv.gateway.enabled && c.gw != nil {
|
||||
conn = c.srv.getOutboundGatewayConnection(c.gw.name)
|
||||
if conn == nil {
|
||||
c.Errorf("Did not find outbound connection for %q", c.gw.name)
|
||||
} else {
|
||||
sub.client = conn
|
||||
// TODO(ik): Biggest hack ever, since this is not a queue
|
||||
// use `qw` to mark this sub for cleanup on connection close.
|
||||
sub.qw = 1
|
||||
}
|
||||
}
|
||||
if err := acc.sl.Insert(sub); err != nil {
|
||||
c.Errorf("Could not insert subscription: %v", err)
|
||||
} else {
|
||||
ttl := acc.AutoExpireTTL()
|
||||
c.mu.Lock()
|
||||
c.subs[string(sid)] = sub
|
||||
c.addReplySubTimeout(acc, sub, ttl)
|
||||
c.mu.Unlock()
|
||||
conn.mu.Lock()
|
||||
conn.subs[string(sid)] = sub
|
||||
conn.addReplySubTimeout(acc, sub, ttl)
|
||||
conn.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -147,6 +147,15 @@ func natsSub(t *testing.T, nc *nats.Conn, subj string, cb nats.MsgHandler) *nats
|
||||
return sub
|
||||
}
|
||||
|
||||
func natsSubSync(t *testing.T, nc *nats.Conn, subj string) *nats.Subscription {
|
||||
t.Helper()
|
||||
sub, err := nc.SubscribeSync(subj)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on subscribe: %v", err)
|
||||
}
|
||||
return sub
|
||||
}
|
||||
|
||||
func natsQueueSub(t *testing.T, nc *nats.Conn, subj, queue string, cb nats.MsgHandler) *nats.Subscription {
|
||||
t.Helper()
|
||||
sub, err := nc.QueueSubscribe(subj, queue, cb)
|
||||
@@ -2661,6 +2670,17 @@ func TestGatewayRaceBetweenPubAndSub(t *testing.T) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Returns the first (if any) of the inbound connections for this name.
|
||||
func getInboundGatewayConnection(s *Server, name string) *client {
|
||||
var gwsa [4]*client
|
||||
var gws = gwsa[:0]
|
||||
s.getInboundGatewayConnections(&gws)
|
||||
if len(gws) > 0 {
|
||||
return gws[0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestGatewaySendAllSubs(t *testing.T) {
|
||||
gatewayMaxRUnsubBeforeSwitch = 100
|
||||
defer func() { gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch }()
|
||||
@@ -2766,7 +2786,7 @@ func TestGatewaySendAllSubs(t *testing.T) {
|
||||
// instruct B to send only if there is explicit interest.
|
||||
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
|
||||
// Check C inbound connection from B
|
||||
c := sc.getInboundGatewayConnection("B")
|
||||
c := getInboundGatewayConnection(sc, "B")
|
||||
c.mu.Lock()
|
||||
var switchedMode bool
|
||||
e := c.gw.insim[globalAccountName]
|
||||
@@ -2879,10 +2899,13 @@ func TestGatewaySendAllSubsBadProtocol(t *testing.T) {
|
||||
waitForInboundGateways(t, sa, 1, time.Second)
|
||||
waitForInboundGateways(t, sb, 1, time.Second)
|
||||
|
||||
c := sa.getInboundGatewayConnection("B")
|
||||
// For this test, make sure to use inbound from A so
|
||||
// A will reconnect when we send bad proto that
|
||||
// causes connection to be closed.
|
||||
c := getInboundGatewayConnection(sa, "A")
|
||||
// Mock an invalid protocol (account name missing)
|
||||
info := &Info{
|
||||
Gateway: "A",
|
||||
Gateway: "B",
|
||||
GatewayCmd: gatewayCmdAllSubsStart,
|
||||
}
|
||||
b, _ := json.Marshal(info)
|
||||
@@ -2892,7 +2915,7 @@ func TestGatewaySendAllSubsBadProtocol(t *testing.T) {
|
||||
|
||||
orgConn := c
|
||||
checkFor(t, 3*time.Second, 100*time.Millisecond, func() error {
|
||||
curConn := sa.getInboundGatewayConnection("B")
|
||||
curConn := getInboundGatewayConnection(sa, "A")
|
||||
if orgConn == curConn {
|
||||
return fmt.Errorf("Not reconnected")
|
||||
}
|
||||
@@ -2903,7 +2926,14 @@ func TestGatewaySendAllSubsBadProtocol(t *testing.T) {
|
||||
waitForOutboundGateways(t, sb, 1, 2*time.Second)
|
||||
|
||||
// Refresh
|
||||
c = sa.getInboundGatewayConnection("B")
|
||||
c = nil
|
||||
checkFor(t, 3*time.Second, 15*time.Millisecond, func() error {
|
||||
c = getInboundGatewayConnection(sa, "A")
|
||||
if c == nil {
|
||||
t.Fatalf("Did not reconnect")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
// Do correct start
|
||||
info.GatewayCmdPayload = []byte(globalAccountName)
|
||||
b, _ = json.Marshal(info)
|
||||
@@ -2920,7 +2950,7 @@ func TestGatewaySendAllSubsBadProtocol(t *testing.T) {
|
||||
|
||||
orgConn = c
|
||||
checkFor(t, 3*time.Second, 100*time.Millisecond, func() error {
|
||||
curConn := sa.getInboundGatewayConnection("B")
|
||||
curConn := getInboundGatewayConnection(sa, "A")
|
||||
if orgConn == curConn {
|
||||
return fmt.Errorf("Not reconnected")
|
||||
}
|
||||
@@ -2928,6 +2958,220 @@ func TestGatewaySendAllSubsBadProtocol(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestGatewayRaceOnClose(t *testing.T) {
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, time.Second)
|
||||
waitForInboundGateways(t, sa, 1, time.Second)
|
||||
waitForInboundGateways(t, sb, 1, time.Second)
|
||||
|
||||
bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
|
||||
ncB := natsConnect(t, bURL, nats.NoReconnect())
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
cb := func(_ *nats.Msg) {}
|
||||
for {
|
||||
// Expect failure at one point and just return.
|
||||
qsub, err := ncB.QueueSubscribe("foo", "bar", cb)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err := qsub.Unsubscribe(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
// Wait a bit and kill B
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
sb.Shutdown()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Similar to TestNewRoutesServiceImport but with 2 GW servers instead
|
||||
// of a cluster of 2 servers.
|
||||
func TestGatewayServiceImport(t *testing.T) {
|
||||
oa := testDefaultOptionsForGateway("A")
|
||||
setAccountUserPassInOptions(oa, "$foo", "clientA", "password")
|
||||
setAccountUserPassInOptions(oa, "$bar", "yyyyyyy", "password")
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
ob := testGatewayOptionsFromToWithServers(t, "B", "A", sa)
|
||||
setAccountUserPassInOptions(ob, "$foo", "xxxxxxx", "password")
|
||||
setAccountUserPassInOptions(ob, "$bar", "clientB", "password")
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, time.Second)
|
||||
waitForInboundGateways(t, sa, 1, time.Second)
|
||||
waitForInboundGateways(t, sb, 1, time.Second)
|
||||
|
||||
// Get accounts
|
||||
fooA, _ := sa.LookupAccount("$foo")
|
||||
barA, _ := sa.LookupAccount("$bar")
|
||||
fooB, _ := sb.LookupAccount("$foo")
|
||||
barB, _ := sb.LookupAccount("$bar")
|
||||
|
||||
// Add in the service export for the requests. Make it public.
|
||||
fooA.AddServiceExport("test.request", nil)
|
||||
fooB.AddServiceExport("test.request", nil)
|
||||
|
||||
// Add import abilities to server B's bar account from foo.
|
||||
if err := barB.AddServiceImport(fooB, "foo.request", "test.request"); err != nil {
|
||||
t.Fatalf("Error adding service import: %v", err)
|
||||
}
|
||||
// Same on A.
|
||||
if err := barA.AddServiceImport(fooA, "foo.request", "test.request"); err != nil {
|
||||
t.Fatalf("Error adding service import: %v", err)
|
||||
}
|
||||
|
||||
// clientA will be connected to srvA and be the service endpoint and responder.
|
||||
aURL := fmt.Sprintf("nats://clientA:password@127.0.0.1:%d", oa.Port)
|
||||
clientA := natsConnect(t, aURL)
|
||||
defer clientA.Close()
|
||||
|
||||
subA := natsSubSync(t, clientA, "test.request")
|
||||
natsFlush(t, clientA)
|
||||
|
||||
// Now setup client B on srvB who will do a sub from account $bar
|
||||
// that should map account $foo's foo subject.
|
||||
bURL := fmt.Sprintf("nats://clientB:password@127.0.0.1:%d", ob.Port)
|
||||
clientB := natsConnect(t, bURL)
|
||||
defer clientB.Close()
|
||||
|
||||
subB := natsSubSync(t, clientB, "reply")
|
||||
natsFlush(t, clientB)
|
||||
|
||||
for i := 0; i < 1; i++ {
|
||||
// Send the request from clientB on foo.request,
|
||||
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
|
||||
natsFlush(t, clientB)
|
||||
|
||||
// Expect the request on A
|
||||
msg, err := subA.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("subA failed to get request: %v", err)
|
||||
}
|
||||
if msg.Subject != "test.request" || string(msg.Data) != "hi" {
|
||||
t.Fatalf("Unexpected message: %v", msg)
|
||||
}
|
||||
if msg.Reply == "reply" {
|
||||
t.Fatalf("Expected randomized reply, but got original")
|
||||
}
|
||||
|
||||
// Send reply
|
||||
natsPub(t, clientA, msg.Reply, []byte("ok"))
|
||||
natsFlush(t, clientA)
|
||||
|
||||
msg, err = subB.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("subB failed to get reply: %v", err)
|
||||
}
|
||||
if msg.Subject != "reply" || string(msg.Data) != "ok" {
|
||||
t.Fatalf("Unexpected message: %v", msg)
|
||||
}
|
||||
|
||||
expected := int64((i + 1) * 3)
|
||||
vz, _ := sa.Varz(nil)
|
||||
if vz.OutMsgs != expected {
|
||||
t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs)
|
||||
}
|
||||
|
||||
if i == 0 {
|
||||
expected = 3
|
||||
} else {
|
||||
expected = 5
|
||||
}
|
||||
vz, _ = sb.Varz(nil)
|
||||
if vz.OutMsgs != expected {
|
||||
t.Fatalf("Expected %d outMsgs for B, got %v", expected, vz.OutMsgs)
|
||||
}
|
||||
}
|
||||
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
if ts := fooA.TotalSubs(); ts != 1 {
|
||||
return fmt.Errorf("Expected one sub to be left on fooA, but got %d", ts)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Speed up exiration
|
||||
fooA.SetAutoExpireTTL(10 * time.Millisecond)
|
||||
|
||||
// Send 100 requests from clientB on foo.request,
|
||||
for i := 0; i < 100; i++ {
|
||||
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
|
||||
}
|
||||
natsFlush(t, clientB)
|
||||
|
||||
// Consume the requests, but don't reply to them...
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := subA.NextMsg(time.Second); err != nil {
|
||||
t.Fatalf("subA did not receive request: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// These reply subjects will be dangling off of $foo account on serverA.
|
||||
// Remove our service endpoint and wait for the dangling replies to go to zero.
|
||||
natsUnsub(t, subA)
|
||||
natsFlush(t, clientA)
|
||||
|
||||
checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
|
||||
if ts := fooA.TotalSubs(); ts != 0 {
|
||||
return fmt.Errorf("Number of subs is %d, should be zero", ts)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Repeat similar test but without the small TTL and verify
|
||||
// that if B is shutdown, the dangling subs for replies are
|
||||
// cleared from the account sublist.
|
||||
fooA.SetAutoExpireTTL(10 * time.Second)
|
||||
|
||||
subA = natsSubSync(t, clientA, "test.request")
|
||||
natsFlush(t, clientA)
|
||||
|
||||
// Send 100 requests from clientB on foo.request,
|
||||
for i := 0; i < 100; i++ {
|
||||
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
|
||||
}
|
||||
natsFlush(t, clientB)
|
||||
|
||||
// Consume the requests, but don't reply to them...
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := subA.NextMsg(time.Second); err != nil {
|
||||
t.Fatalf("subA did not receive request: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown B
|
||||
clientB.Close()
|
||||
sb.Shutdown()
|
||||
|
||||
// Close our last sub
|
||||
natsUnsub(t, subA)
|
||||
natsFlush(t, clientA)
|
||||
|
||||
// Verify that they are gone before the 10 sec TTL
|
||||
checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
|
||||
if ts := fooA.TotalSubs(); ts != 0 {
|
||||
return fmt.Errorf("Number of subs is %d, should be zero", ts)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
/*
|
||||
func TestGatewayPermissions(t *testing.T) {
|
||||
bo := testDefaultOptionsForGateway("B")
|
||||
|
||||
@@ -96,10 +96,19 @@ const (
|
||||
// accounts map timeout to match.
|
||||
// Lock should be held upon entering.
|
||||
func (c *client) addReplySubTimeout(acc *Account, sub *subscription, d time.Duration) {
|
||||
if c.route.replySubs == nil {
|
||||
c.route.replySubs = make(map[*subscription]*time.Timer)
|
||||
var prs *map[*subscription]*time.Timer
|
||||
switch c.kind {
|
||||
case ROUTER:
|
||||
prs = &c.route.replySubs
|
||||
case GATEWAY:
|
||||
prs = &c.gw.replySubs
|
||||
default:
|
||||
// TODO(ik): return or panic to show that there is a bug?
|
||||
}
|
||||
rs := c.route.replySubs
|
||||
if *prs == nil {
|
||||
*prs = make(map[*subscription]*time.Timer)
|
||||
}
|
||||
rs := *prs
|
||||
rs[sub] = time.AfterFunc(d, func() {
|
||||
c.mu.Lock()
|
||||
delete(rs, sub)
|
||||
@@ -116,12 +125,13 @@ func (c *client) removeReplySub(sub *subscription) {
|
||||
}
|
||||
// Lookup the account based on sub.sid.
|
||||
if i := bytes.Index(sub.sid, []byte(" ")); i > 0 {
|
||||
// First part of SID for route is account name.
|
||||
// First part of SID for route/gateway is account name.
|
||||
if acc, _ := c.srv.LookupAccount(string(sub.sid[:i])); acc != nil {
|
||||
acc.sl.Remove(sub)
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.removeReplySubTimeout(sub)
|
||||
delete(c.subs, string(sub.sid))
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
@@ -130,12 +140,18 @@ func (c *client) removeReplySub(sub *subscription) {
|
||||
// Lock should be held upon entering.
|
||||
func (c *client) removeReplySubTimeout(sub *subscription) {
|
||||
// Remove any reply sub timer if it exists.
|
||||
if c.route.replySubs == nil {
|
||||
var rs map[*subscription]*time.Timer
|
||||
switch c.kind {
|
||||
case ROUTER:
|
||||
rs = c.route.replySubs
|
||||
case GATEWAY:
|
||||
rs = c.gw.replySubs
|
||||
default:
|
||||
return
|
||||
}
|
||||
if t, ok := c.route.replySubs[sub]; ok {
|
||||
if t, ok := rs[sub]; ok {
|
||||
t.Stop()
|
||||
delete(c.route.replySubs, sub)
|
||||
delete(rs, sub)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -845,9 +845,12 @@ func TestLameDuckMode(t *testing.T) {
|
||||
srvA.grWG.Wait()
|
||||
srvB.grWG.Wait()
|
||||
checkClientsCount(t, srvC, 1)
|
||||
if n := atomic.LoadInt32(&rt); n != 1 {
|
||||
t.Fatalf("Expected client to reconnect only once, got %v", n)
|
||||
}
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
if n := atomic.LoadInt32(&rt); n != 1 {
|
||||
return fmt.Errorf("Expected client to reconnect only once, got %v", n)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestServerValidateGatewaysOptions(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user