Prevent reserved bytes underflow (#2907)

This commit is contained in:
Jaime Piña
2022-03-16 15:19:35 -07:00
committed by GitHub
parent 59753ec0da
commit acfd456758
2 changed files with 795 additions and 2 deletions

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
"net/http"
"net/url"
@@ -6801,6 +6802,541 @@ func TestJetStreamSystemLimits(t *testing.T) {
}
}
func TestJetStreamSystemLimitsPlacement(t *testing.T) {
const smallSystemLimit = 128
const mediumSystemLimit = smallSystemLimit * 2
const largeSystemLimit = smallSystemLimit * 3
getServer := func(t *testing.T, serverName string) *Server {
storeDir, err := os.MkdirTemp(tempRoot, "jstests-storedir-")
require_NoError(t, err)
natsPort, clusterPort := 24000, 26000
systemLimit := smallSystemLimit
if serverName == "medium" {
natsPort, clusterPort = 24001, 26001
systemLimit = mediumSystemLimit
} else if serverName == "large" {
natsPort, clusterPort = 24002, 26002
systemLimit = largeSystemLimit
}
s, _ := RunServerWithConfig(createConfFile(t, []byte(fmt.Sprintf(`
server_name: %s
port: %d
jetstream: {
enabled: true
store_dir: '%s'
max_mem: %d
max_file: %d
}
server_tags: [%s]
cluster {
name: cluster-a
port: %d
routes: [
nats-route://127.0.0.1:26000
nats-route://127.0.0.1:26001
nats-route://127.0.0.1:26002
]
}
`,
serverName,
natsPort,
storeDir,
systemLimit,
systemLimit,
serverName,
clusterPort,
))))
return s
}
smallSrv := getServer(t, "small")
if config := smallSrv.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer smallSrv.Shutdown()
mediumSrv := getServer(t, "medium")
if config := mediumSrv.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer mediumSrv.Shutdown()
largeSrv := getServer(t, "large")
defer largeSrv.Shutdown()
if config := largeSrv.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer mediumSrv.Shutdown()
checkClusterFormed(t, smallSrv, mediumSrv, largeSrv)
requestLeaderStepDown := func(clientURL string) error {
nc, err := nats.Connect(clientURL)
if err != nil {
return err
}
defer nc.Close()
ncResp, err := nc.Request(JSApiLeaderStepDown, nil, time.Second)
if err != nil {
return err
}
var resp JSApiLeaderStepDownResponse
if err := json.Unmarshal(ncResp.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return resp.Error
}
if !resp.Success {
return fmt.Errorf("leader step down request not successful")
}
return nil
}
// Force large server to be leader
checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
if largeSrv.JetStreamIsLeader() {
return nil
}
if err := requestLeaderStepDown(largeSrv.ClientURL()); err != nil {
return err
}
return fmt.Errorf("large server is not leader")
})
nc, js := jsClientConnect(t, largeSrv)
defer nc.Close()
cases := []struct {
name string
storage nats.StorageType
createMaxBytes int64
serverTag string
wantErr bool
}{
{
name: "file create large stream on small server",
storage: nats.FileStorage,
createMaxBytes: largeSystemLimit,
serverTag: "small",
wantErr: true,
},
{
name: "memory create large stream on small server",
storage: nats.MemoryStorage,
createMaxBytes: largeSystemLimit,
serverTag: "small",
wantErr: true,
},
{
name: "file create large stream on medium server",
storage: nats.FileStorage,
createMaxBytes: largeSystemLimit,
serverTag: "medium",
wantErr: true,
},
{
name: "memory create large stream on medium server",
storage: nats.MemoryStorage,
createMaxBytes: largeSystemLimit,
serverTag: "medium",
wantErr: true,
},
{
name: "file create large stream on large server",
storage: nats.FileStorage,
createMaxBytes: largeSystemLimit,
serverTag: "large",
},
{
name: "memory create large stream on large server",
storage: nats.MemoryStorage,
createMaxBytes: largeSystemLimit,
serverTag: "large",
},
}
for i := 0; i < len(cases) && !t.Failed(); i++ {
c := cases[i]
t.Run(c.name, func(st *testing.T) {
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: c.storage,
MaxBytes: c.createMaxBytes,
Placement: &nats.Placement{
Cluster: "cluster-a",
Tags: []string{c.serverTag},
},
})
if c.wantErr && err == nil {
st.Fatalf("unexpected stream create success, maxBytes=%d, tag=%s",
c.createMaxBytes, c.serverTag)
} else if !c.wantErr && err != nil {
st.Fatalf("unexpected error: %s", err)
}
if err == nil {
err = js.DeleteStream("TEST")
require_NoError(st, err)
}
})
}
// These next two tests should fail because although the stream fits in the
// large and medium server, it doesn't fit on the small server.
si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: nats.FileStorage,
MaxBytes: smallSystemLimit + 1,
Replicas: 3,
})
if err == nil {
t.Fatalf("unexpected file stream create success, maxBytes=%d, replicas=%d",
si.Config.MaxBytes, si.Config.Replicas)
}
si, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: nats.MemoryStorage,
MaxBytes: smallSystemLimit + 1,
Replicas: 3,
})
if err == nil {
t.Fatalf("unexpected memory stream create success, maxBytes=%d, replicas=%d",
si.Config.MaxBytes, si.Config.Replicas)
}
}
func TestJetStreamSuperClusterSystemLimitsPlacement(t *testing.T) {
const largeSystemLimit = 1024
const smallSystemLimit = 512
getServer := func(t *testing.T, serverName string) *Server {
require_NoError(t, os.MkdirAll(tempRoot, 0755))
storeDir, err := os.MkdirTemp(tempRoot, "jstests-storedir-")
require_NoError(t, err)
var (
natsPort int
systemLimit int
clusterName string
clusterPort int
route1, route2, route3 int
gatewayPort int
)
switch serverName {
case "a0":
natsPort = 4000
systemLimit = largeSystemLimit
clusterName = "cluster-a"
clusterPort = 6000
route1, route2, route3 = clusterPort, clusterPort+1, clusterPort+2
gatewayPort = 7000
case "a1":
natsPort = 4001
systemLimit = largeSystemLimit
clusterName = "cluster-a"
clusterPort = 6001
route1, route2, route3 = clusterPort, clusterPort+1, clusterPort+2
gatewayPort = 7001
case "a2":
natsPort = 4002
systemLimit = largeSystemLimit
clusterName = "cluster-a"
clusterPort = 6002
route1, route2, route3 = clusterPort, clusterPort+1, clusterPort+2
gatewayPort = 7002
case "b0":
natsPort = 4100
systemLimit = smallSystemLimit
clusterName = "cluster-b"
clusterPort = 6100
route1, route2, route3 = clusterPort, clusterPort+1, clusterPort+2
gatewayPort = 7100
case "b1":
natsPort = 4101
systemLimit = smallSystemLimit
clusterName = "cluster-b"
clusterPort = 6101
route1, route2, route3 = clusterPort, clusterPort+1, clusterPort+2
gatewayPort = 7101
case "b2":
natsPort = 4102
systemLimit = smallSystemLimit
clusterName = "cluster-b"
clusterPort = 6102
route1, route2, route3 = clusterPort, clusterPort+1, clusterPort+2
gatewayPort = 7102
default:
t.Fatalf("unknown server name: %s", serverName)
}
s, _ := RunServerWithConfig(createConfFile(t, []byte(fmt.Sprintf(`
server_name: %s
port: %d
jetstream: {
enabled: true
store_dir: '%s'
max_mem: %d
max_file: %d
}
server_tags: [%s]
cluster {
name: %s
port: %d
routes: [
nats-route://127.0.0.1:%d
nats-route://127.0.0.1:%d
nats-route://127.0.0.1:%d
]
}
gateway {
name: %s
port: %d
gateways: [
{name: "cluster-a", url: "nats://localhost:7000"},
{name: "cluster-b", url: "nats://localhost:7100"},
]
}
`,
serverName,
natsPort,
storeDir,
systemLimit,
systemLimit,
serverName,
clusterName,
clusterPort,
route1, route2, route3,
clusterName,
gatewayPort,
))))
return s
}
var servers []*Server
for _, name := range []string{"a0", "a1", "a2", "b0", "b1", "b2"} {
s := getServer(t, name)
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
servers = append(servers, s)
}
checkClusterFormed(t, servers[:3]...)
checkClusterFormed(t, servers[3:]...)
for _, s := range servers {
waitForOutboundGateways(t, s, 1, 2*time.Second)
}
requestLeaderStepDown := func(clientURL string) error {
nc, err := nats.Connect(clientURL)
if err != nil {
return err
}
defer nc.Close()
ncResp, err := nc.Request(JSApiLeaderStepDown, nil, time.Second)
if err != nil {
return err
}
var resp JSApiLeaderStepDownResponse
if err := json.Unmarshal(ncResp.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return resp.Error
}
if !resp.Success {
return fmt.Errorf("leader step down request not successful")
}
return nil
}
// Force large cluster to be leader
checkFor(t, 15*time.Second, 500*time.Millisecond, func() error {
a0 := servers[0]
if a0.JetStreamIsLeader() {
return nil
}
if err := requestLeaderStepDown(a0.ClientURL()); err != nil {
return fmt.Errorf("failed to request leader step down: %s", err)
}
return fmt.Errorf("a0 server is not leader")
})
getStreams := func(jsm nats.JetStreamManager) []string {
var streams []string
for s := range jsm.StreamNames() {
streams = append(streams, s)
}
return streams
}
nc, js := jsClientConnect(t, servers[0])
defer nc.Close()
cases := []struct {
name string
storage nats.StorageType
createMaxBytes int64
serverTag string
wantErr bool
}{
{
name: "file create large stream on small cluster b0",
storage: nats.FileStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "b0",
wantErr: true,
},
{
name: "memory create large stream on small cluster b0",
storage: nats.MemoryStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "b0",
wantErr: true,
},
{
name: "file create large stream on small cluster b1",
storage: nats.FileStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "b1",
wantErr: true,
},
{
name: "memory create large stream on small cluster b1",
storage: nats.MemoryStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "b1",
wantErr: true,
},
{
name: "file create large stream on small cluster b2",
storage: nats.FileStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "b2",
wantErr: true,
},
{
name: "memory create large stream on small cluster b2",
storage: nats.MemoryStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "b2",
wantErr: true,
},
{
name: "file create large stream on large cluster a0",
storage: nats.FileStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "a0",
},
{
name: "memory create large stream on large cluster a0",
storage: nats.MemoryStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "a0",
},
{
name: "file create large stream on large cluster a1",
storage: nats.FileStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "a1",
},
{
name: "memory create large stream on large cluster a1",
storage: nats.MemoryStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "a1",
},
{
name: "file create large stream on large cluster a2",
storage: nats.FileStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "a2",
},
{
name: "memory create large stream on large cluster a2",
storage: nats.MemoryStorage,
createMaxBytes: smallSystemLimit + 1,
serverTag: "a2",
},
}
for i := 0; i < len(cases) && !t.Failed(); i++ {
c := cases[i]
t.Run(c.name, func(st *testing.T) {
var clusterName string
if strings.HasPrefix(c.serverTag, "a") {
clusterName = "cluster-a"
} else if strings.HasPrefix(c.serverTag, "b") {
clusterName = "cluster-b"
}
if s := getStreams(js); len(s) != 0 {
st.Fatalf("unexpected stream count, got=%d, want=0", len(s))
}
streamName := fmt.Sprintf("TEST-%s", c.serverTag)
si, err := js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{"foo"},
Storage: c.storage,
MaxBytes: c.createMaxBytes,
Placement: &nats.Placement{
Cluster: clusterName,
Tags: []string{c.serverTag},
},
})
if c.wantErr && err == nil {
if s := getStreams(js); len(s) != 1 {
st.Logf("unexpected stream count, got=%d, want=1, streams=%v", len(s), s)
}
cfg := si.Config
st.Fatalf("unexpected success, maxBytes=%d, cluster=%s, tags=%v",
cfg.MaxBytes, cfg.Placement.Cluster, cfg.Placement.Tags)
} else if !c.wantErr && err != nil {
if s := getStreams(js); len(s) != 0 {
st.Logf("unexpected stream count, got=%d, want=0, streams=%v", len(s), s)
}
st.Fatalf("unexpected error: %s", err)
}
if err == nil {
if s := getStreams(js); len(s) != 1 {
st.Fatalf("unexpected stream count, got=%d, want=1", len(s))
}
err = js.DeleteStream(streamName)
require_NoError(st, err)
}
})
}
}
func TestJetStreamStreamStorageTrackingAndLimits(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
@@ -15093,6 +15629,218 @@ func TestJetStreamPullConsumerHeartBeats(t *testing.T) {
}
}
func TestStorageReservedBytes(t *testing.T) {
const systemLimit = 1024
opts := DefaultTestOptions
opts.Port = -1
opts.JetStream = true
opts.JetStreamMaxMemory = systemLimit
opts.JetStreamMaxStore = systemLimit
tdir, _ := ioutil.TempDir(tempRoot, "jstests-storedir-")
opts.StoreDir = tdir
opts.HTTPPort = -1
s := RunServer(&opts)
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()
getJetStreamVarz := func(hc *http.Client, addr string) (JetStreamVarz, error) {
resp, err := hc.Get(addr)
if err != nil {
return JetStreamVarz{}, err
}
defer resp.Body.Close()
var v Varz
if err := json.NewDecoder(resp.Body).Decode(&v); err != nil {
return JetStreamVarz{}, err
}
return v.JetStream, nil
}
getReserved := func(hc *http.Client, addr string, st nats.StorageType) (uint64, error) {
jsv, err := getJetStreamVarz(hc, addr)
if err != nil {
return 0, err
}
if st == nats.MemoryStorage {
return jsv.Stats.ReservedMemory, nil
}
return jsv.Stats.ReservedStore, nil
}
varzAddr := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
hc := &http.Client{Timeout: 5 * time.Second}
jsv, err := getJetStreamVarz(hc, varzAddr)
require_NoError(t, err)
if got, want := systemLimit, int(jsv.Config.MaxMemory); got != want {
t.Fatalf("Unexpected max memory: got=%d, want=%d", got, want)
}
if got, want := systemLimit, int(jsv.Config.MaxStore); got != want {
t.Fatalf("Unexpected max store: got=%d, want=%d", got, want)
}
cases := []struct {
name string
accountLimit int64
storage nats.StorageType
createMaxBytes int64
updateMaxBytes int64
wantUpdateError bool
}{
{
name: "file reserve 66% of system limit",
accountLimit: -1,
storage: nats.FileStorage,
createMaxBytes: int64(math.Round(float64(systemLimit) * .666)),
updateMaxBytes: int64(math.Round(float64(systemLimit)*.666)) + 1,
},
{
name: "memory reserve 66% of system limit",
accountLimit: -1,
storage: nats.MemoryStorage,
createMaxBytes: int64(math.Round(float64(systemLimit) * .666)),
updateMaxBytes: int64(math.Round(float64(systemLimit)*.666)) + 1,
},
{
name: "file update past system limit",
accountLimit: -1,
storage: nats.FileStorage,
createMaxBytes: systemLimit,
updateMaxBytes: systemLimit + 1,
wantUpdateError: true,
},
{
name: "memory update past system limit",
accountLimit: -1,
storage: nats.MemoryStorage,
createMaxBytes: systemLimit,
updateMaxBytes: systemLimit + 1,
wantUpdateError: true,
},
{
name: "file update to system limit",
accountLimit: -1,
storage: nats.FileStorage,
createMaxBytes: systemLimit - 1,
updateMaxBytes: systemLimit,
},
{
name: "memory update to system limit",
accountLimit: -1,
storage: nats.MemoryStorage,
createMaxBytes: systemLimit - 1,
updateMaxBytes: systemLimit,
},
{
name: "file reserve 66% of account limit",
accountLimit: systemLimit / 2,
storage: nats.FileStorage,
createMaxBytes: int64(math.Round(float64(systemLimit/2) * .666)),
updateMaxBytes: int64(math.Round(float64(systemLimit/2)*.666)) + 1,
},
{
name: "memory reserve 66% of account limit",
accountLimit: systemLimit / 2,
storage: nats.MemoryStorage,
createMaxBytes: int64(math.Round(float64(systemLimit/2) * .666)),
updateMaxBytes: int64(math.Round(float64(systemLimit/2)*.666)) + 1,
},
// TODO: Enable these once account limits are enforced.
//{
// name: "file update past account limit",
// accountLimit: systemLimit / 2,
// storage: nats.FileStorage,
// createMaxBytes: (systemLimit / 2),
// updateMaxBytes: (systemLimit / 2) + 1,
// wantUpdateError: true,
//},
//{
// name: "memory update past account limit",
// accountLimit: systemLimit / 2,
// storage: nats.MemoryStorage,
// createMaxBytes: (systemLimit / 2),
// updateMaxBytes: (systemLimit / 2) + 1,
// wantUpdateError: true,
//},
{
name: "file update to account limit",
accountLimit: systemLimit / 2,
storage: nats.FileStorage,
createMaxBytes: (systemLimit / 2) - 1,
updateMaxBytes: (systemLimit / 2),
},
{
name: "memory update to account limit",
accountLimit: systemLimit / 2,
storage: nats.MemoryStorage,
createMaxBytes: (systemLimit / 2) - 1,
updateMaxBytes: (systemLimit / 2),
},
}
for i := 0; i < len(cases) && !t.Failed(); i++ {
c := cases[i]
t.Run(c.name, func(st *testing.T) {
// Setup limits
err = s.GlobalAccount().UpdateJetStreamLimits(&JetStreamAccountLimits{
MaxMemory: c.accountLimit,
MaxStore: c.accountLimit,
})
require_NoError(st, err)
// Create initial stream
cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: c.storage,
MaxBytes: c.createMaxBytes,
}
_, err = js.AddStream(cfg)
require_NoError(st, err)
// Update stream MaxBytes
cfg.MaxBytes = c.updateMaxBytes
info, err := js.UpdateStream(cfg)
if c.wantUpdateError && err == nil {
got := info.Config.MaxBytes
st.Fatalf("Unexpected update success, newMaxBytes=%d; systemLimit=%d; accountLimit=%d",
got, systemLimit, c.accountLimit)
} else if !c.wantUpdateError && err != nil {
st.Fatalf("Unexpected update error: %s", err)
}
if !c.wantUpdateError && err == nil {
// If update was successful, then ensure reserved shows new
// amount
reserved, err := getReserved(hc, varzAddr, c.storage)
require_NoError(st, err)
if got, want := reserved, uint64(c.updateMaxBytes); got != want {
st.Fatalf("Unexpected reserved: %d, want %d", got, want)
}
}
// Delete stream
err = js.DeleteStream("TEST")
require_NoError(st, err)
// Ensure reserved shows 0 because we've deleted the stream
reserved, err := getReserved(hc, varzAddr, c.storage)
require_NoError(st, err)
if reserved != 0 {
st.Fatalf("Unexpected reserved: %d, want 0", reserved)
}
})
}
}
func TestJetStreamRecoverStreamWithDeletedMessagesNonCleanShutdown(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {

View File

@@ -969,8 +969,34 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig) (*StreamConfig,
cfg.AllowRollup = false
}
// Check limits.
if err := jsa.checkAllLimits(&cfg); err != nil {
// Check limits. We need some extra handling to allow updating MaxBytes.
// First, let's calculate the difference between the new and old MaxBytes.
maxBytesDiff := cfg.MaxBytes - old.MaxBytes
if maxBytesDiff < 0 {
// If we're updating to a lower MaxBytes (maxBytesDiff is negative),
// then set to zero so checkBytesLimits doesn't set addBytes to 1.
maxBytesDiff = 0
}
// If maxBytesDiff == 0, then that means MaxBytes didn't change.
// If maxBytesDiff > 0, then we want to reserve additional bytes.
// Save the user configured MaxBytes.
newMaxBytes := cfg.MaxBytes
// We temporarily set cfg.MaxBytes to maxBytesDiff because checkAllLimits
// adds cfg.MaxBytes to the current reserved limit and checks if we've gone
// over. However, we don't want an addition cfg.MaxBytes, we only want to
// reserve the difference between the new and the old values.
cfg.MaxBytes = maxBytesDiff
// Check if we can reserve the additional difference.
err = jsa.checkAllLimits(&cfg)
// Restore the user configured MaxBytes.
cfg.MaxBytes = newMaxBytes
if err != nil {
return nil, err
}
return &cfg, nil
@@ -1046,6 +1072,8 @@ func (mset *stream) update(config *StreamConfig) error {
}
}
js := mset.js
// Now update config and store's version of our config.
mset.cfg = *cfg
@@ -1055,6 +1083,23 @@ func (mset *stream) update(config *StreamConfig) error {
}
mset.mu.Unlock()
if js != nil {
maxBytesDiff := cfg.MaxBytes - ocfg.MaxBytes
if maxBytesDiff > 0 {
// Reserve the difference
js.reserveStreamResources(&StreamConfig{
MaxBytes: maxBytesDiff,
Storage: cfg.Storage,
})
} else if maxBytesDiff < 0 {
// Release the difference
js.releaseStreamResources(&StreamConfig{
MaxBytes: -maxBytesDiff,
Storage: ocfg.Storage,
})
}
}
mset.store.UpdateConfig(cfg)
return nil