Do not forward service import messages to a stream export.

Addresses stack overflow issue wally was seeing with configs
that mix and match streams and services between each other.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-10-20 19:20:44 -07:00
parent 496d18468a
commit 705f8b6ed7
2 changed files with 93 additions and 4 deletions

View File

@@ -179,12 +179,13 @@ const (
ClusterNameConflict
)
// Some flags passed to processMsgResultsEx
// Some flags passed to processMsgResults
const pmrNoFlag int = 0
const (
pmrCollectQueueNames int = 1 << iota
pmrIgnoreEmptyQueueFilter
pmrAllowSendFromRouteToRoute
pmrMsgImportedFromService
)
type client struct {
@@ -2234,7 +2235,6 @@ func (c *client) parseSub(argo []byte, noForward bool) error {
}
func (c *client) processSub(subject, queue, bsid []byte, cb msgHandler, noForward bool) (*subscription, error) {
// Create the subscription
sub := &subscription{client: c, subject: subject, queue: queue, sid: bsid, icb: cb}
@@ -3538,7 +3538,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// If we are a route or gateway or leafnode and this message is flipped to a queue subscriber we
// need to handle that since the processMsgResults will want a queue filter.
flags := pmrNoFlag
flags := pmrMsgImportedFromService
if c.kind == GATEWAY || c.kind == ROUTER || c.kind == LEAF {
flags |= pmrIgnoreEmptyQueueFilter
}
@@ -3708,6 +3708,10 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
dsubj = subj
// Check for stream import mapped subs (shadow subs). These apply to local subs only.
if sub.im != nil {
// If this message was a service import do not re-export to an exported stream.
if flags&pmrMsgImportedFromService != 0 {
continue
}
if sub.im.tr != nil {
to, _ := sub.im.tr.transformSubject(string(subj))
dsubj = append(_dsubj[:0], to...)
@@ -3823,6 +3827,10 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
dsubj = subj
// Check for stream import mapped subs. These apply to local subs only.
if sub.im != nil {
// If this message was a service import do not re-export to an exported stream.
if flags&pmrMsgImportedFromService != 0 {
continue
}
if sub.im.tr != nil {
to, _ := sub.im.tr.transformSubject(string(subj))
dsubj = append(_dsubj[:0], to...)

View File

@@ -1221,6 +1221,87 @@ func TestServiceLatencyOldRequestStyleSingleServer(t *testing.T) {
noShareCheck(t, &sl.Requestor)
}
// To test a bug wally@nats.io is seeing.
func TestServiceAndStreamStackOverflow(t *testing.T) {
conf := createConfFile(t, []byte(`
accounts {
STATIC {
users = [ { user: "static", pass: "foo" } ]
exports [
{ stream: > }
{ service: my.service }
]
}
DYN {
users = [ { user: "foo", pass: "bar" } ]
imports [
{ stream { subject: >, account: STATIC } }
{ service { subject: my.service, account: STATIC } }
]
}
}
`))
defer os.Remove(conf)
srv, opts := RunServerWithConfig(conf)
defer srv.Shutdown()
// Responder (just request sub)
nc, err := nats.Connect(fmt.Sprintf("nats://static:foo@%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
sub, _ := nc.SubscribeSync("my.service")
nc.Flush()
// Requestor
nc2, err := nats.Connect(fmt.Sprintf("nats://foo:bar@%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()
// Send a single request.
nc2.PublishRequest("my.service", "foo", []byte("hi"))
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if nm, _, err := sub.Pending(); err != nil || nm != 1 {
return fmt.Errorf("Expected one request, got %d", nm)
}
return nil
})
// Make sure works for queue subscribers as well.
sub.Unsubscribe()
sub, _ = nc.QueueSubscribeSync("my.service", "prod")
nc.Flush()
// Send a single request.
nc2.PublishRequest("my.service", "foo", []byte("hi"))
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if nm, _, err := sub.Pending(); err != nil || nm != 1 {
return fmt.Errorf("Expected one request, got %d", nm)
}
return nil
})
// Now create an interest in the stream from nc2. that is a queue subscriber.
sub2, _ := nc2.QueueSubscribeSync("my.service", "prod")
defer sub2.Unsubscribe()
nc2.Flush()
// Send a single request.
nc2.PublishRequest("my.service", "foo", []byte("hi"))
time.Sleep(10 * time.Millisecond)
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if nm, _, err := sub.Pending(); err != nil || nm != 2 {
return fmt.Errorf("Expected two requests, got %d", nm)
}
return nil
})
}
// Check we get the proper detailed information for the requestor when allowed.
func TestServiceLatencyRequestorSharesDetailedInfo(t *testing.T) {
sc := createSuperCluster(t, 3, 3)
@@ -1635,7 +1716,7 @@ func TestServiceLatencyHeaderTriggered(t *testing.T) {
},
SYS: { users: [{user: admin, password: pass}] }
}
system_account: SYS
`, v.shared)))
defer os.Remove(conf)