mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
- [X] Tests added - [X] Branch rebased on top of current main (`git pull --rebase origin main`) - [X] Changes squashed to a single commit (described [here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html)) - [X] Build is green in Travis CI - [X] You have certified that the contribution is your original work and that you license the work to the project under the [Apache 2 license](https://github.com/nats-io/nats-server/blob/main/LICENSE) ### Changes proposed in this pull request: There is currently a blanket requirement that subject transforms destinations MUST use ALL of the partial wildcards defined in the source of the transform. This is because the subject transformed defined for imports must be 'reversible' and therefore the destination transform must use all of the partial wildcard tokens defined in the source of the transform. This reversing of a transform is only used for transforms used by imports, where in any case it doesn't make sense to use any transform other than Wildcard. This PR: - relaxes this requirement to only apply when the transform is used by an import, adding the ability to drop a wildcard token in transforms other than as part of an import. - Improves transform reverse to support both legacy style wildcards $X and the new transform function {{Wildcard(X)}}- Improves reversible transform checking to only allow the use of wildcards in the destination. --------- Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
500 lines
13 KiB
Go
500 lines
13 KiB
Go
// Copyright 2020 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package test
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats-server/v2/server"
|
|
"github.com/nats-io/nats.go"
|
|
)
|
|
|
|
func TestAccountCycleService(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
accounts {
|
|
A {
|
|
exports [ { service: help } ]
|
|
imports [ { service { subject: help, account: B } } ]
|
|
}
|
|
B {
|
|
exports [ { service: help } ]
|
|
imports [ { service { subject: help, account: A } } ]
|
|
}
|
|
}
|
|
`))
|
|
|
|
if _, err := server.ProcessConfigFile(conf); err == nil || !strings.Contains(err.Error(), server.ErrImportFormsCycle.Error()) {
|
|
t.Fatalf("Expected an error on cycle service import, got none")
|
|
}
|
|
|
|
conf = createConfFile(t, []byte(`
|
|
accounts {
|
|
A {
|
|
exports [ { service: * } ]
|
|
imports [ { service { subject: help, account: B } } ]
|
|
}
|
|
B {
|
|
exports [ { service: help } ]
|
|
imports [ { service { subject: *, account: A } } ]
|
|
}
|
|
}
|
|
`))
|
|
|
|
if _, err := server.ProcessConfigFile(conf); err == nil || !strings.Contains(err.Error(), server.ErrImportFormsCycle.Error()) {
|
|
t.Fatalf("Expected an error on cycle service import, got none")
|
|
}
|
|
|
|
conf = createConfFile(t, []byte(`
|
|
accounts {
|
|
A {
|
|
exports [ { service: * } ]
|
|
imports [ { service { subject: help, account: B } } ]
|
|
}
|
|
B {
|
|
exports [ { service: help } ]
|
|
imports [ { service { subject: help, account: C } } ]
|
|
}
|
|
C {
|
|
exports [ { service: * } ]
|
|
imports [ { service { subject: *, account: A } } ]
|
|
}
|
|
}
|
|
`))
|
|
|
|
if _, err := server.ProcessConfigFile(conf); err == nil || !strings.Contains(err.Error(), server.ErrImportFormsCycle.Error()) {
|
|
t.Fatalf("Expected an error on cycle service import, got none")
|
|
}
|
|
}
|
|
|
|
func TestAccountCycleStream(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
accounts {
|
|
A {
|
|
exports [ { stream: strm } ]
|
|
imports [ { stream { subject: strm, account: B } } ]
|
|
}
|
|
B {
|
|
exports [ { stream: strm } ]
|
|
imports [ { stream { subject: strm, account: A } } ]
|
|
}
|
|
}
|
|
`))
|
|
if _, err := server.ProcessConfigFile(conf); err == nil || !strings.Contains(err.Error(), server.ErrImportFormsCycle.Error()) {
|
|
t.Fatalf("Expected an error on cyclic import, got none")
|
|
}
|
|
}
|
|
|
|
func TestAccountCycleStreamWithMapping(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
accounts {
|
|
A {
|
|
exports [ { stream: * } ]
|
|
imports [ { stream { subject: bar, account: B } } ]
|
|
}
|
|
B {
|
|
exports [ { stream: bar } ]
|
|
imports [ { stream { subject: foo, account: A }, to: bar } ]
|
|
}
|
|
}
|
|
`))
|
|
if _, err := server.ProcessConfigFile(conf); err == nil || !strings.Contains(err.Error(), server.ErrImportFormsCycle.Error()) {
|
|
t.Fatalf("Expected an error on cyclic import, got none")
|
|
}
|
|
}
|
|
|
|
func TestAccountCycleNonCycleStreamWithMapping(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
accounts {
|
|
A {
|
|
exports [ { stream: foo } ]
|
|
imports [ { stream { subject: bar, account: B } } ]
|
|
}
|
|
B {
|
|
exports [ { stream: bar } ]
|
|
imports [ { stream { subject: baz, account: C }, to: bar } ]
|
|
}
|
|
C {
|
|
exports [ { stream: baz } ]
|
|
imports [ { stream { subject: foo, account: A }, to: bar } ]
|
|
}
|
|
}
|
|
`))
|
|
if _, err := server.ProcessConfigFile(conf); err != nil {
|
|
t.Fatalf("Expected no error but got %s", err)
|
|
}
|
|
}
|
|
|
|
func TestAccountCycleServiceCycleWithMapping(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
accounts {
|
|
A {
|
|
exports [ { service: a } ]
|
|
imports [ { service { subject: b, account: B }, to: a } ]
|
|
}
|
|
B {
|
|
exports [ { service: b } ]
|
|
imports [ { service { subject: a, account: A }, to: b } ]
|
|
}
|
|
}
|
|
`))
|
|
if _, err := server.ProcessConfigFile(conf); err == nil || !strings.Contains(err.Error(), server.ErrImportFormsCycle.Error()) {
|
|
t.Fatalf("Expected an error on cycle service import, got none")
|
|
}
|
|
}
|
|
|
|
func TestAccountCycleServiceNonCycle(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
accounts {
|
|
A {
|
|
exports [ { service: * } ]
|
|
imports [ { service { subject: help, account: B } } ]
|
|
}
|
|
B {
|
|
exports [ { service: help } ]
|
|
imports [ { service { subject: nohelp, account: C } } ]
|
|
}
|
|
C {
|
|
exports [ { service: * } ]
|
|
imports [ { service { subject: *, account: A } } ]
|
|
}
|
|
}
|
|
`))
|
|
|
|
if _, err := server.ProcessConfigFile(conf); err != nil {
|
|
t.Fatalf("Expected no error but got %s", err)
|
|
}
|
|
}
|
|
|
|
func TestAccountCycleServiceNonCycleChain(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
accounts {
|
|
A {
|
|
exports [ { service: help } ]
|
|
imports [ { service { subject: help, account: B } } ]
|
|
}
|
|
B {
|
|
exports [ { service: help } ]
|
|
imports [ { service { subject: help, account: C } } ]
|
|
}
|
|
C {
|
|
exports [ { service: help } ]
|
|
imports [ { service { subject: help, account: D } } ]
|
|
}
|
|
D {
|
|
exports [ { service: help } ]
|
|
}
|
|
}
|
|
`))
|
|
|
|
if _, err := server.ProcessConfigFile(conf); err != nil {
|
|
t.Fatalf("Expected no error but got %s", err)
|
|
}
|
|
}
|
|
|
|
// bug: https://github.com/nats-io/nats-server/issues/1769
|
|
func TestServiceImportReplyMatchCycle(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
port: -1
|
|
accounts {
|
|
A {
|
|
users: [{user: d, pass: x}]
|
|
imports [ {service: {account: B, subject: ">" }}]
|
|
}
|
|
B {
|
|
users: [{user: x, pass: x}]
|
|
exports [ { service: ">" } ]
|
|
}
|
|
}
|
|
no_auth_user: d
|
|
`))
|
|
|
|
s, opts := RunServerWithConfig(conf)
|
|
defer s.Shutdown()
|
|
|
|
nc1 := clientConnectToServerWithUP(t, opts, "x", "x")
|
|
defer nc1.Close()
|
|
|
|
msg := []byte("HELLO")
|
|
nc1.Subscribe("foo", func(m *nats.Msg) {
|
|
m.Respond(msg)
|
|
})
|
|
|
|
nc2 := clientConnectToServer(t, s)
|
|
defer nc2.Close()
|
|
|
|
resp, err := nc2.Request("foo", nil, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
if resp == nil || string(resp.Data) != string(msg) {
|
|
t.Fatalf("Wrong or empty response")
|
|
}
|
|
}
|
|
|
|
func TestServiceImportReplyMatchCycleMultiHops(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
port: -1
|
|
accounts {
|
|
A {
|
|
users: [{user: d, pass: x}]
|
|
imports [ {service: {account: B, subject: ">" }}]
|
|
}
|
|
B {
|
|
exports [ { service: ">" } ]
|
|
imports [ {service: {account: C, subject: ">" }}]
|
|
}
|
|
C {
|
|
users: [{user: x, pass: x}]
|
|
exports [ { service: ">" } ]
|
|
}
|
|
}
|
|
no_auth_user: d
|
|
`))
|
|
|
|
s, opts := RunServerWithConfig(conf)
|
|
defer s.Shutdown()
|
|
|
|
nc1 := clientConnectToServerWithUP(t, opts, "x", "x")
|
|
defer nc1.Close()
|
|
|
|
msg := []byte("HELLO")
|
|
nc1.Subscribe("foo", func(m *nats.Msg) {
|
|
m.Respond(msg)
|
|
})
|
|
|
|
nc2 := clientConnectToServer(t, s)
|
|
defer nc2.Close()
|
|
|
|
resp, err := nc2.Request("foo", nil, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
if resp == nil || string(resp.Data) != string(msg) {
|
|
t.Fatalf("Wrong or empty response")
|
|
}
|
|
}
|
|
|
|
// Go's stack are infinite sans memory, but not call depth. However its good to limit.
|
|
func TestAccountCycleDepthLimit(t *testing.T) {
|
|
var last *server.Account
|
|
chainLen := server.MaxAccountCycleSearchDepth + 1
|
|
|
|
// Services
|
|
for i := 1; i <= chainLen; i++ {
|
|
acc := server.NewAccount(fmt.Sprintf("ACC-%d", i))
|
|
if err := acc.AddServiceExport("*", nil); err != nil {
|
|
t.Fatalf("Error adding service export to '*': %v", err)
|
|
}
|
|
if last != nil {
|
|
err := acc.AddServiceImport(last, "foo", "foo")
|
|
switch i {
|
|
case chainLen:
|
|
if err != server.ErrCycleSearchDepth {
|
|
t.Fatalf("Expected last import to fail with '%v', but got '%v'", server.ErrCycleSearchDepth, err)
|
|
}
|
|
default:
|
|
if err != nil {
|
|
t.Fatalf("Error adding service import to 'foo': %v", err)
|
|
}
|
|
}
|
|
}
|
|
last = acc
|
|
}
|
|
|
|
last = nil
|
|
|
|
// Streams
|
|
for i := 1; i <= chainLen; i++ {
|
|
acc := server.NewAccount(fmt.Sprintf("ACC-%d", i))
|
|
if err := acc.AddStreamExport("foo", nil); err != nil {
|
|
t.Fatalf("Error adding stream export to '*': %v", err)
|
|
}
|
|
if last != nil {
|
|
err := acc.AddStreamImport(last, "foo", "")
|
|
switch i {
|
|
case chainLen:
|
|
if err != server.ErrCycleSearchDepth {
|
|
t.Fatalf("Expected last import to fail with '%v', but got '%v'", server.ErrCycleSearchDepth, err)
|
|
}
|
|
default:
|
|
if err != nil {
|
|
t.Fatalf("Error adding stream import to 'foo': %v", err)
|
|
}
|
|
}
|
|
}
|
|
last = acc
|
|
}
|
|
}
|
|
|
|
// Test token and partition subject mapping within an account
|
|
func TestAccountSubjectMapping(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
port: -1
|
|
mappings = {
|
|
"foo.*.*" : "foo.$1.{{wildcard(2)}}.{{partition(10,1,2)}}"
|
|
}
|
|
`))
|
|
|
|
s, _ := RunServerWithConfig(conf)
|
|
defer s.Shutdown()
|
|
|
|
nc1 := clientConnectToServer(t, s)
|
|
defer nc1.Close()
|
|
|
|
numMessages := 100
|
|
subjectsReceived := make(chan string)
|
|
|
|
msg := []byte("HELLO")
|
|
sub1, err := nc1.Subscribe("foo.*.*.*", func(m *nats.Msg) {
|
|
subjectsReceived <- m.Subject
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
sub1.AutoUnsubscribe(numMessages * 2)
|
|
|
|
nc2 := clientConnectToServer(t, s)
|
|
defer nc2.Close()
|
|
|
|
// publish numMessages with an increasing id (should map to partition numbers with the range of 10 partitions) - twice
|
|
for j := 0; j < 2; j++ {
|
|
for i := 0; i < numMessages; i++ {
|
|
err = nc2.Publish(fmt.Sprintf("foo.%d.%d", i, numMessages-i), msg)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// verify all the partition numbers are in the expected range
|
|
partitionsReceived := make([]int, numMessages)
|
|
|
|
for i := 0; i < numMessages; i++ {
|
|
subject := <-subjectsReceived
|
|
sTokens := strings.Split(subject, ".")
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
t1, _ := strconv.Atoi(sTokens[1])
|
|
t2, _ := strconv.Atoi(sTokens[2])
|
|
partitionsReceived[i], err = strconv.Atoi(sTokens[3])
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
if partitionsReceived[i] > 9 || partitionsReceived[i] < 0 || t1 != i || t2 != numMessages-i {
|
|
t.Fatalf("Error received unexpected %d.%d to partition %d", t1, t2, partitionsReceived[i])
|
|
}
|
|
}
|
|
|
|
// verify hashing is deterministic by checking it produces the same exact result twice
|
|
for i := 0; i < numMessages; i++ {
|
|
subject := <-subjectsReceived
|
|
partitionNumber, err := strconv.Atoi(strings.Split(subject, ".")[3])
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
if partitionsReceived[i] != partitionNumber {
|
|
t.Fatalf("Error: same id mapped to two different partitions")
|
|
}
|
|
}
|
|
}
|
|
|
|
// test token subject mapping within an account
|
|
// Alice imports from Bob with subject mapping
|
|
func TestAccountImportSubjectMapping(t *testing.T) {
|
|
conf := createConfFile(t, []byte(`
|
|
port: -1
|
|
accounts {
|
|
A {
|
|
users: [{user: a, pass: x}]
|
|
imports [ {stream: {account: B, subject: "foo.*.*"}, to : "foo.$1.{{wildcard(2)}}"}]
|
|
}
|
|
B {
|
|
users: [{user: b, pass x}]
|
|
exports [ { stream: ">" } ]
|
|
}
|
|
}
|
|
`))
|
|
|
|
s, opts := RunServerWithConfig(conf)
|
|
|
|
defer s.Shutdown()
|
|
ncA := clientConnectToServerWithUP(t, opts, "a", "x")
|
|
defer ncA.Close()
|
|
|
|
numMessages := 100
|
|
subjectsReceived := make(chan string)
|
|
|
|
msg := []byte("HELLO")
|
|
sub1, err := ncA.Subscribe("foo.*.*", func(m *nats.Msg) {
|
|
subjectsReceived <- m.Subject
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
sub1.AutoUnsubscribe(numMessages)
|
|
|
|
ncB := clientConnectToServerWithUP(t, opts, "b", "x")
|
|
defer ncB.Close()
|
|
|
|
// publish numMessages with an increasing id
|
|
|
|
for i := 0; i < numMessages; i++ {
|
|
err = ncB.Publish(fmt.Sprintf("foo.%d.%d", i, numMessages-i), msg)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
}
|
|
|
|
for i := 0; i < numMessages; i++ {
|
|
subject := <-subjectsReceived
|
|
sTokens := strings.Split(subject, ".")
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
t1, _ := strconv.Atoi(sTokens[1])
|
|
t2, _ := strconv.Atoi(sTokens[2])
|
|
|
|
if t1 != i || t2 != numMessages-i {
|
|
t.Fatalf("Error received unexpected %d.%d", t1, t2)
|
|
}
|
|
}
|
|
}
|
|
|
|
func clientConnectToServer(t *testing.T, s *server.Server) *nats.Conn {
|
|
t.Helper()
|
|
nc, err := nats.Connect(s.ClientURL(),
|
|
nats.Name("JS-TEST"),
|
|
nats.ReconnectWait(5*time.Millisecond),
|
|
nats.MaxReconnects(-1))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create client: %v", err)
|
|
}
|
|
return nc
|
|
}
|
|
|
|
func clientConnectToServerWithUP(t *testing.T, opts *server.Options, user, pass string) *nats.Conn {
|
|
curl := fmt.Sprintf("nats://%s:%s@%s:%d", user, pass, opts.Host, opts.Port)
|
|
nc, err := nats.Connect(curl, nats.Name("JS-UP-TEST"), nats.ReconnectWait(5*time.Millisecond), nats.MaxReconnects(-1))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create client: %v", err)
|
|
}
|
|
return nc
|
|
}
|