mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Moving 2 new tests to jetstream test file
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -36,8 +36,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/jwt/v2"
|
||||
"github.com/nats-io/nats-server/v2/server/sysmem"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nkeys"
|
||||
"github.com/nats-io/nuid"
|
||||
)
|
||||
|
||||
@@ -13197,6 +13199,98 @@ func TestJetStreamMirroredConsumerFailAfterRestart(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamDisabledLimitsEnforcementJWT(t *testing.T) {
|
||||
updateJwt := func(url string, akp nkeys.KeyPair, pubKey string, jwt string) {
|
||||
t.Helper()
|
||||
c := natsConnect(t, url, createUserCreds(t, nil, akp))
|
||||
defer c.Close()
|
||||
if msg, err := c.Request(fmt.Sprintf(accUpdateEventSubjNew, pubKey), []byte(jwt), time.Second); err != nil {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
} else {
|
||||
content := make(map[string]interface{})
|
||||
if err := json.Unmarshal(msg.Data, &content); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
} else if _, ok := content["data"]; !ok {
|
||||
t.Fatalf("did not get an ok response got: %v", content)
|
||||
}
|
||||
}
|
||||
}
|
||||
// create system account
|
||||
sysKp, _ := nkeys.CreateAccount()
|
||||
sysPub, _ := sysKp.PublicKey()
|
||||
// limits to apply and check
|
||||
limits1 := jwt.JetStreamLimits{MemoryStorage: 1024, DiskStorage: 0, Streams: 1, Consumer: 2}
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
aPub, _ := akp.PublicKey()
|
||||
claim := jwt.NewAccountClaims(aPub)
|
||||
claim.Limits.JetStreamLimits = limits1
|
||||
aJwt1, err := claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
dir := createDir(t, "srv")
|
||||
defer removeDir(t, dir)
|
||||
storeDir1 := createDir(t, JetStreamStoreDir)
|
||||
defer removeDir(t, storeDir1)
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
jetstream: {store_dir: %s}
|
||||
operator: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
}
|
||||
system_account: %s
|
||||
`, storeDir1, ojwt, dir, sysPub)))
|
||||
defer removeFile(t, conf)
|
||||
s, _ := RunServerWithConfig(conf)
|
||||
defer s.Shutdown()
|
||||
updateJwt(s.ClientURL(), sysKp, aPub, aJwt1)
|
||||
c := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, akp), nats.ReconnectWait(200*time.Millisecond))
|
||||
defer c.Close()
|
||||
// keep using the same connection
|
||||
js, err := c.JetStream()
|
||||
require_NoError(t, err)
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "disk",
|
||||
Storage: nats.FileStorage,
|
||||
Subjects: []string{"disk"},
|
||||
})
|
||||
require_Error(t, err)
|
||||
}
|
||||
|
||||
func TestJetStreamDisabledLimitsEnforcement(t *testing.T) {
|
||||
storeDir1 := createDir(t, JetStreamStoreDir)
|
||||
conf1 := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
|
||||
accounts {
|
||||
one {
|
||||
jetstream: {
|
||||
mem: 1024
|
||||
disk: 0
|
||||
streams: 1
|
||||
consumers: 2
|
||||
}
|
||||
users [{user: one, password: password}]
|
||||
}
|
||||
}
|
||||
no_auth_user: one
|
||||
`, storeDir1)))
|
||||
s, _ := RunServerWithConfig(conf1)
|
||||
defer s.Shutdown()
|
||||
|
||||
c := natsConnect(t, s.ClientURL())
|
||||
defer c.Close()
|
||||
// keep using the same connection
|
||||
js, err := c.JetStream()
|
||||
require_NoError(t, err)
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "disk",
|
||||
Storage: nats.FileStorage,
|
||||
Subjects: []string{"disk"},
|
||||
})
|
||||
require_Error(t, err)
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Simple JetStream Benchmarks
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@@ -4375,98 +4375,6 @@ func TestJWTJetStreamLimits(t *testing.T) {
|
||||
c.Close()
|
||||
}
|
||||
|
||||
func TestJWTJetStreamDisabledLimitsEnforcement(t *testing.T) {
|
||||
updateJwt := func(url string, akp nkeys.KeyPair, pubKey string, jwt string) {
|
||||
t.Helper()
|
||||
c := natsConnect(t, url, createUserCreds(t, nil, akp))
|
||||
defer c.Close()
|
||||
if msg, err := c.Request(fmt.Sprintf(accUpdateEventSubjNew, pubKey), []byte(jwt), time.Second); err != nil {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
} else {
|
||||
content := make(map[string]interface{})
|
||||
if err := json.Unmarshal(msg.Data, &content); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
} else if _, ok := content["data"]; !ok {
|
||||
t.Fatalf("did not get an ok response got: %v", content)
|
||||
}
|
||||
}
|
||||
}
|
||||
// create system account
|
||||
sysKp, _ := nkeys.CreateAccount()
|
||||
sysPub, _ := sysKp.PublicKey()
|
||||
// limits to apply and check
|
||||
limits1 := jwt.JetStreamLimits{MemoryStorage: 1024, DiskStorage: 0, Streams: 1, Consumer: 2}
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
aPub, _ := akp.PublicKey()
|
||||
claim := jwt.NewAccountClaims(aPub)
|
||||
claim.Limits.JetStreamLimits = limits1
|
||||
aJwt1, err := claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
dir := createDir(t, "srv")
|
||||
defer removeDir(t, dir)
|
||||
storeDir1 := createDir(t, JetStreamStoreDir)
|
||||
defer removeDir(t, storeDir1)
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
jetstream: {store_dir: %s}
|
||||
operator: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
}
|
||||
system_account: %s
|
||||
`, storeDir1, ojwt, dir, sysPub)))
|
||||
defer removeFile(t, conf)
|
||||
s, _ := RunServerWithConfig(conf)
|
||||
defer s.Shutdown()
|
||||
updateJwt(s.ClientURL(), sysKp, aPub, aJwt1)
|
||||
c := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, akp), nats.ReconnectWait(200*time.Millisecond))
|
||||
defer c.Close()
|
||||
// keep using the same connection
|
||||
js, err := c.JetStream()
|
||||
require_NoError(t, err)
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "disk",
|
||||
Storage: nats.FileStorage,
|
||||
Subjects: []string{"disk"},
|
||||
})
|
||||
require_Error(t, err)
|
||||
}
|
||||
|
||||
func TestJetStreamDisabledLimitsEnforcement(t *testing.T) {
|
||||
storeDir1 := createDir(t, JetStreamStoreDir)
|
||||
conf1 := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
|
||||
accounts {
|
||||
one {
|
||||
jetstream: {
|
||||
mem: 1024
|
||||
disk: 0
|
||||
streams: 1
|
||||
consumers: 2
|
||||
}
|
||||
users [{user: one, password: password}]
|
||||
}
|
||||
}
|
||||
no_auth_user: one
|
||||
`, storeDir1)))
|
||||
s, _ := RunServerWithConfig(conf1)
|
||||
defer s.Shutdown()
|
||||
|
||||
c := natsConnect(t, s.ClientURL())
|
||||
defer c.Close()
|
||||
// keep using the same connection
|
||||
js, err := c.JetStream()
|
||||
require_NoError(t, err)
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "disk",
|
||||
Storage: nats.FileStorage,
|
||||
Subjects: []string{"disk"},
|
||||
})
|
||||
require_Error(t, err)
|
||||
}
|
||||
|
||||
func TestJWTUserRevocation(t *testing.T) {
|
||||
createAccountAndUser := func(done chan struct{}, pubKey, jwt1, jwt2, creds1, creds2 *string) {
|
||||
t.Helper()
|
||||
|
||||
Reference in New Issue
Block a user