From 705f8b6ed7b46c3e66c6da99971cb0ab8dcf021e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 20 Oct 2020 19:20:44 -0700 Subject: [PATCH] Do not forward service import messages to a stream export. Addresses stack overflow issue wally was seeing with configs that mix and match streams and services between each other. Signed-off-by: Derek Collison --- server/client.go | 14 ++++-- test/service_latency_test.go | 83 +++++++++++++++++++++++++++++++++++- 2 files changed, 93 insertions(+), 4 deletions(-) diff --git a/server/client.go b/server/client.go index b9058f2c..610472c3 100644 --- a/server/client.go +++ b/server/client.go @@ -179,12 +179,13 @@ const ( ClusterNameConflict ) -// Some flags passed to processMsgResultsEx +// Some flags passed to processMsgResults const pmrNoFlag int = 0 const ( pmrCollectQueueNames int = 1 << iota pmrIgnoreEmptyQueueFilter pmrAllowSendFromRouteToRoute + pmrMsgImportedFromService ) type client struct { @@ -2234,7 +2235,6 @@ func (c *client) parseSub(argo []byte, noForward bool) error { } func (c *client) processSub(subject, queue, bsid []byte, cb msgHandler, noForward bool) (*subscription, error) { - // Create the subscription sub := &subscription{client: c, subject: subject, queue: queue, sid: bsid, icb: cb} @@ -3538,7 +3538,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // If we are a route or gateway or leafnode and this message is flipped to a queue subscriber we // need to handle that since the processMsgResults will want a queue filter. - flags := pmrNoFlag + flags := pmrMsgImportedFromService if c.kind == GATEWAY || c.kind == ROUTER || c.kind == LEAF { flags |= pmrIgnoreEmptyQueueFilter } @@ -3708,6 +3708,10 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, dsubj = subj // Check for stream import mapped subs (shadow subs). These apply to local subs only. if sub.im != nil { + // If this message was a service import do not re-export to an exported stream. + if flags&pmrMsgImportedFromService != 0 { + continue + } if sub.im.tr != nil { to, _ := sub.im.tr.transformSubject(string(subj)) dsubj = append(_dsubj[:0], to...) @@ -3823,6 +3827,10 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, dsubj = subj // Check for stream import mapped subs. These apply to local subs only. if sub.im != nil { + // If this message was a service import do not re-export to an exported stream. + if flags&pmrMsgImportedFromService != 0 { + continue + } if sub.im.tr != nil { to, _ := sub.im.tr.transformSubject(string(subj)) dsubj = append(_dsubj[:0], to...) diff --git a/test/service_latency_test.go b/test/service_latency_test.go index bd0f0e56..4065732e 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -1221,6 +1221,87 @@ func TestServiceLatencyOldRequestStyleSingleServer(t *testing.T) { noShareCheck(t, &sl.Requestor) } +// To test a bug wally@nats.io is seeing. +func TestServiceAndStreamStackOverflow(t *testing.T) { + conf := createConfFile(t, []byte(` + accounts { + STATIC { + users = [ { user: "static", pass: "foo" } ] + exports [ + { stream: > } + { service: my.service } + ] + } + DYN { + users = [ { user: "foo", pass: "bar" } ] + imports [ + { stream { subject: >, account: STATIC } } + { service { subject: my.service, account: STATIC } } + ] + } + } + `)) + defer os.Remove(conf) + + srv, opts := RunServerWithConfig(conf) + defer srv.Shutdown() + + // Responder (just request sub) + nc, err := nats.Connect(fmt.Sprintf("nats://static:foo@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + sub, _ := nc.SubscribeSync("my.service") + nc.Flush() + + // Requestor + nc2, err := nats.Connect(fmt.Sprintf("nats://foo:bar@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + // Send a single request. + nc2.PublishRequest("my.service", "foo", []byte("hi")) + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if nm, _, err := sub.Pending(); err != nil || nm != 1 { + return fmt.Errorf("Expected one request, got %d", nm) + } + return nil + }) + + // Make sure works for queue subscribers as well. + sub.Unsubscribe() + sub, _ = nc.QueueSubscribeSync("my.service", "prod") + nc.Flush() + + // Send a single request. + nc2.PublishRequest("my.service", "foo", []byte("hi")) + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if nm, _, err := sub.Pending(); err != nil || nm != 1 { + return fmt.Errorf("Expected one request, got %d", nm) + } + return nil + }) + + // Now create an interest in the stream from nc2. that is a queue subscriber. + sub2, _ := nc2.QueueSubscribeSync("my.service", "prod") + defer sub2.Unsubscribe() + nc2.Flush() + + // Send a single request. + nc2.PublishRequest("my.service", "foo", []byte("hi")) + time.Sleep(10 * time.Millisecond) + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if nm, _, err := sub.Pending(); err != nil || nm != 2 { + return fmt.Errorf("Expected two requests, got %d", nm) + } + return nil + }) +} + // Check we get the proper detailed information for the requestor when allowed. func TestServiceLatencyRequestorSharesDetailedInfo(t *testing.T) { sc := createSuperCluster(t, 3, 3) @@ -1635,7 +1716,7 @@ func TestServiceLatencyHeaderTriggered(t *testing.T) { }, SYS: { users: [{user: admin, password: pass}] } } - + system_account: SYS `, v.shared))) defer os.Remove(conf)