diff --git a/server/client.go b/server/client.go index 6c75a7ec..3f39e9fb 100644 --- a/server/client.go +++ b/server/client.go @@ -1740,7 +1740,7 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { shadow = append(shadow, nsub) } // Now walk through importMaps that we need to subscribe - // exactly to the from property. + // exactly to the "from" property. for _, im := range froms { // We will create a shadow subscription. nsub, err := c.addShadowSub(sub, im, true) @@ -2299,6 +2299,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) { rm := acc.imports.services[string(c.pa.subject)] invalid := rm != nil && rm.invalid acc.mu.RUnlock() + // Get the results from the other account for the mapped "to" subject. // If we have been marked invalid simply return here. if rm != nil && !invalid && rm.acc != nil && rm.acc.sl != nil { @@ -2320,13 +2321,14 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) { } // FIXME(dlc) - Do L1 cache trick from above. rr := rm.acc.sl.Match(rm.to) + // If we are a route or gateway and this message is flipped to a queue subscriber we // need to handle that since the processMsgResults will want a queue filter. if (c.kind == ROUTER || c.kind == GATEWAY) && c.pa.queues == nil && len(rr.qsubs) > 0 { c.makeQFilter(rr.qsubs) } - sendToGWs := c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM) + sendToGWs := c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, sendToGWs) // If this is not a gateway connection but gateway is enabled, // try to send this converted message to all gateways. @@ -2403,7 +2405,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, case LEAF: // We handle similarly to routes and use the same data structures. // Leaf node delivery audience is different however. - c.addSubToRouteTargets(sub) + // Also leaf nodes are always no echo, so we make sure we are not + // going to send back to ourselves here. + if c != sub.client { + c.addSubToRouteTargets(sub) + } continue } // Check for stream import mapped subs. These apply to local subs only. @@ -2425,21 +2431,10 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // guidance on which queue groups we should deliver to. qf := c.pa.queues - // For route connections, we still want to send messages to - // leaf nodes even if there are no queue filters since we collect + // For all non-client connections, we may still want to send messages to + // leaf nodes or routes even if there are no queue filters since we collect // them above and do not process inline like normal clients. - if c.kind == ROUTER && qf == nil { - goto sendToRoutesOrLeafs - } - - // If we are sourced from a route or leaf node we need to have direct filtered queues. - if (c.kind == ROUTER || c.kind == LEAF) && qf == nil { - return queues - } - - // For gateway connections, we still want to send messages to routes - // and leaf nodes even if there are no queue filters. - if c.kind == GATEWAY && qf == nil { + if c.kind != CLIENT && qf == nil { goto sendToRoutesOrLeafs } diff --git a/server/leafnode.go b/server/leafnode.go index 5a817232..b4c9036c 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -784,7 +784,7 @@ func (c *client) initLeafNodeSmap() { return } // Collect all subs here. - _subs := [256]*subscription{} + _subs := [32]*subscription{} subs := _subs[:0] ims := []string{} acc.mu.RLock() @@ -871,7 +871,7 @@ func (c *client) updateSmap(sub *subscription, delta int32) { func (c *client) sendLeafNodeSubUpdate(key string, n int32) { _b := [64]byte{} b := bytes.NewBuffer(_b[:0]) - writeLeafSub(b, key, n) + c.writeLeafSub(b, key, n) c.sendProto(b.Bytes(), false) } @@ -902,7 +902,7 @@ func (c *client) sendAllAccountSubs() { var b bytes.Buffer for key, n := range c.leaf.smap { - writeLeafSub(&b, key, n) + c.writeLeafSub(&b, key, n) } // We will make sure we don't overflow here due to an max_pending. @@ -914,7 +914,7 @@ func (c *client) sendAllAccountSubs() { } } -func writeLeafSub(w *bytes.Buffer, key string, n int32) { +func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) { if key == "" { return } @@ -930,9 +930,18 @@ func writeLeafSub(w *bytes.Buffer, key string, n int32) { b[i] = digits[l%10] } w.Write(b[i:]) + if c.trace { + arg := fmt.Sprintf("%s %d", key, n) + c.traceOutOp("LS+", []byte(arg)) + } + } else if c.trace { + c.traceOutOp("LS+", []byte(key)) } } else { w.WriteString("LS- " + key) + if c.trace { + c.traceOutOp("LS-", []byte(key)) + } } w.WriteString(CR_LF) } diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 7014b1ce..e652d19b 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -26,7 +26,6 @@ import ( "testing" "time" - "github.com/nats-io/gnatsd/logger" "github.com/nats-io/gnatsd/server" "github.com/nats-io/go-nats" "github.com/nats-io/jwt" @@ -1041,7 +1040,6 @@ func TestLeafNodeExportsImports(t *testing.T) { s, opts, conf := runLeafNodeOperatorServer(t) defer os.Remove(conf) defer s.Shutdown() - s.SetLogger(logger.NewTestLogger("[S ] ", true), true, true) // Setup the two accounts for this server. okp, _ := nkeys.FromSeed(oSeed) @@ -1096,7 +1094,6 @@ func TestLeafNodeExportsImports(t *testing.T) { sl, lopts, lnconf := runSolicitWithCredentials(t, opts, mycreds) defer os.Remove(lnconf) defer sl.Shutdown() - sl.SetLogger(logger.NewTestLogger("[LN] ", true), true, true) checkLeafNodeConnected(t, s) @@ -1145,7 +1142,7 @@ func TestLeafNodeExportsImports(t *testing.T) { } // Services - // Create listener on nc1 + // Create listener on nc2 nc2.Subscribe("req.echo", func(msg *nats.Msg) { nc2.Publish(msg.Reply, []byte("WORKED")) }) @@ -1173,7 +1170,6 @@ func TestLeadNodeExportImportComplexSetup(t *testing.T) { defer os.Remove(conf) s1, s1Opts := RunServerWithConfig(conf) defer s1.Shutdown() - s1.SetLogger(logger.NewTestLogger("[S1] ", true), true, true) content = fmt.Sprintf(` port: -1 @@ -1190,7 +1186,6 @@ func TestLeadNodeExportImportComplexSetup(t *testing.T) { conf = createConfFile(t, []byte(content)) s2, s2Opts := RunServerWithConfig(conf) defer s2.Shutdown() - s2.SetLogger(logger.NewTestLogger("[S2] ", true), true, true) // Setup the two accounts for this server. okp, _ := nkeys.FromSeed(oSeed) @@ -1261,7 +1256,6 @@ func TestLeadNodeExportImportComplexSetup(t *testing.T) { sl, lopts, lnconf := runSolicitWithCredentials(t, s1Opts, mycreds) defer os.Remove(lnconf) defer sl.Shutdown() - sl.SetLogger(logger.NewTestLogger("[LN] ", true), true, true) checkLeafNodeConnected(t, s1) @@ -1318,7 +1312,7 @@ func TestLeadNodeExportImportComplexSetup(t *testing.T) { nc2.Flush() // Now send the request on the leaf node client. - if _, err := ncl.Request("import.request", []byte("fingers crossed"), 500*time.Millisecond); err != nil { + if _, err := ncl.Request("import.request", []byte("fingers crossed"), 5500*time.Millisecond); err != nil { if atomic.LoadInt32(&gotIt) == 0 { t.Fatalf("Request was not received") }