[ADDED] Multiple routes and ability to have per-account routes

New configuration fields:
```
cluster {
   ...
   pool_size: 5
   accounts: ["A", "B"]
}
```

The configuration `pool_size` in the example above means that this
server will create 5 routes to a remote server, assuming that that
server has the same `pool_size` setting.

Accounts (which are not part of the `accounts[]` configuration)
are assigned a specific route in this pool, and this will be the
same route on all servers in the cluster.

Accounts that are defined in the `accounts` field will each have
a dedicated route connection. This will allow suppression of the
account name in some of the route protocols, reducing bytes transmitted
which may increase performance.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2023-03-20 16:48:39 -06:00
parent 70f6caa060
commit 105237cba8
34 changed files with 4590 additions and 722 deletions

View File

@@ -1346,12 +1346,12 @@ func TestConnzWithRoutes(t *testing.T) {
for mode := 0; mode < 2; mode++ {
rz := pollRoutez(t, s, mode, url+urlSuffix, &RoutezOptions{Subscriptions: subs == 1, SubscriptionsDetail: subs == 2})
if rz.NumRoutes != 1 {
t.Fatalf("Expected 1 route, got %d", rz.NumRoutes)
if rz.NumRoutes != DEFAULT_ROUTE_POOL_SIZE {
t.Fatalf("Expected %d route, got %d", DEFAULT_ROUTE_POOL_SIZE, rz.NumRoutes)
}
if len(rz.Routes) != 1 {
t.Fatalf("Expected route array of 1, got %v", len(rz.Routes))
if len(rz.Routes) != DEFAULT_ROUTE_POOL_SIZE {
t.Fatalf("Expected route array of %d, got %v", DEFAULT_ROUTE_POOL_SIZE, len(rz.Routes))
}
route := rz.Routes[0]
@@ -2328,6 +2328,7 @@ func TestRoutezPermissions(t *testing.T) {
defer s1.Shutdown()
opts = DefaultMonitorOptions()
opts.NoSystemAccount = true
opts.ServerName = "monitor_server_2"
opts.Cluster.Host = "127.0.0.1"
opts.Cluster.Name = "A"
@@ -2371,8 +2372,8 @@ func TestRoutezPermissions(t *testing.T) {
t.Fatal("Routez body should NOT contain \"export\" information.")
}
// We do expect to see them show up for the information we have on Server A though.
if len(rz.Routes) != 1 {
t.Fatalf("Expected route array of 1, got %v\n", len(rz.Routes))
if len(rz.Routes) != DEFAULT_ROUTE_POOL_SIZE {
t.Fatalf("Expected route array of %d, got %v\n", DEFAULT_ROUTE_POOL_SIZE, len(rz.Routes))
}
route := rz.Routes[0]
if route.Import == nil || route.Import.Allow == nil ||
@@ -2577,6 +2578,7 @@ func TestMonitorCluster(t *testing.T) {
opts.Cluster.TLSTimeout,
opts.Cluster.TLSConfig != nil,
opts.Cluster.TLSConfig != nil,
DEFAULT_ROUTE_POOL_SIZE,
}
varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
@@ -2592,7 +2594,7 @@ func TestMonitorCluster(t *testing.T) {
// Having this here to make sure that if fields are added in ClusterOptsVarz,
// we make sure to update this test (compiler will report an error if we don't)
_ = ClusterOptsVarz{"", "", 0, 0, nil, 2, false, false}
_ = ClusterOptsVarz{"", "", 0, 0, nil, 2, false, false, 0}
// Alter the fields to make sure that we have a proper deep copy
// of what may be stored in the server. Anything we change here
@@ -3608,12 +3610,14 @@ func TestMonitorRouteRTT(t *testing.T) {
for pollMode := 0; pollMode < 2; pollMode++ {
checkFor(t, 2*firstPingInterval, 15*time.Millisecond, func() error {
rz := pollRoutez(t, s, pollMode, routezURL, nil)
if len(rz.Routes) != 1 {
return fmt.Errorf("Expected 1 route, got %v", len(rz.Routes))
// Pool size + 1 for system account
if len(rz.Routes) != DEFAULT_ROUTE_POOL_SIZE+1 {
return fmt.Errorf("Expected %d route, got %v", DEFAULT_ROUTE_POOL_SIZE+1, len(rz.Routes))
}
ri := rz.Routes[0]
if ri.RTT == _EMPTY_ {
return fmt.Errorf("Route's RTT not reported")
for _, ri := range rz.Routes {
if ri.RTT == _EMPTY_ {
return fmt.Errorf("Route's RTT not reported")
}
}
return nil
})
@@ -4672,3 +4676,178 @@ func TestMonitorProfilez(t *testing.T) {
}
}
}
func TestMonitorRoutePoolSize(t *testing.T) {
conf1 := createConfFile(t, []byte(`
port: -1
http: -1
cluster {
port: -1
name: "local"
pool_size: 5
}
no_sys_acc: true
`))
defer removeFile(t, conf1)
s1, o1 := RunServerWithConfig(conf1)
defer s1.Shutdown()
conf23 := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
http: -1
cluster {
port: -1
name: "local"
routes: ["nats://127.0.0.1:%d"]
pool_size: 5
}
no_sys_acc: true
`, o1.Cluster.Port)))
defer removeFile(t, conf23)
s2, _ := RunServerWithConfig(conf23)
defer s2.Shutdown()
s3, _ := RunServerWithConfig(conf23)
defer s3.Shutdown()
checkClusterFormed(t, s1, s2, s3)
for i, s := range []*Server{s1, s2, s3} {
url := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
v := pollVarz(t, s, mode, url, nil)
if v.Cluster.PoolSize != 5 {
t.Fatalf("Expected Cluster.PoolSize==5, got %v", v.Cluster.PoolSize)
}
if v.Remotes != 2 {
t.Fatalf("Expected Remotes==2, got %v", v.Remotes)
}
if v.Routes != 10 {
t.Fatalf("Expected NumRoutes==10, got %v", v.Routes)
}
}
url = fmt.Sprintf("http://127.0.0.1:%d/routez", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
v := pollRoutez(t, s, mode, url, nil)
if v.NumRoutes != 10 {
t.Fatalf("Expected NumRoutes==10, got %v", v.NumRoutes)
}
if n := len(v.Routes); n != 10 {
t.Fatalf("Expected len(Routes)==10, got %v", n)
}
remotes := make(map[string]int)
for _, r := range v.Routes {
remotes[r.RemoteID]++
}
if n := len(remotes); n != 2 {
t.Fatalf("Expected routes for 2 different servers, got %v", n)
}
switch i {
case 0:
if n := remotes[s2.ID()]; n != 5 {
t.Fatalf("Expected 5 routes from S1 to S2, got %v", n)
}
if n := remotes[s3.ID()]; n != 5 {
t.Fatalf("Expected 5 routes from S1 to S3, got %v", n)
}
case 1:
if n := remotes[s1.ID()]; n != 5 {
t.Fatalf("Expected 5 routes from S2 to S1, got %v", n)
}
if n := remotes[s3.ID()]; n != 5 {
t.Fatalf("Expected 5 routes from S2 to S3, got %v", n)
}
case 2:
if n := remotes[s1.ID()]; n != 5 {
t.Fatalf("Expected 5 routes from S3 to S1, got %v", n)
}
if n := remotes[s2.ID()]; n != 5 {
t.Fatalf("Expected 5 routes from S3 to S2, got %v", n)
}
}
}
}
}
func TestMonitorRoutePerAccount(t *testing.T) {
conf1 := createConfFile(t, []byte(`
port: -1
http: -1
accounts {
A { users: [{user: "a", password: "pwd"}] }
}
cluster {
port: -1
name: "local"
accounts: ["A"]
}
`))
defer removeFile(t, conf1)
s1, o1 := RunServerWithConfig(conf1)
defer s1.Shutdown()
conf23 := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
http: -1
accounts {
A { users: [{user: "a", password: "pwd"}] }
}
cluster {
port: -1
name: "local"
routes: ["nats://127.0.0.1:%d"]
accounts: ["A"]
}
`, o1.Cluster.Port)))
defer removeFile(t, conf23)
s2, _ := RunServerWithConfig(conf23)
defer s2.Shutdown()
s3, _ := RunServerWithConfig(conf23)
defer s3.Shutdown()
checkClusterFormed(t, s1, s2, s3)
for _, s := range []*Server{s1, s2, s3} {
// Default pool size + account "A" + system account (added by default)
enr := 2 * (DEFAULT_ROUTE_POOL_SIZE + 1 + 1)
url := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
v := pollVarz(t, s, mode, url, nil)
if v.Remotes != 2 {
t.Fatalf("Expected Remotes==2, got %v", v.Remotes)
}
if v.Routes != enr {
t.Fatalf("Expected NumRoutes==%d, got %v", enr, v.Routes)
}
}
url = fmt.Sprintf("http://127.0.0.1:%d/routez", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
v := pollRoutez(t, s, mode, url, nil)
if v.NumRoutes != enr {
t.Fatalf("Expected NumRoutes==%d, got %v", enr, v.NumRoutes)
}
if n := len(v.Routes); n != enr {
t.Fatalf("Expected len(Routes)==%d, got %v", enr, n)
}
remotes := make(map[string]int)
for _, r := range v.Routes {
var acc int
if r.Account == "A" {
acc = 1
}
remotes[r.RemoteID] += acc
}
if n := len(remotes); n != 2 {
t.Fatalf("Expected routes for 2 different servers, got %v", n)
}
for remoteID, v := range remotes {
if v != 1 {
t.Fatalf("Expected one and only one connection for account A for remote %q, got %v", remoteID, v)
}
}
}
}
}