Fix for service import processing across routes for leaf nodes

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-04-17 14:37:09 -07:00
parent bda267ec2c
commit bfef3bd5a6
3 changed files with 27 additions and 29 deletions

View File

@@ -1740,7 +1740,7 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error {
shadow = append(shadow, nsub)
}
// Now walk through importMaps that we need to subscribe
// exactly to the from property.
// exactly to the "from" property.
for _, im := range froms {
// We will create a shadow subscription.
nsub, err := c.addShadowSub(sub, im, true)
@@ -2299,6 +2299,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
rm := acc.imports.services[string(c.pa.subject)]
invalid := rm != nil && rm.invalid
acc.mu.RUnlock()
// Get the results from the other account for the mapped "to" subject.
// If we have been marked invalid simply return here.
if rm != nil && !invalid && rm.acc != nil && rm.acc.sl != nil {
@@ -2320,13 +2321,14 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
}
// FIXME(dlc) - Do L1 cache trick from above.
rr := rm.acc.sl.Match(rm.to)
// If we are a route or gateway and this message is flipped to a queue subscriber we
// need to handle that since the processMsgResults will want a queue filter.
if (c.kind == ROUTER || c.kind == GATEWAY) && c.pa.queues == nil && len(rr.qsubs) > 0 {
c.makeQFilter(rr.qsubs)
}
sendToGWs := c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM)
sendToGWs := c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF)
queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, sendToGWs)
// If this is not a gateway connection but gateway is enabled,
// try to send this converted message to all gateways.
@@ -2403,7 +2405,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
case LEAF:
// We handle similarly to routes and use the same data structures.
// Leaf node delivery audience is different however.
c.addSubToRouteTargets(sub)
// Also leaf nodes are always no echo, so we make sure we are not
// going to send back to ourselves here.
if c != sub.client {
c.addSubToRouteTargets(sub)
}
continue
}
// Check for stream import mapped subs. These apply to local subs only.
@@ -2425,21 +2431,10 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
// guidance on which queue groups we should deliver to.
qf := c.pa.queues
// For route connections, we still want to send messages to
// leaf nodes even if there are no queue filters since we collect
// For all non-client connections, we may still want to send messages to
// leaf nodes or routes even if there are no queue filters since we collect
// them above and do not process inline like normal clients.
if c.kind == ROUTER && qf == nil {
goto sendToRoutesOrLeafs
}
// If we are sourced from a route or leaf node we need to have direct filtered queues.
if (c.kind == ROUTER || c.kind == LEAF) && qf == nil {
return queues
}
// For gateway connections, we still want to send messages to routes
// and leaf nodes even if there are no queue filters.
if c.kind == GATEWAY && qf == nil {
if c.kind != CLIENT && qf == nil {
goto sendToRoutesOrLeafs
}

View File

@@ -784,7 +784,7 @@ func (c *client) initLeafNodeSmap() {
return
}
// Collect all subs here.
_subs := [256]*subscription{}
_subs := [32]*subscription{}
subs := _subs[:0]
ims := []string{}
acc.mu.RLock()
@@ -871,7 +871,7 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
_b := [64]byte{}
b := bytes.NewBuffer(_b[:0])
writeLeafSub(b, key, n)
c.writeLeafSub(b, key, n)
c.sendProto(b.Bytes(), false)
}
@@ -902,7 +902,7 @@ func (c *client) sendAllAccountSubs() {
var b bytes.Buffer
for key, n := range c.leaf.smap {
writeLeafSub(&b, key, n)
c.writeLeafSub(&b, key, n)
}
// We will make sure we don't overflow here due to an max_pending.
@@ -914,7 +914,7 @@ func (c *client) sendAllAccountSubs() {
}
}
func writeLeafSub(w *bytes.Buffer, key string, n int32) {
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
if key == "" {
return
}
@@ -930,9 +930,18 @@ func writeLeafSub(w *bytes.Buffer, key string, n int32) {
b[i] = digits[l%10]
}
w.Write(b[i:])
if c.trace {
arg := fmt.Sprintf("%s %d", key, n)
c.traceOutOp("LS+", []byte(arg))
}
} else if c.trace {
c.traceOutOp("LS+", []byte(key))
}
} else {
w.WriteString("LS- " + key)
if c.trace {
c.traceOutOp("LS-", []byte(key))
}
}
w.WriteString(CR_LF)
}

View File

@@ -26,7 +26,6 @@ import (
"testing"
"time"
"github.com/nats-io/gnatsd/logger"
"github.com/nats-io/gnatsd/server"
"github.com/nats-io/go-nats"
"github.com/nats-io/jwt"
@@ -1041,7 +1040,6 @@ func TestLeafNodeExportsImports(t *testing.T) {
s, opts, conf := runLeafNodeOperatorServer(t)
defer os.Remove(conf)
defer s.Shutdown()
s.SetLogger(logger.NewTestLogger("[S ] ", true), true, true)
// Setup the two accounts for this server.
okp, _ := nkeys.FromSeed(oSeed)
@@ -1096,7 +1094,6 @@ func TestLeafNodeExportsImports(t *testing.T) {
sl, lopts, lnconf := runSolicitWithCredentials(t, opts, mycreds)
defer os.Remove(lnconf)
defer sl.Shutdown()
sl.SetLogger(logger.NewTestLogger("[LN] ", true), true, true)
checkLeafNodeConnected(t, s)
@@ -1145,7 +1142,7 @@ func TestLeafNodeExportsImports(t *testing.T) {
}
// Services
// Create listener on nc1
// Create listener on nc2
nc2.Subscribe("req.echo", func(msg *nats.Msg) {
nc2.Publish(msg.Reply, []byte("WORKED"))
})
@@ -1173,7 +1170,6 @@ func TestLeadNodeExportImportComplexSetup(t *testing.T) {
defer os.Remove(conf)
s1, s1Opts := RunServerWithConfig(conf)
defer s1.Shutdown()
s1.SetLogger(logger.NewTestLogger("[S1] ", true), true, true)
content = fmt.Sprintf(`
port: -1
@@ -1190,7 +1186,6 @@ func TestLeadNodeExportImportComplexSetup(t *testing.T) {
conf = createConfFile(t, []byte(content))
s2, s2Opts := RunServerWithConfig(conf)
defer s2.Shutdown()
s2.SetLogger(logger.NewTestLogger("[S2] ", true), true, true)
// Setup the two accounts for this server.
okp, _ := nkeys.FromSeed(oSeed)
@@ -1261,7 +1256,6 @@ func TestLeadNodeExportImportComplexSetup(t *testing.T) {
sl, lopts, lnconf := runSolicitWithCredentials(t, s1Opts, mycreds)
defer os.Remove(lnconf)
defer sl.Shutdown()
sl.SetLogger(logger.NewTestLogger("[LN] ", true), true, true)
checkLeafNodeConnected(t, s1)
@@ -1318,7 +1312,7 @@ func TestLeadNodeExportImportComplexSetup(t *testing.T) {
nc2.Flush()
// Now send the request on the leaf node client.
if _, err := ncl.Request("import.request", []byte("fingers crossed"), 500*time.Millisecond); err != nil {
if _, err := ncl.Request("import.request", []byte("fingers crossed"), 5500*time.Millisecond); err != nil {
if atomic.LoadInt32(&gotIt) == 0 {
t.Fatalf("Request was not received")
}