mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Speed up test and make it more robust (#1569)
* Speed up test and make it more robust Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -2886,7 +2886,7 @@ func TestExpiredUserCredentialsRenewal(t *testing.T) {
|
||||
|
||||
func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
require_NextMsg := func(sub *nats.Subscription) bool {
|
||||
msg := natsNexMsg(t, sub, 10*time.Second)
|
||||
msg := natsNexMsg(t, sub, time.Second)
|
||||
content := make(map[string]interface{})
|
||||
json.Unmarshal(msg.Data, &content)
|
||||
if _, ok := content["data"]; ok {
|
||||
@@ -2949,22 +2949,24 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
nc := natsConnect(t, url, nats.UserCredentials(credsfile))
|
||||
nc.Close()
|
||||
}
|
||||
createAccountAndUser := func(limit bool) (string, string, string, string) {
|
||||
createAccountAndUser := func(limit bool, done chan struct{}, pubKey, jwt1, jwt2, creds *string) {
|
||||
t.Helper()
|
||||
kp, _ := nkeys.CreateAccount()
|
||||
pub, _ := kp.PublicKey()
|
||||
claim := jwt.NewAccountClaims(pub)
|
||||
*pubKey, _ = kp.PublicKey()
|
||||
claim := jwt.NewAccountClaims(*pubKey)
|
||||
if limit {
|
||||
claim.Limits.Conn = 1
|
||||
}
|
||||
jwt1, err := claim.Encode(oKp)
|
||||
var err error
|
||||
*jwt1, err = claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
time.Sleep(2 * time.Second)
|
||||
// need to assure that create time differs (resolution is sec)
|
||||
time.Sleep(time.Millisecond * 1100)
|
||||
// create updated claim allowing more connections
|
||||
if limit {
|
||||
claim.Limits.Conn = 2
|
||||
}
|
||||
jwt2, err := claim.Encode(oKp)
|
||||
*jwt2, err = claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
ukp, _ := nkeys.CreateUser()
|
||||
seed, _ := ukp.Seed()
|
||||
@@ -2973,7 +2975,8 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
uclaim.Subject = upub
|
||||
ujwt, err := uclaim.Encode(kp)
|
||||
require_NoError(t, err)
|
||||
return pub, jwt1, jwt2, genCredsFile(t, ujwt, seed)
|
||||
*creds = genCredsFile(t, ujwt, seed)
|
||||
done <- struct{}{}
|
||||
}
|
||||
updateJwt := func(url string, creds string, pubKey string, jwt string) int {
|
||||
t.Helper()
|
||||
@@ -3005,15 +3008,29 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
}
|
||||
return passCnt
|
||||
}
|
||||
// Create Accounts and corresponding user creds
|
||||
syspub, sysjwt, _, sysCreds := createAccountAndUser(false)
|
||||
// Create Accounts and corresponding user creds. Do so concurrently to speed up the test
|
||||
doneChan := make(chan struct{}, 5)
|
||||
defer close(doneChan)
|
||||
var syspub, sysjwt, dummy1, sysCreds string
|
||||
go createAccountAndUser(false, doneChan, &syspub, &sysjwt, &dummy1, &sysCreds)
|
||||
var apub, ajwt1, ajwt2, aCreds string
|
||||
go createAccountAndUser(true, doneChan, &apub, &ajwt1, &ajwt2, &aCreds)
|
||||
var bpub, bjwt1, bjwt2, bCreds string
|
||||
go createAccountAndUser(true, doneChan, &bpub, &bjwt1, &bjwt2, &bCreds)
|
||||
var cpub, cjwt1, cjwt2, cCreds string
|
||||
go createAccountAndUser(true, doneChan, &cpub, &cjwt1, &cjwt2, &cCreds)
|
||||
var dpub, djwt1, dummy2, dCreds string // extra user used later in the test in order to test limits
|
||||
go createAccountAndUser(true, doneChan, &dpub, &djwt1, &dummy2, &dCreds)
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
defer os.Remove(sysCreds)
|
||||
apub, ajwt1, ajwt2, aCreds := createAccountAndUser(true)
|
||||
defer os.Remove(aCreds)
|
||||
bpub, bjwt1, bjwt2, bCreds := createAccountAndUser(true)
|
||||
defer os.Remove(bCreds)
|
||||
cpub, cjwt1, cjwt2, cCreds := createAccountAndUser(true)
|
||||
defer os.Remove(cCreds)
|
||||
defer os.Remove(dCreds)
|
||||
// Create one directory for each server
|
||||
dirA := createDir("srv-a")
|
||||
defer os.RemoveAll(dirA)
|
||||
@@ -3037,7 +3054,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
interval: "1s"
|
||||
interval: "200ms"
|
||||
limit: 4
|
||||
}
|
||||
resolver_preload: {
|
||||
@@ -3063,7 +3080,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
interval: "1s"
|
||||
interval: "200ms"
|
||||
limit: 4
|
||||
}
|
||||
cluster {
|
||||
@@ -3078,7 +3095,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
defer os.Remove(confB)
|
||||
sB, _ := RunServerWithConfig(confB)
|
||||
// Create Server C (using no_advertise to prevent fail over)
|
||||
confC := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
fmtC := `
|
||||
listen: -1
|
||||
server_name: srv-C
|
||||
operator: %s
|
||||
@@ -3086,7 +3103,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
resolver: {
|
||||
type: cache
|
||||
dir: %s
|
||||
ttl: "10s"
|
||||
ttl: "%dms"
|
||||
limit: 4
|
||||
}
|
||||
cluster {
|
||||
@@ -3097,13 +3114,15 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
nats-route://localhost:%d
|
||||
]
|
||||
}
|
||||
`, ojwt, syspub, dirC, sA.opts.Cluster.Port)))
|
||||
defer os.Remove(confC)
|
||||
sC, _ := RunServerWithConfig(confC)
|
||||
`
|
||||
confClongTTL := createConfFile(t, []byte(fmt.Sprintf(fmtC, ojwt, syspub, dirC, 10000, sA.opts.Cluster.Port)))
|
||||
defer os.Remove(confClongTTL)
|
||||
confCshortTTL := createConfFile(t, []byte(fmt.Sprintf(fmtC, ojwt, syspub, dirC, 1000, sA.opts.Cluster.Port)))
|
||||
defer os.Remove(confCshortTTL)
|
||||
sC, _ := RunServerWithConfig(confClongTTL) // use long ttl to assure it is not kicking
|
||||
// startup cluster
|
||||
checkClusterFormed(t, sA, sB, sC)
|
||||
time.Sleep(2 * time.Second) // wait for the protocol to converge
|
||||
|
||||
time.Sleep(500 * time.Millisecond) // wait for the protocol to converge
|
||||
// Check all accounts
|
||||
require_FilePresent(dirA, apub) // was already present on startup
|
||||
require_FilePresent(dirB, apub) // was copied from server A
|
||||
@@ -3163,32 +3182,38 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
sB, _ = RunServerWithConfig(confB)
|
||||
defer sB.Shutdown()
|
||||
checkClusterFormed(t, sA, sB, sC)
|
||||
time.Sleep(2 * time.Second) // wait for the protocol to converge, will also assure that C's cache becomes invalid
|
||||
time.Sleep(500 * time.Millisecond) // wait for the protocol to converge
|
||||
// Restart server C. this is a workaround to force C to do a lookup in the absence of account cleanup
|
||||
sC.Shutdown()
|
||||
sC, _ = RunServerWithConfig(confC) //TODO remove this once we clean up accounts
|
||||
|
||||
require_FileEqual(dirA, apub, ajwt2) // was copied from server B
|
||||
require_FileEqual(dirB, apub, ajwt2) // was restarted with this
|
||||
require_FileEqual(dirC, apub, ajwt1) // still contains old cached value
|
||||
sC, _ = RunServerWithConfig(confClongTTL) //TODO remove this once we clean up accounts
|
||||
require_FileEqual(dirA, apub, ajwt2) // was copied from server B
|
||||
require_FileEqual(dirB, apub, ajwt2) // was restarted with this
|
||||
require_FileEqual(dirC, apub, ajwt1) // still contains old cached value
|
||||
require_2Connection(sA.ClientURL(), aCreds)
|
||||
require_2Connection(sB.ClientURL(), aCreds)
|
||||
require_1Connection(sC.ClientURL(), aCreds)
|
||||
|
||||
// Restart server C. this is a workaround to force C to do a lookup in the absence of account cleanup
|
||||
sC.Shutdown()
|
||||
sC, _ = RunServerWithConfig(confC) //TODO remove this once we clean up accounts
|
||||
sC, _ = RunServerWithConfig(confCshortTTL) //TODO remove this once we clean up accounts
|
||||
defer sC.Shutdown()
|
||||
require_FileEqual(dirC, apub, ajwt1) // still contains old cached value
|
||||
time.Sleep(time.Second * 12) // Force next connect to do a lookup
|
||||
checkClusterFormed(t, sA, sB, sC)
|
||||
// Force next connect to do a lookup exceeds ttl
|
||||
fname := filepath.Join(dirC, apub+".jwt")
|
||||
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
|
||||
_, err := os.Stat(fname)
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("File not removed in time")
|
||||
})
|
||||
connect(sC.ClientURL(), aCreds) // When lookup happens
|
||||
require_FileEqual(dirC, apub, ajwt2) // was looked up form A or B
|
||||
require_2Connection(sC.ClientURL(), aCreds)
|
||||
|
||||
// Test exceeding limit. For the exclusive directory resolver, limit is a stop gap measure.
|
||||
// It is not expected to be hit. When hit the administrator is supposed to take action.
|
||||
dpub, djwt1, _, dCreds := createAccountAndUser(true)
|
||||
defer os.Remove(dCreds)
|
||||
passCnt = updateJwt(sA.ClientURL(), sysCreds, dpub, djwt1)
|
||||
require_True(t, passCnt == 1) // Only Server C updated
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user