Merge branch 'main' into dev

Signed-off-by: Derek Collison <derek@nats.io>
Derek Collison
2023-08-01 21:46:54 -07:00
9 changed files with 269 additions and 29 deletions

@@ -31,6 +31,7 @@ import (
	"testing"
	"time"
+	"github.com/nats-io/jwt/v2"
	"github.com/nats-io/nats.go"
)
@@ -399,7 +400,7 @@ func TestJetStreamClusterNegativeReplicas(t *testing.T) {
	})
	require_NoError(t, err)
-	// Check upadte now.
+	// Check update now.
	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:     name,
		Replicas: -11,
@@ -4588,3 +4589,221 @@ func TestJetStreamClusterBadEncryptKey(t *testing.T) {
		t.Fatalf("Expected server not to start")
	}
}
func TestJetStreamAccountUsageDrifts(t *testing.T) {
	tmpl := `
	listen: 127.0.0.1:-1
	server_name: %s
	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
	leaf {
		listen: 127.0.0.1:-1
	}
	cluster {
		name: %s
		listen: 127.0.0.1:%d
		routes = [%s]
	}
	`
	opFrag := `
	operator: %s
	system_account: %s
	resolver: { type: MEM }
	resolver_preload = {
		%s : %s
		%s : %s
	}
	`
	_, syspub := createKey(t)
	sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
	accKp, aExpPub := createKey(t)
	accClaim := jwt.NewAccountClaims(aExpPub)
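	// Give the account two independent JetStream tiers, R1 and R3, each limited to
	// a single stream and consumer with no disk limit, so usage is tracked per tier.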
	accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
		DiskStorage: -1, Consumer: 1, Streams: 1}
	accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
		DiskStorage: -1, Consumer: 1, Streams: 1}
	accJwt := encodeClaim(t, accClaim, aExpPub)
	accCreds := newUser(t, accKp)
	template := tmpl + fmt.Sprintf(opFrag, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)
	c := createJetStreamClusterWithTemplate(t, template, "R3S", 3)
	defer c.shutdown()
	nc, js := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds))
	defer nc.Close()
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST1",
		Subjects: []string{"foo"},
		MaxBytes: 1 * 1024 * 1024 * 1024,
		MaxMsgs:  1000,
		Replicas: 3,
	})
	require_NoError(t, err)
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "TEST2",
		Subjects: []string{"bar"},
	})
	require_NoError(t, err)
	// These expected store values can come directly from stream info's state bytes.
	// We will *= 3 for R3
	checkAccount := func(r1u, r3u uint64) {
		t.Helper()
		r3u *= 3
		// Remote usage updates can be delayed, so wait for a bit for values we want.
		checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
			info, err := js.AccountInfo()
			require_NoError(t, err)
			require_True(t, len(info.Tiers) >= 2)
			// These can move.
			if u := info.Tiers["R1"].Store; u != r1u {
				return fmt.Errorf("Expected R1 to be %v, got %v", friendlyBytes(r1u), friendlyBytes(u))
			}
			if u := info.Tiers["R3"].Store; u != r3u {
				return fmt.Errorf("Expected R3 to be %v, got %v", friendlyBytes(r3u), friendlyBytes(u))
			}
			return nil
		})
	}
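	// Nothing has been published yet, so both tiers should report zero usage.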
	checkAccount(0, 0)
	// Now add in some R3 data.
	msg := bytes.Repeat([]byte("Z"), 32*1024)     // 32k
	smallMsg := bytes.Repeat([]byte("Z"), 4*1024) // 4k
	for i := 0; i < 1000; i++ {
		js.Publish("foo", msg)
	}
	sir3, err := js.StreamInfo("TEST1")
	require_NoError(t, err)
	checkAccount(0, sir3.State.Bytes)
	// Now add in some R1 data.
	for i := 0; i < 100; i++ {
		js.Publish("bar", msg)
	}
	sir1, err := js.StreamInfo("TEST2")
	require_NoError(t, err)
	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
	// We will now test a bunch of scenarios to see that we are doing accounting correctly.
	// Since our R3 has a limit of 1000 msgs, let's add in more msgs and drop older ones.
	for i := 0; i < 100; i++ {
		js.Publish("foo", smallMsg)
	}
	sir3, err = js.StreamInfo("TEST1")
	require_NoError(t, err)
	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
	// Move our R3 stream leader and make sure accounting is correct.
	_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST1"), nil, time.Second)
	require_NoError(t, err)
	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
	// Now scale down.
	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:     "TEST1",
		Subjects: []string{"foo"},
		MaxBytes: 1 * 1024 * 1024 * 1024,
		MaxMsgs:  1000,
		Replicas: 1,
	})
	require_NoError(t, err)
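	// With TEST1 scaled down to R1, its usage should now be charged to the R1 tier
	// alongside TEST2, and the R3 tier should drop back to zero.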
	checkAccount(sir1.State.Bytes+sir3.State.Bytes, 0)
	// Add in more msgs which will replace the older and bigger ones.
	for i := 0; i < 100; i++ {
		js.Publish("foo", smallMsg)
	}
	sir3, err = js.StreamInfo("TEST1")
	require_NoError(t, err)
	// Now scale back up.
	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:     "TEST1",
		Subjects: []string{"foo"},
		MaxBytes: 1 * 1024 * 1024 * 1024,
		MaxMsgs:  1000,
		Replicas: 3,
	})
	require_NoError(t, err)
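	// Back at R3, the usage should move out of the R1 tier and be charged to R3 again.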
	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
	// Test Purge.
	err = js.PurgeStream("TEST1")
	require_NoError(t, err)
	checkAccount(sir1.State.Bytes, 0)
	for i := 0; i < 1000; i++ {
		js.Publish("foo", smallMsg)
	}
	sir3, err = js.StreamInfo("TEST1")
	require_NoError(t, err)
	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
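	// Helper to force the current meta leader to step down and wait until a new one takes over.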
	requestLeaderStepDown := func() {
		ml := c.leader()
		checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
			if cml := c.leader(); cml == ml {
				nc.Request(JSApiLeaderStepDown, nil, time.Second)
				return fmt.Errorf("Metaleader has not moved yet")
			}
			return nil
		})
	}
	// Test meta leader stepdowns.
	for i := 0; i < len(c.servers); i++ {
		requestLeaderStepDown()
		checkAccount(sir1.State.Bytes, sir3.State.Bytes)
	}
	// Now test cluster reset operations where we internally reset the NRG and optionally the stream too.
	nl := c.randomNonStreamLeader(aExpPub, "TEST1")
	acc, err := nl.LookupAccount(aExpPub)
	require_NoError(t, err)
	mset, err := acc.lookupStream("TEST1")
	require_NoError(t, err)
	// NRG only
	mset.resetClusteredState(nil)
	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
	// Now NRG and Stream state itself.
	mset.resetClusteredState(errFirstSequenceMismatch)
	checkAccount(sir1.State.Bytes, sir3.State.Bytes)
	// Now test server restart
	for _, s := range c.servers {
		s.Shutdown()
		s.WaitForShutdown()
		s = c.restartServer(s)
		// Wait on healthz and leader etc.
		checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
			if hs := s.healthz(nil); hs.Error != _EMPTY_ {
				return errors.New(hs.Error)
			}
			return nil
		})
		c.waitOnLeader()
		c.waitOnStreamLeader(aExpPub, "TEST1")
		c.waitOnStreamLeader(aExpPub, "TEST2")
		// Now check account again.
		checkAccount(sir1.State.Bytes, sir3.State.Bytes)
	}
}