mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Fix for stream imports and leafnodes, #1332
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2125,6 +2125,8 @@ func (c *client) addShadowSub(sub *subscription, im *streamImport, useFrom bool)
|
||||
|
||||
// Update our route map here.
|
||||
c.srv.updateRouteSubscriptionMap(im.acc, &nsub, 1)
|
||||
c.srv.updateLeafNodes(im.acc, &nsub, 1)
|
||||
|
||||
return &nsub, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -3228,3 +3228,82 @@ func TestClusterTLSMixedIPAndDNS(t *testing.T) {
|
||||
// Make sure this works.
|
||||
checkLeafNodeConnected(t, srvA)
|
||||
}
|
||||
|
||||
// This will test for a bug in stream export/import with leafnodes.
|
||||
// https://github.com/nats-io/nats-server/issues/1332
|
||||
func TestStreamExportWithMultipleAccounts(t *testing.T) {
|
||||
confA := createConfFile(t, []byte(`
|
||||
listen: 127.0.0.1:-1
|
||||
leafnodes {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
`))
|
||||
srvA, optsA := RunServerWithConfig(confA)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
bConfigTemplate := `
|
||||
listen: 127.0.0.1:-1
|
||||
leafnodes {
|
||||
listen: "127.0.0.1:-1"
|
||||
remotes = [
|
||||
{
|
||||
url:"nats://127.0.0.1:%d"
|
||||
account:"EXTERNAL"
|
||||
}
|
||||
]
|
||||
}
|
||||
accounts: {
|
||||
INTERNAL: {
|
||||
users: [
|
||||
{user: good, password: pwd}
|
||||
]
|
||||
imports: [
|
||||
{
|
||||
stream: {
|
||||
account: EXTERNAL
|
||||
subject: "foo"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
EXTERNAL: {
|
||||
users: [
|
||||
{user: bad, password: pwd}
|
||||
]
|
||||
exports: [{stream: "foo"}]
|
||||
},
|
||||
}
|
||||
`
|
||||
|
||||
confB := createConfFile(t, []byte(fmt.Sprintf(bConfigTemplate, optsA.LeafNode.Port)))
|
||||
srvB, optsB := RunServerWithConfig(confB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
nc, err := nats.Connect(fmt.Sprintf("nats://good:pwd@%s:%d", optsB.Host, optsB.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
wcsub, err := nc.SubscribeSync(">")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer wcsub.Unsubscribe()
|
||||
nc.Flush()
|
||||
|
||||
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc2.Close()
|
||||
|
||||
nc2.Publish("foo", nil)
|
||||
|
||||
checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
|
||||
if nmsgs, _, err := wcsub.Pending(); err != nil || nmsgs != 1 {
|
||||
return fmt.Errorf("Did not receive the message: %v", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user