mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #2555 from nats-io/reserved_memory_usage
[added] value to JS stats showing memory used from accounts with reservations
This commit is contained in:
@@ -49,12 +49,14 @@ type JetStreamConfig struct {
|
||||
}
|
||||
|
||||
type JetStreamStats struct {
|
||||
Memory uint64 `json:"memory"`
|
||||
Store uint64 `json:"storage"`
|
||||
Accounts int `json:"accounts,omitempty"`
|
||||
API JetStreamAPIStats `json:"api"`
|
||||
ReservedMemory uint64 `json:"reserved_memory"`
|
||||
ReservedStore uint64 `json:"reserved_storage"`
|
||||
Memory uint64 `json:"memory"`
|
||||
Store uint64 `json:"storage"`
|
||||
ReservedMemoryUsed uint64 `json:"reserved_memory_used,omitempty"`
|
||||
ReserveStoreUsed uint64 `json:"reserved_storage_used,omitempty"`
|
||||
Accounts int `json:"accounts,omitempty"`
|
||||
API JetStreamAPIStats `json:"api"`
|
||||
ReservedMemory uint64 `json:"reserved_memory,omitempty"`
|
||||
ReservedStore uint64 `json:"reserved_storage,omitempty"`
|
||||
}
|
||||
|
||||
type JetStreamAccountLimits struct {
|
||||
@@ -89,6 +91,8 @@ type jetStream struct {
|
||||
apiErrors int64
|
||||
memTotal int64
|
||||
storeTotal int64
|
||||
memTotalRes int64
|
||||
storeTotalRes int64
|
||||
mu sync.RWMutex
|
||||
srv *Server
|
||||
config JetStreamConfig
|
||||
@@ -530,12 +534,14 @@ func (s *Server) enableJetStreamAccounts() error {
|
||||
// If we have no configured accounts setup then setup imports on global account.
|
||||
if s.globalAccountOnly() {
|
||||
gacc := s.GlobalAccount()
|
||||
gacc.mu.Lock()
|
||||
if gacc.jsLimits == nil {
|
||||
gacc.jsLimits = dynamicJSAccountLimits
|
||||
}
|
||||
gacc.mu.Unlock()
|
||||
if err := s.configJetStream(gacc); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := gacc.EnableJetStream(nil); err != nil {
|
||||
return fmt.Errorf("Error enabling jetstream on the global account")
|
||||
}
|
||||
} else if err := s.configAllJetStreamAccounts(); err != nil {
|
||||
return fmt.Errorf("Error enabling jetstream on configured accounts: %v", err)
|
||||
}
|
||||
@@ -1294,6 +1300,20 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error {
|
||||
// FIXME(dlc) - If we drop and are over the max on memory or store, do we delete??
|
||||
js.releaseResources(&jsaLimits)
|
||||
js.reserveResources(limits)
|
||||
if jsaLimits.MaxMemory >= 0 && limits.MaxMemory < 0 {
|
||||
// we had a reserve and are now dropping it
|
||||
atomic.AddInt64(&js.memTotalRes, -jsa.memTotal)
|
||||
} else if jsaLimits.MaxMemory < 0 && limits.MaxMemory >= 0 {
|
||||
// we had no reserve and are now adding it
|
||||
atomic.AddInt64(&js.memTotalRes, jsa.memTotal)
|
||||
}
|
||||
if jsaLimits.MaxStore >= 0 && limits.MaxStore < 0 {
|
||||
// we had a reserve and are now dropping it
|
||||
atomic.AddInt64(&js.storeTotalRes, -jsa.storeTotal)
|
||||
} else if jsaLimits.MaxStore < 0 && limits.MaxStore >= 0 {
|
||||
// we had no reserve and are now adding it
|
||||
atomic.AddInt64(&js.storeTotalRes, jsa.storeTotal)
|
||||
}
|
||||
js.mu.Unlock()
|
||||
|
||||
// Update
|
||||
@@ -1464,10 +1484,16 @@ func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) {
|
||||
jsa.usage.mem += delta
|
||||
jsa.memTotal += delta
|
||||
atomic.AddInt64(&js.memTotal, delta)
|
||||
if jsa.limits.MaxMemory > 0 {
|
||||
atomic.AddInt64(&js.memTotalRes, delta)
|
||||
}
|
||||
} else {
|
||||
jsa.usage.store += delta
|
||||
jsa.storeTotal += delta
|
||||
atomic.AddInt64(&js.storeTotal, delta)
|
||||
if jsa.limits.MaxStore > 0 {
|
||||
atomic.AddInt64(&js.storeTotalRes, delta)
|
||||
}
|
||||
}
|
||||
// Publish our local updates if in clustered mode.
|
||||
if js.cluster != nil {
|
||||
@@ -1665,6 +1691,8 @@ func (js *jetStream) usageStats() *JetStreamStats {
|
||||
stats.API.Errors = (uint64)(atomic.LoadInt64(&js.apiErrors))
|
||||
stats.Memory = (uint64)(atomic.LoadInt64(&js.memTotal))
|
||||
stats.Store = (uint64)(atomic.LoadInt64(&js.storeTotal))
|
||||
stats.ReservedMemoryUsed = (uint64)(atomic.LoadInt64(&js.memTotalRes))
|
||||
stats.ReserveStoreUsed = (uint64)(atomic.LoadInt64(&js.storeTotalRes))
|
||||
return &stats
|
||||
}
|
||||
|
||||
@@ -1713,17 +1741,6 @@ func (js *jetStream) releaseResources(limits *JetStreamAccountLimits) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Will clear the resource reservations. Mostly for reload of a config.
|
||||
func (js *jetStream) clearResources() {
|
||||
if js == nil {
|
||||
return
|
||||
}
|
||||
js.mu.Lock()
|
||||
js.memReserved = 0
|
||||
js.storeReserved = 0
|
||||
js.mu.Unlock()
|
||||
}
|
||||
|
||||
const (
|
||||
// JetStreamStoreDir is the prefix we use.
|
||||
JetStreamStoreDir = "jetstream"
|
||||
|
||||
@@ -4153,3 +4153,107 @@ func TestMonitorJsz(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestMonitorJszAccountReserves(t *testing.T) {
|
||||
readJsInfo := func(url string) *JSInfo {
|
||||
t.Helper()
|
||||
body := readBody(t, url)
|
||||
info := &JSInfo{}
|
||||
err := json.Unmarshal(body, info)
|
||||
require_NoError(t, err)
|
||||
return info
|
||||
}
|
||||
tmpDir := createDir(t, "srv")
|
||||
defer removeDir(t, tmpDir)
|
||||
tmplCfg := `
|
||||
listen: 127.0.0.1:-1
|
||||
http_port: 7501
|
||||
system_account: SYS
|
||||
jetstream: {
|
||||
store_dir: %s
|
||||
}
|
||||
accounts {
|
||||
SYS { users [{user: sys, password: pwd}] }
|
||||
ACC {
|
||||
users [{user: usr, password: pwd}]
|
||||
%s
|
||||
}
|
||||
}`
|
||||
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(tmplCfg, tmpDir, "jetstream: enabled")))
|
||||
defer removeFile(t, conf)
|
||||
s, _ := RunServerWithConfig(conf)
|
||||
defer s.Shutdown()
|
||||
checkForJSClusterUp(t, s)
|
||||
|
||||
nc := natsConnect(t, fmt.Sprintf("nats://usr:pwd@127.0.0.1:%d", s.opts.Port))
|
||||
defer nc.Close()
|
||||
js, err := nc.JetStream(nats.MaxWait(5 * time.Second))
|
||||
require_NoError(t, err)
|
||||
for _, v := range []struct {
|
||||
subject string
|
||||
storage nats.StorageType
|
||||
}{
|
||||
{"file", nats.FileStorage},
|
||||
{"mem", nats.MemoryStorage}} {
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: v.subject,
|
||||
Subjects: []string{v.subject},
|
||||
Replicas: 1,
|
||||
Storage: v.storage,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
require_NoError(t, nc.Flush())
|
||||
}
|
||||
|
||||
send := func() {
|
||||
for _, subj := range []string{"file", "mem"} {
|
||||
_, err = js.Publish(subj, []byte("hello world "+subj))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
require_NoError(t, nc.Flush())
|
||||
}
|
||||
|
||||
test := func(msgs, reservedMemory, reservedStore uint64, totalIsReserveUsd bool) {
|
||||
t.Helper()
|
||||
info := readJsInfo(fmt.Sprintf("http://127.0.0.1:%d/jsz", s.opts.HTTPPort))
|
||||
if info.Streams != 2 {
|
||||
t.Fatalf("expected stream count to be 1 but got %d", info.Streams)
|
||||
}
|
||||
if info.Messages != msgs {
|
||||
t.Fatalf("expected one message but got %d", info.Messages)
|
||||
}
|
||||
if info.ReservedStore != reservedStore {
|
||||
t.Fatalf("expected %d bytes reserved, got %d bytes", reservedStore, info.ReservedStore)
|
||||
}
|
||||
if info.ReservedMemory != reservedMemory {
|
||||
t.Fatalf("expected %d bytes reserved, got %d bytes", reservedMemory, info.ReservedStore)
|
||||
}
|
||||
if info.Memory == 0 {
|
||||
t.Fatalf("memory expected to be not 0")
|
||||
}
|
||||
if info.Store == 0 {
|
||||
t.Fatalf("store expected to be not 0")
|
||||
}
|
||||
memory, store := uint64(0), uint64(0)
|
||||
if totalIsReserveUsd {
|
||||
memory = info.Memory
|
||||
store = info.Store
|
||||
}
|
||||
if info.ReservedMemoryUsed != memory {
|
||||
t.Fatalf("expected %d bytes reserved memory used, got %d bytes", memory, info.ReservedMemoryUsed)
|
||||
}
|
||||
if info.ReserveStoreUsed != store {
|
||||
t.Fatalf("expected %d bytes reserved store used, got %d bytes", store, info.ReserveStoreUsed)
|
||||
}
|
||||
}
|
||||
|
||||
send()
|
||||
test(2, 0, 0, false)
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmplCfg, tmpDir, "jetstream: {max_mem: 4Mb, max_store: 5Mb}"))
|
||||
test(2, 4*1024*1024, 5*1024*1024, true)
|
||||
send()
|
||||
test(4, 4*1024*1024, 5*1024*1024, true)
|
||||
reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmplCfg, tmpDir, "jetstream: enabled"))
|
||||
test(4, 0, 0, false)
|
||||
}
|
||||
|
||||
@@ -1614,7 +1614,6 @@ func (s *Server) reloadAuthorization() {
|
||||
|
||||
// We will double check all JetStream configs on a reload.
|
||||
if checkJetStream {
|
||||
s.getJetStream().clearResources()
|
||||
if err := s.enableJetStreamAccounts(); err != nil {
|
||||
s.Errorf(err.Error())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user