mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #3383 from nats-io/gw_switch_to_interest_only_right_away
[CHANGED] Gateway: Switch all accounts to interest-only mode
This commit is contained in:
@@ -22,6 +22,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -29,7 +31,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/jwt/v2"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nkeys"
|
||||
)
|
||||
|
||||
func TestJetStreamSuperClusterMetaPlacement(t *testing.T) {
|
||||
@@ -413,6 +417,9 @@ func TestJetStreamSuperClusterPeerReassign(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterInterestOnlyMode(t *testing.T) {
|
||||
GatewayDoNotForceInterestOnlyMode(true)
|
||||
defer GatewayDoNotForceInterestOnlyMode(false)
|
||||
|
||||
template := `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
@@ -3619,3 +3626,210 @@ func TestJetStreamSuperClusterStreamCathupLongRTT(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterMixedModeSwitchToInterestOnlyStaticConfig(t *testing.T) {
|
||||
tmpl := `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: { domain: ngs, max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
||||
leaf: { listen: 127.0.0.1:-1 }
|
||||
cluster {
|
||||
name: %s
|
||||
listen: 127.0.0.1:%d
|
||||
routes = [%s]
|
||||
}
|
||||
accounts {
|
||||
ONE {
|
||||
users = [ { user: "one", pass: "pwd" } ]
|
||||
jetstream: enabled
|
||||
}
|
||||
TWO { users = [ { user: "two", pass: "pwd" } ] }
|
||||
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
||||
}
|
||||
`
|
||||
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, 5, 3,
|
||||
func(serverName, clusterName, storeDir, conf string) string {
|
||||
sname := serverName[strings.Index(serverName, "-")+1:]
|
||||
switch sname {
|
||||
case "S4", "S5":
|
||||
conf = strings.ReplaceAll(conf, "jetstream: { ", "#jetstream: { ")
|
||||
default:
|
||||
conf = strings.ReplaceAll(conf, "leaf: { ", "#leaf: { ")
|
||||
}
|
||||
return conf
|
||||
}, nil)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Connect our client to a non JS server
|
||||
c := sc.randomCluster()
|
||||
var s *Server
|
||||
for _, as := range c.servers {
|
||||
if !as.JetStreamEnabled() {
|
||||
s = as
|
||||
break
|
||||
}
|
||||
}
|
||||
if s == nil {
|
||||
t.Fatal("Did not find a non JS server!")
|
||||
}
|
||||
nc, js := jsClientConnect(t, s, nats.UserInfo("one", "pwd"))
|
||||
defer nc.Close()
|
||||
|
||||
// Just create a stream and then make sure that all gateways have switched
|
||||
// to interest-only mode.
|
||||
si, err := js.AddStream(&nats.StreamConfig{Name: "interest", Replicas: 3})
|
||||
require_NoError(t, err)
|
||||
|
||||
sc.waitOnStreamLeader("ONE", "interest")
|
||||
|
||||
check := func(accName string) {
|
||||
t.Helper()
|
||||
for _, c := range sc.clusters {
|
||||
for _, s := range c.servers {
|
||||
// Check only JS servers outbound GW connections
|
||||
if !s.JetStreamEnabled() {
|
||||
continue
|
||||
}
|
||||
opts := s.getOpts()
|
||||
for _, gw := range opts.Gateway.Gateways {
|
||||
if gw.Name == opts.Gateway.Name {
|
||||
continue
|
||||
}
|
||||
checkGWInterestOnlyMode(t, s, gw.Name, accName)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Starting v2.9.0, all accounts should be switched to interest-only mode
|
||||
check("ONE")
|
||||
check("TWO")
|
||||
|
||||
var gwsa [16]*client
|
||||
gws := gwsa[:0]
|
||||
|
||||
s = sc.serverByName(si.Cluster.Leader)
|
||||
// Get the GW outbound connections
|
||||
s.getOutboundGatewayConnections(&gws)
|
||||
for _, gwc := range gws {
|
||||
gwc.mu.Lock()
|
||||
gwc.nc.Close()
|
||||
gwc.mu.Unlock()
|
||||
}
|
||||
waitForOutboundGateways(t, s, 2, 5*time.Second)
|
||||
check("ONE")
|
||||
check("TWO")
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterMixedModeSwitchToInterestOnlyOperatorConfig(t *testing.T) {
|
||||
kp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
skp, _ := nkeys.CreateAccount()
|
||||
spub, _ := skp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(spub)
|
||||
sjwt, err := nac.Encode(kp)
|
||||
require_NoError(t, err)
|
||||
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
apub, _ := akp.PublicKey()
|
||||
nac = jwt.NewAccountClaims(apub)
|
||||
// Set some limits to enable JS.
|
||||
nac.Limits.JetStreamLimits.DiskStorage = 1024 * 1024
|
||||
nac.Limits.JetStreamLimits.Streams = 10
|
||||
ajwt, err := nac.Encode(kp)
|
||||
require_NoError(t, err)
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.HasSuffix(r.URL.Path, spub) {
|
||||
w.Write([]byte(sjwt))
|
||||
} else {
|
||||
w.Write([]byte(ajwt))
|
||||
}
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
operator := fmt.Sprintf(`
|
||||
operator: %s
|
||||
resolver: URL("%s/ngs/v1/accounts/jwt/")
|
||||
`, ojwt, ts.URL)
|
||||
|
||||
tmpl := `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: { domain: ngs, max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
|
||||
leaf: { listen: 127.0.0.1:-1 }
|
||||
cluster {
|
||||
name: %s
|
||||
listen: 127.0.0.1:%d
|
||||
routes = [%s]
|
||||
}
|
||||
` + operator
|
||||
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, 5, 3,
|
||||
func(serverName, clusterName, storeDir, conf string) string {
|
||||
conf = strings.ReplaceAll(conf, "system_account: \"$SYS\"", fmt.Sprintf("system_account: \"%s\"", spub))
|
||||
sname := serverName[strings.Index(serverName, "-")+1:]
|
||||
switch sname {
|
||||
case "S4", "S5":
|
||||
conf = strings.ReplaceAll(conf, "jetstream: { ", "#jetstream: { ")
|
||||
default:
|
||||
conf = strings.ReplaceAll(conf, "leaf: { ", "#leaf: { ")
|
||||
}
|
||||
return conf
|
||||
}, nil)
|
||||
defer sc.shutdown()
|
||||
|
||||
// Connect our client to a non JS server
|
||||
c := sc.randomCluster()
|
||||
var s *Server
|
||||
for _, as := range c.servers {
|
||||
if !as.JetStreamEnabled() {
|
||||
s = as
|
||||
break
|
||||
}
|
||||
}
|
||||
if s == nil {
|
||||
t.Fatal("Did not find a non JS server!")
|
||||
}
|
||||
nc, js := jsClientConnect(t, s, createUserCreds(t, nil, akp))
|
||||
defer nc.Close()
|
||||
|
||||
// Just create a stream and then make sure that all gateways have switched
|
||||
// to interest-only mode.
|
||||
si, err := js.AddStream(&nats.StreamConfig{Name: "interest", Replicas: 3})
|
||||
require_NoError(t, err)
|
||||
|
||||
sc.waitOnStreamLeader(apub, "interest")
|
||||
|
||||
check := func(s *Server) {
|
||||
opts := s.getOpts()
|
||||
for _, gw := range opts.Gateway.Gateways {
|
||||
if gw.Name == opts.Gateway.Name {
|
||||
continue
|
||||
}
|
||||
checkGWInterestOnlyMode(t, s, gw.Name, apub)
|
||||
}
|
||||
}
|
||||
s = sc.serverByName(si.Cluster.Leader)
|
||||
check(s)
|
||||
|
||||
// Let's cause a leadership change and verify that it still works.
|
||||
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "interest"), nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
sc.waitOnStreamLeader(apub, "interest")
|
||||
|
||||
si, err = js.StreamInfo("interest")
|
||||
require_NoError(t, err)
|
||||
s = sc.serverByName(si.Cluster.Leader)
|
||||
check(s)
|
||||
|
||||
var gwsa [16]*client
|
||||
gws := gwsa[:0]
|
||||
// Get the GW outbound connections
|
||||
s.getOutboundGatewayConnections(&gws)
|
||||
for _, gwc := range gws {
|
||||
gwc.mu.Lock()
|
||||
gwc.nc.Close()
|
||||
gwc.mu.Unlock()
|
||||
}
|
||||
waitForOutboundGateways(t, s, 2, 5*time.Second)
|
||||
check(s)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user