From a3be36fcd4bc45b4334431ab3ca1eca00580c5f2 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 22 Sep 2021 17:02:29 -0600 Subject: [PATCH] Moving 2 new tests to jetstream test file Signed-off-by: Ivan Kozlovic --- server/jetstream_test.go | 94 ++++++++++++++++++++++++++++++++++++++++ server/jwt_test.go | 92 --------------------------------------- 2 files changed, 94 insertions(+), 92 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 17fec898..25d6a529 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -36,8 +36,10 @@ import ( "testing" "time" + "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server/sysmem" "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" "github.com/nats-io/nuid" ) @@ -13197,6 +13199,98 @@ func TestJetStreamMirroredConsumerFailAfterRestart(t *testing.T) { } } +func TestJetStreamDisabledLimitsEnforcementJWT(t *testing.T) { + updateJwt := func(url string, akp nkeys.KeyPair, pubKey string, jwt string) { + t.Helper() + c := natsConnect(t, url, createUserCreds(t, nil, akp)) + defer c.Close() + if msg, err := c.Request(fmt.Sprintf(accUpdateEventSubjNew, pubKey), []byte(jwt), time.Second); err != nil { + t.Fatal("error not expected in this test", err) + } else { + content := make(map[string]interface{}) + if err := json.Unmarshal(msg.Data, &content); err != nil { + t.Fatalf("%v", err) + } else if _, ok := content["data"]; !ok { + t.Fatalf("did not get an ok response got: %v", content) + } + } + } + // create system account + sysKp, _ := nkeys.CreateAccount() + sysPub, _ := sysKp.PublicKey() + // limits to apply and check + limits1 := jwt.JetStreamLimits{MemoryStorage: 1024, DiskStorage: 0, Streams: 1, Consumer: 2} + akp, _ := nkeys.CreateAccount() + aPub, _ := akp.PublicKey() + claim := jwt.NewAccountClaims(aPub) + claim.Limits.JetStreamLimits = limits1 + aJwt1, err := claim.Encode(oKp) + require_NoError(t, err) + dir := createDir(t, "srv") + defer removeDir(t, dir) + storeDir1 := createDir(t, JetStreamStoreDir) + defer removeDir(t, storeDir1) + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: -1 + jetstream: {store_dir: %s} + operator: %s + resolver: { + type: full + dir: %s + } + system_account: %s + `, storeDir1, ojwt, dir, sysPub))) + defer removeFile(t, conf) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + updateJwt(s.ClientURL(), sysKp, aPub, aJwt1) + c := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, akp), nats.ReconnectWait(200*time.Millisecond)) + defer c.Close() + // keep using the same connection + js, err := c.JetStream() + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{ + Name: "disk", + Storage: nats.FileStorage, + Subjects: []string{"disk"}, + }) + require_Error(t, err) +} + +func TestJetStreamDisabledLimitsEnforcement(t *testing.T) { + storeDir1 := createDir(t, JetStreamStoreDir) + conf1 := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + accounts { + one { + jetstream: { + mem: 1024 + disk: 0 + streams: 1 + consumers: 2 + } + users [{user: one, password: password}] + } + } + no_auth_user: one + `, storeDir1))) + s, _ := RunServerWithConfig(conf1) + defer s.Shutdown() + + c := natsConnect(t, s.ClientURL()) + defer c.Close() + // keep using the same connection + js, err := c.JetStream() + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{ + Name: "disk", + Storage: nats.FileStorage, + Subjects: []string{"disk"}, + }) + require_Error(t, err) +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/jwt_test.go b/server/jwt_test.go index c5fb0f28..2abd4ee9 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -4375,98 +4375,6 @@ func TestJWTJetStreamLimits(t *testing.T) { c.Close() } -func TestJWTJetStreamDisabledLimitsEnforcement(t *testing.T) { - updateJwt := func(url string, akp nkeys.KeyPair, pubKey string, jwt string) { - t.Helper() - c := natsConnect(t, url, createUserCreds(t, nil, akp)) - defer c.Close() - if msg, err := c.Request(fmt.Sprintf(accUpdateEventSubjNew, pubKey), []byte(jwt), time.Second); err != nil { - t.Fatal("error not expected in this test", err) - } else { - content := make(map[string]interface{}) - if err := json.Unmarshal(msg.Data, &content); err != nil { - t.Fatalf("%v", err) - } else if _, ok := content["data"]; !ok { - t.Fatalf("did not get an ok response got: %v", content) - } - } - } - // create system account - sysKp, _ := nkeys.CreateAccount() - sysPub, _ := sysKp.PublicKey() - // limits to apply and check - limits1 := jwt.JetStreamLimits{MemoryStorage: 1024, DiskStorage: 0, Streams: 1, Consumer: 2} - akp, _ := nkeys.CreateAccount() - aPub, _ := akp.PublicKey() - claim := jwt.NewAccountClaims(aPub) - claim.Limits.JetStreamLimits = limits1 - aJwt1, err := claim.Encode(oKp) - require_NoError(t, err) - dir := createDir(t, "srv") - defer removeDir(t, dir) - storeDir1 := createDir(t, JetStreamStoreDir) - defer removeDir(t, storeDir1) - conf := createConfFile(t, []byte(fmt.Sprintf(` - listen: -1 - jetstream: {store_dir: %s} - operator: %s - resolver: { - type: full - dir: %s - } - system_account: %s - `, storeDir1, ojwt, dir, sysPub))) - defer removeFile(t, conf) - s, _ := RunServerWithConfig(conf) - defer s.Shutdown() - updateJwt(s.ClientURL(), sysKp, aPub, aJwt1) - c := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, akp), nats.ReconnectWait(200*time.Millisecond)) - defer c.Close() - // keep using the same connection - js, err := c.JetStream() - require_NoError(t, err) - _, err = js.AddStream(&nats.StreamConfig{ - Name: "disk", - Storage: nats.FileStorage, - Subjects: []string{"disk"}, - }) - require_Error(t, err) -} - -func TestJetStreamDisabledLimitsEnforcement(t *testing.T) { - storeDir1 := createDir(t, JetStreamStoreDir) - conf1 := createConfFile(t, []byte(fmt.Sprintf(` - listen: 127.0.0.1:-1 - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} - accounts { - one { - jetstream: { - mem: 1024 - disk: 0 - streams: 1 - consumers: 2 - } - users [{user: one, password: password}] - } - } - no_auth_user: one - `, storeDir1))) - s, _ := RunServerWithConfig(conf1) - defer s.Shutdown() - - c := natsConnect(t, s.ClientURL()) - defer c.Close() - // keep using the same connection - js, err := c.JetStream() - require_NoError(t, err) - _, err = js.AddStream(&nats.StreamConfig{ - Name: "disk", - Storage: nats.FileStorage, - Subjects: []string{"disk"}, - }) - require_Error(t, err) -} - func TestJWTUserRevocation(t *testing.T) { createAccountAndUser := func(done chan struct{}, pubKey, jwt1, jwt2, creds1, creds2 *string) { t.Helper()