diff --git a/server/jetstream.go b/server/jetstream.go
index a44aeba0..00bc1b07 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -49,12 +49,14 @@ type JetStreamConfig struct {
 }
 
 type JetStreamStats struct {
-	Memory         uint64            `json:"memory"`
-	Store          uint64            `json:"storage"`
-	Accounts       int               `json:"accounts,omitempty"`
-	API            JetStreamAPIStats `json:"api"`
-	ReservedMemory uint64            `json:"reserved_memory"`
-	ReservedStore  uint64            `json:"reserved_storage"`
+	Memory             uint64            `json:"memory"`
+	Store              uint64            `json:"storage"`
+	ReservedMemoryUsed uint64            `json:"reserved_memory_used,omitempty"`
+	ReserveStoreUsed   uint64            `json:"reserved_storage_used,omitempty"`
+	Accounts           int               `json:"accounts,omitempty"`
+	API                JetStreamAPIStats `json:"api"`
+	ReservedMemory     uint64            `json:"reserved_memory,omitempty"`
+	ReservedStore      uint64            `json:"reserved_storage,omitempty"`
 }
 
 type JetStreamAccountLimits struct {
@@ -89,6 +91,8 @@ type jetStream struct {
 	apiErrors     int64
 	memTotal      int64
 	storeTotal    int64
+	memTotalRes   int64
+	storeTotalRes int64
 	mu            sync.RWMutex
 	srv           *Server
 	config        JetStreamConfig
@@ -530,12 +534,14 @@ func (s *Server) enableJetStreamAccounts() error {
 	// If we have no configured accounts setup then setup imports on global account.
 	if s.globalAccountOnly() {
 		gacc := s.GlobalAccount()
+		gacc.mu.Lock()
+		if gacc.jsLimits == nil {
+			gacc.jsLimits = dynamicJSAccountLimits
+		}
+		gacc.mu.Unlock()
 		if err := s.configJetStream(gacc); err != nil {
 			return err
 		}
-		if err := gacc.EnableJetStream(nil); err != nil {
-			return fmt.Errorf("Error enabling jetstream on the global account")
-		}
 	} else if err := s.configAllJetStreamAccounts(); err != nil {
 		return fmt.Errorf("Error enabling jetstream on configured accounts: %v", err)
 	}
@@ -1294,6 +1300,20 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error {
 	// FIXME(dlc) - If we drop and are over the max on memory or store, do we delete??
 	js.releaseResources(&jsaLimits)
 	js.reserveResources(limits)
+	if jsaLimits.MaxMemory >= 0 && limits.MaxMemory < 0 {
+		// we had a reserve and are now dropping it
+		atomic.AddInt64(&js.memTotalRes, -jsa.memTotal)
+	} else if jsaLimits.MaxMemory < 0 && limits.MaxMemory >= 0 {
+		// we had no reserve and are now adding it
+		atomic.AddInt64(&js.memTotalRes, jsa.memTotal)
+	}
+	if jsaLimits.MaxStore >= 0 && limits.MaxStore < 0 {
+		// we had a reserve and are now dropping it
+		atomic.AddInt64(&js.storeTotalRes, -jsa.storeTotal)
+	} else if jsaLimits.MaxStore < 0 && limits.MaxStore >= 0 {
+		// we had no reserve and are now adding it
+		atomic.AddInt64(&js.storeTotalRes, jsa.storeTotal)
+	}
 	js.mu.Unlock()
 
 	// Update
@@ -1464,10 +1484,16 @@ func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) {
 		jsa.usage.mem += delta
 		jsa.memTotal += delta
 		atomic.AddInt64(&js.memTotal, delta)
+		if jsa.limits.MaxMemory > 0 {
+			atomic.AddInt64(&js.memTotalRes, delta)
+		}
 	} else {
 		jsa.usage.store += delta
 		jsa.storeTotal += delta
 		atomic.AddInt64(&js.storeTotal, delta)
+		if jsa.limits.MaxStore > 0 {
+			atomic.AddInt64(&js.storeTotalRes, delta)
+		}
 	}
 	// Publish our local updates if in clustered mode.
 	if js.cluster != nil {
@@ -1665,6 +1691,8 @@ func (js *jetStream) usageStats() *JetStreamStats {
 	stats.API.Errors = (uint64)(atomic.LoadInt64(&js.apiErrors))
 	stats.Memory = (uint64)(atomic.LoadInt64(&js.memTotal))
 	stats.Store = (uint64)(atomic.LoadInt64(&js.storeTotal))
+	stats.ReservedMemoryUsed = (uint64)(atomic.LoadInt64(&js.memTotalRes))
+	stats.ReserveStoreUsed = (uint64)(atomic.LoadInt64(&js.storeTotalRes))
 	return &stats
 }
 
@@ -1713,17 +1741,6 @@ func (js *jetStream) releaseResources(limits *JetStreamAccountLimits) error {
 	return nil
 }
 
-// Will clear the resource reservations. Mostly for reload of a config.
-func (js *jetStream) clearResources() {
-	if js == nil {
-		return
-	}
-	js.mu.Lock()
-	js.memReserved = 0
-	js.storeReserved = 0
-	js.mu.Unlock()
-}
-
 const (
 	// JetStreamStoreDir is the prefix we use.
 	JetStreamStoreDir = "jetstream"
diff --git a/server/monitor_test.go b/server/monitor_test.go
index 9c932c3d..d1e133f0 100644
--- a/server/monitor_test.go
+++ b/server/monitor_test.go
@@ -4153,3 +4153,107 @@ func TestMonitorJsz(t *testing.T) {
 		}
 	})
 }
+
+func TestMonitorJszAccountReserves(t *testing.T) {
+	readJsInfo := func(url string) *JSInfo {
+		t.Helper()
+		body := readBody(t, url)
+		info := &JSInfo{}
+		err := json.Unmarshal(body, info)
+		require_NoError(t, err)
+		return info
+	}
+	tmpDir := createDir(t, "srv")
+	defer removeDir(t, tmpDir)
+	tmplCfg := `
+	listen: 127.0.0.1:-1
+	http_port: 7501
+	system_account: SYS
+	jetstream: {
+		store_dir: %s
+	}
+	accounts {
+		SYS { users [{user: sys, password: pwd}] }
+		ACC {
+			users [{user: usr, password: pwd}]
+			%s
+		}
+	}`
+
+	conf := createConfFile(t, []byte(fmt.Sprintf(tmplCfg, tmpDir, "jetstream: enabled")))
+	defer removeFile(t, conf)
+	s, _ := RunServerWithConfig(conf)
+	defer s.Shutdown()
+	checkForJSClusterUp(t, s)
+
+	nc := natsConnect(t, fmt.Sprintf("nats://usr:pwd@127.0.0.1:%d", s.opts.Port))
+	defer nc.Close()
+	js, err := nc.JetStream(nats.MaxWait(5 * time.Second))
+	require_NoError(t, err)
+	for _, v := range []struct {
+		subject string
+		storage nats.StorageType
+	}{
+		{"file", nats.FileStorage},
+		{"mem", nats.MemoryStorage}} {
+		_, err = js.AddStream(&nats.StreamConfig{
+			Name:     v.subject,
+			Subjects: []string{v.subject},
+			Replicas: 1,
+			Storage:  v.storage,
+		})
+		require_NoError(t, err)
+		require_NoError(t, nc.Flush())
+	}
+
+	send := func() {
+		for _, subj := range []string{"file", "mem"} {
+			_, err = js.Publish(subj, []byte("hello world "+subj))
+			require_NoError(t, err)
+		}
+		require_NoError(t, nc.Flush())
+	}
+
+	test := func(msgs, reservedMemory, reservedStore uint64, totalIsReservedUsed bool) {
+		t.Helper()
+		info := readJsInfo(fmt.Sprintf("http://127.0.0.1:%d/jsz", s.opts.HTTPPort))
+		if info.Streams != 2 {
+			t.Fatalf("expected stream count to be 2 but got %d", info.Streams)
+		}
+		if info.Messages != msgs {
+			t.Fatalf("expected %d messages but got %d", msgs, info.Messages)
+		}
+		if info.ReservedStore != reservedStore {
+			t.Fatalf("expected %d bytes reserved store, got %d bytes", reservedStore, info.ReservedStore)
+		}
+		if info.ReservedMemory != reservedMemory {
+			t.Fatalf("expected %d bytes reserved memory, got %d bytes", reservedMemory, info.ReservedMemory)
+		}
+		if info.Memory == 0 {
+			t.Fatalf("expected memory usage to be non-zero")
+		}
+		if info.Store == 0 {
+			t.Fatalf("expected store usage to be non-zero")
+		}
+		memory, store := uint64(0), uint64(0)
+		if totalIsReservedUsed {
+			memory = info.Memory
+			store = info.Store
+		}
+		if info.ReservedMemoryUsed != memory {
+			t.Fatalf("expected %d bytes reserved memory used, got %d bytes", memory, info.ReservedMemoryUsed)
+		}
+		if info.ReserveStoreUsed != store {
+			t.Fatalf("expected %d bytes reserved store used, got %d bytes", store, info.ReserveStoreUsed)
+		}
+	}
+
+	send()
+	test(2, 0, 0, false)
+	reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmplCfg, tmpDir, "jetstream: {max_mem: 4Mb, max_store: 5Mb}"))
+	test(2, 4*1024*1024, 5*1024*1024, true)
+	send()
+	test(4, 4*1024*1024, 5*1024*1024, true)
+	reloadUpdateConfig(t, s, conf, fmt.Sprintf(tmplCfg, tmpDir, "jetstream: enabled"))
+	test(4, 0, 0, false)
+}
diff --git a/server/reload.go b/server/reload.go
index af3dccea..9b03be56 100644
--- a/server/reload.go
+++ b/server/reload.go
@@ -1614,7 +1614,6 @@ func (s *Server) reloadAuthorization() {
 
 	// We will double check all JetStream configs on a reload.
 	if checkJetStream {
-		s.getJetStream().clearResources()
 		if err := s.enableJetStreamAccounts(); err != nil {
 			s.Errorf(err.Error())
 		}