mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Added support for clustered account info and limit enforcement
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -578,6 +578,16 @@ func getHash(name string) []byte {
|
||||
return getHashSize(name, sysHashLen)
|
||||
}
|
||||
|
||||
// Returns the node name for this server which is a hash of the server name.
|
||||
func (s *Server) Node() string {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.sys != nil {
|
||||
return s.sys.shash
|
||||
}
|
||||
return _EMPTY_
|
||||
}
|
||||
|
||||
// This will set up our system wide tracking subs.
|
||||
// For now we will setup one wildcard subscription to
|
||||
// monitor all accounts for changes in number of connections.
|
||||
|
||||
@@ -15,6 +15,7 @@ package server
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -77,13 +78,24 @@ type jsAccount struct {
|
||||
account *Account
|
||||
limits JetStreamAccountLimits
|
||||
memReserved int64
|
||||
memUsed int64
|
||||
storeReserved int64
|
||||
storeUsed int64
|
||||
memTotal int64
|
||||
storeTotal int64
|
||||
usage jsaUsage
|
||||
rusage map[string]*jsaUsage
|
||||
storeDir string
|
||||
streams map[string]*Stream
|
||||
templates map[string]*StreamTemplate
|
||||
store TemplateStore
|
||||
|
||||
// Cluster support
|
||||
updatesPub string
|
||||
updatesSub *subscription
|
||||
}
|
||||
|
||||
// jsaUsage is a pair of usage counters for one server's share of an account:
// mem tracks memory-backed storage and store tracks every other storage type.
// Field order matters: the struct is built positionally as {mem, store}.
type jsaUsage struct {
	mem, store int64
}
|
||||
|
||||
// EnableJetStream will enable JetStream support on this server with the given configuration.
|
||||
@@ -418,7 +430,6 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
if s == nil {
|
||||
return fmt.Errorf("jetstream account not registered")
|
||||
}
|
||||
// FIXME(dlc) - cluster mode
|
||||
js := s.getJetStream()
|
||||
if js == nil {
|
||||
return ErrJetStreamNotEnabled
|
||||
@@ -448,6 +459,14 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
js.reserveResources(limits)
|
||||
js.mu.Unlock()
|
||||
|
||||
sysNode := s.Node()
|
||||
|
||||
// Cluster mode updates to resource usages, but we always will turn on. System internal prevents echos.
|
||||
jsa.mu.Lock()
|
||||
jsa.updatesPub = fmt.Sprintf(jsaUpdatesPubT, a.Name, sysNode)
|
||||
jsa.updatesSub, _ = s.sysSubscribe(fmt.Sprintf(jsaUpdatesSubT, a.Name), jsa.remoteUpdateUsage)
|
||||
jsa.mu.Unlock()
|
||||
|
||||
// Stamp inside account as well.
|
||||
a.mu.Lock()
|
||||
a.js = jsa
|
||||
@@ -763,17 +782,23 @@ func diffCheckedLimits(a, b *JetStreamAccountLimits) JetStreamAccountLimits {
|
||||
// JetStreamUsage reports on JetStream usage and limits for an account.
|
||||
func (a *Account) JetStreamUsage() JetStreamAccountStats {
|
||||
a.mu.RLock()
|
||||
jsa := a.js
|
||||
jsa, aname := a.js, a.Name
|
||||
a.mu.RUnlock()
|
||||
|
||||
var stats JetStreamAccountStats
|
||||
if jsa != nil {
|
||||
jsa.mu.Lock()
|
||||
stats.Memory = uint64(jsa.memUsed)
|
||||
stats.Store = uint64(jsa.storeUsed)
|
||||
stats.Streams = len(jsa.streams)
|
||||
jsa.mu.RLock()
|
||||
stats.Memory = uint64(jsa.memTotal)
|
||||
stats.Store = uint64(jsa.storeTotal)
|
||||
if cc := jsa.js.cluster; cc != nil {
|
||||
jsa.js.mu.RLock()
|
||||
stats.Streams = len(cc.streams[aname])
|
||||
jsa.js.mu.RUnlock()
|
||||
} else {
|
||||
stats.Streams = len(jsa.streams)
|
||||
}
|
||||
stats.Limits = jsa.limits
|
||||
jsa.mu.Unlock()
|
||||
jsa.mu.RUnlock()
|
||||
}
|
||||
return stats
|
||||
}
|
||||
@@ -829,32 +854,83 @@ func (a *Account) JetStreamEnabled() bool {
|
||||
return enabled
|
||||
}
|
||||
|
||||
// Updates accounting on in use memory and storage.
|
||||
func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, subject, _ string, msg []byte) {
|
||||
const usageSize = 16
|
||||
|
||||
jsa.mu.Lock()
|
||||
s := jsa.js.srv
|
||||
if len(msg) != usageSize {
|
||||
jsa.mu.Unlock()
|
||||
s.Warnf("Received remote usage update that is wrong size: %d vs %d", len(msg), usageSize)
|
||||
return
|
||||
}
|
||||
var rnode string
|
||||
if li := strings.LastIndexByte(subject, btsep); li > 0 && li < len(subject) {
|
||||
rnode = subject[li+1:]
|
||||
}
|
||||
if rnode == _EMPTY_ {
|
||||
jsa.mu.Unlock()
|
||||
s.Warnf("Received remote usage update with no remote node")
|
||||
return
|
||||
}
|
||||
var le = binary.LittleEndian
|
||||
memUsed, storeUsed := int64(le.Uint64(msg[0:])), int64(le.Uint64(msg[8:]))
|
||||
|
||||
if jsa.rusage == nil {
|
||||
jsa.rusage = make(map[string]*jsaUsage)
|
||||
}
|
||||
// Update the usage for this remote.
|
||||
if usage := jsa.rusage[rnode]; usage != nil {
|
||||
// Decrement our old values.
|
||||
jsa.memTotal -= usage.mem
|
||||
jsa.storeTotal -= usage.store
|
||||
usage.mem, usage.store = memUsed, storeUsed
|
||||
} else {
|
||||
jsa.rusage[rnode] = &jsaUsage{memUsed, storeUsed}
|
||||
}
|
||||
jsa.memTotal += memUsed
|
||||
jsa.storeTotal += storeUsed
|
||||
|
||||
jsa.mu.Unlock()
|
||||
}
|
||||
|
||||
// Updates accounting on in use memory and storage. This is called from locally
|
||||
// by the lower storage layers.
|
||||
func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) {
|
||||
// TODO(dlc) - atomics? snapshot limits?
|
||||
jsa.mu.Lock()
|
||||
if storeType == MemoryStorage {
|
||||
jsa.memUsed += delta
|
||||
jsa.usage.mem += delta
|
||||
jsa.memTotal += delta
|
||||
} else {
|
||||
jsa.storeUsed += delta
|
||||
jsa.usage.store += delta
|
||||
jsa.storeTotal += delta
|
||||
}
|
||||
// Publish our local updates if in clustered mode.
|
||||
if jsa.js != nil && jsa.js.cluster != nil && jsa.js.srv != nil {
|
||||
s, b := jsa.js.srv, make([]byte, 16)
|
||||
var le = binary.LittleEndian
|
||||
le.PutUint64(b[0:], uint64(jsa.usage.mem))
|
||||
le.PutUint64(b[8:], uint64(jsa.usage.store))
|
||||
s.sendInternalMsgLocked(jsa.updatesPub, _EMPTY_, nil, b)
|
||||
}
|
||||
jsa.mu.Unlock()
|
||||
}
|
||||
|
||||
func (jsa *jsAccount) limitsExceeded(storeType StorageType) bool {
|
||||
var exceeded bool
|
||||
jsa.mu.Lock()
|
||||
jsa.mu.RLock()
|
||||
defer jsa.mu.RUnlock()
|
||||
|
||||
if storeType == MemoryStorage {
|
||||
if jsa.limits.MaxMemory > 0 && jsa.memUsed > jsa.limits.MaxMemory {
|
||||
exceeded = true
|
||||
if jsa.limits.MaxMemory > 0 && jsa.memTotal > jsa.limits.MaxMemory {
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
if jsa.limits.MaxStore > 0 && jsa.storeUsed > jsa.limits.MaxStore {
|
||||
exceeded = true
|
||||
if jsa.limits.MaxStore > 0 && jsa.storeTotal > jsa.limits.MaxStore {
|
||||
return true
|
||||
}
|
||||
}
|
||||
jsa.mu.Unlock()
|
||||
return exceeded
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if a new proposed msg set will exceed our account limits.
|
||||
@@ -905,6 +981,13 @@ func (jsa *jsAccount) delete() {
|
||||
var ts []string
|
||||
|
||||
jsa.mu.Lock()
|
||||
|
||||
if jsa.updatesSub != nil && jsa.js.srv != nil {
|
||||
s := jsa.js.srv
|
||||
s.sysUnsubscribe(jsa.updatesSub)
|
||||
jsa.updatesSub = nil
|
||||
}
|
||||
|
||||
for _, ms := range jsa.streams {
|
||||
streams = append(streams, ms)
|
||||
}
|
||||
|
||||
@@ -2008,9 +2008,6 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, subject, reply string,
|
||||
return
|
||||
}
|
||||
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
|
||||
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
|
||||
acc, err := s.LookupAccount(ci.Account)
|
||||
if err != nil {
|
||||
@@ -2019,6 +2016,30 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, subject, reply string,
|
||||
return
|
||||
}
|
||||
|
||||
js.mu.RLock()
|
||||
numStreams := len(cc.streams[ci.Account])
|
||||
js.mu.RUnlock()
|
||||
|
||||
// Grab our jetstream account info.
|
||||
acc.mu.RLock()
|
||||
jsa := acc.js
|
||||
acc.mu.RUnlock()
|
||||
|
||||
// Check for stream limits here before proposing.
|
||||
jsa.mu.RLock()
|
||||
exceeded := jsa.limits.MaxStreams > 0 && numStreams >= jsa.limits.MaxStreams
|
||||
jsa.mu.RUnlock()
|
||||
|
||||
if exceeded {
|
||||
resp.Error = jsError(fmt.Errorf("maximum number of streams reached"))
|
||||
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// Now process the request and proposal.
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
|
||||
if sa := js.streamAssignment(ci.Account, cfg.Name); sa != nil {
|
||||
resp.Error = jsError(ErrJetStreamStreamAlreadyUsed)
|
||||
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
|
||||
@@ -2608,12 +2629,46 @@ func (mset *Stream) snapshot() []byte {
|
||||
|
||||
// processClusteredInboundMsg will propose the inbound message to the underlying raft group.
|
||||
func (mset *Stream) processClusteredInboundMsg(subject, reply string, hdr, msg []byte) error {
|
||||
mset.mu.Lock()
|
||||
|
||||
// For possible error response.
|
||||
var response []byte
|
||||
canRespond := !mset.config.NoAck && len(reply) > 0 && mset.isLeader()
|
||||
sendq := mset.sendq
|
||||
|
||||
mset.mu.RLock()
|
||||
canRespond := !mset.config.NoAck && len(reply) > 0
|
||||
s, jsa, st, rf, sendq := mset.srv, mset.jsa, mset.config.Storage, mset.config.Replicas, mset.sendq
|
||||
mset.mu.RUnlock()
|
||||
|
||||
// Check here pre-emptively if we have exceeded our account limits.
|
||||
var exceeded bool
|
||||
jsa.mu.RLock()
|
||||
if st == MemoryStorage {
|
||||
total := jsa.storeTotal + int64(memStoreMsgSize(subject, hdr, msg)*uint64(rf))
|
||||
if jsa.limits.MaxMemory > 0 && total > jsa.limits.MaxMemory {
|
||||
exceeded = true
|
||||
}
|
||||
} else {
|
||||
total := jsa.storeTotal + int64(fileStoreMsgSize(subject, hdr, msg)*uint64(rf))
|
||||
if jsa.limits.MaxStore > 0 && total > jsa.limits.MaxStore {
|
||||
exceeded = true
|
||||
}
|
||||
}
|
||||
jsa.mu.RUnlock()
|
||||
|
||||
// If we have exceeded our account limits go ahead and return.
|
||||
if exceeded {
|
||||
err := fmt.Errorf("JetStream resource limits exceeded for account: %q", jsa.acc().Name)
|
||||
s.Warnf(err.Error())
|
||||
if canRespond {
|
||||
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.Name()}}
|
||||
resp.Error = &ApiError{Code: 400, Description: "resource limits exceeded for account"}
|
||||
response, _ = json.Marshal(resp)
|
||||
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Proceed with proposing this message.
|
||||
mset.mu.Lock()
|
||||
|
||||
// We only use mset.nlseq for clustering and in case we run ahead of actual commits.
|
||||
// Check if we need to set initial value here
|
||||
@@ -2634,7 +2689,7 @@ func (mset *Stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
|
||||
mset.mu.Unlock()
|
||||
|
||||
// If we errored out respond here.
|
||||
if err != nil && len(response) > 0 {
|
||||
if err != nil && canRespond {
|
||||
sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
|
||||
}
|
||||
|
||||
@@ -2994,5 +3049,9 @@ func syncSubject(pre string) string {
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
// Subject templates (fmt verbs) used for clustered JetStream traffic.
// Each name is declared exactly once, in this single grouped block.
const (
	clusterStreamInfoT   = "$JSC.SI.%s.%s"
	clusterConsumerInfoT = "$JSC.CI.%s.%s.%s"
	// jsaUpdatesSubT is formatted with the account name; the trailing wildcard
	// matches the usage updates published by any node for that account.
	jsaUpdatesSubT = "$JSC.ARU.%s.*"
	// jsaUpdatesPubT is formatted with the account name and the publishing
	// server's node name.
	jsaUpdatesPubT = "$JSC.ARU.%s.%s"
)
|
||||
|
||||
@@ -1507,6 +1507,88 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 5)
|
||||
defer c.shutdown()
|
||||
|
||||
// Adjust our limits.
|
||||
c.updateLimits("$G", &server.JetStreamAccountLimits{
|
||||
MaxMemory: 1024,
|
||||
MaxStore: 8000,
|
||||
MaxStreams: 3,
|
||||
MaxConsumers: 1,
|
||||
})
|
||||
|
||||
// Client based API
|
||||
s := c.randomServer()
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 1}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "bar", Replicas: 2}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "baz", Replicas: 3}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
sendBatch := func(subject string, n int) {
|
||||
t.Helper()
|
||||
for i := 0; i < n; i++ {
|
||||
if _, err := js.Publish(subject, []byte("JSC-OK")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sendBatch("foo", 25)
|
||||
sendBatch("bar", 75)
|
||||
sendBatch("baz", 10)
|
||||
|
||||
accountStats := func() *server.JetStreamAccountStats {
|
||||
t.Helper()
|
||||
resp, err := nc.Request(server.JSApiAccountInfo, nil, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
var info server.JSApiAccountInfoResponse
|
||||
if err := json.Unmarshal(resp.Data, &info); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if info.Error != nil {
|
||||
t.Fatalf("Unexpected error: %+v", info.Error)
|
||||
}
|
||||
if info.JetStreamAccountStats == nil {
|
||||
t.Fatalf("AccountStats missing")
|
||||
}
|
||||
return info.JetStreamAccountStats
|
||||
}
|
||||
|
||||
// If subject is not 3 letters or payload not 2 this needs to change.
|
||||
const msgSize = uint64(22 + 3 + 6 + 8)
|
||||
|
||||
stats := accountStats()
|
||||
if stats.Streams != 3 {
|
||||
t.Fatalf("Should have been tracking 3 streams, found %d", stats.Streams)
|
||||
}
|
||||
expectedSize := 25*msgSize + 75*msgSize*2 + 10*msgSize*3
|
||||
if stats.Store != expectedSize {
|
||||
t.Fatalf("Expected store size to be %d, got %+v\n", expectedSize, stats)
|
||||
}
|
||||
|
||||
// Check limit enforcement.
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "fail", Replicas: 3}); err == nil {
|
||||
t.Fatalf("Expected an error but got none")
|
||||
}
|
||||
|
||||
// We should be at 7995 at the moment with a limit of 8000, so any message will go over.
|
||||
if _, err := js.Publish("baz", []byte("JSC-NOT-OK")); err == nil {
|
||||
t.Fatalf("Expected publish error but got none")
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterStreamPerf(t *testing.T) {
|
||||
// Comment out to run, holding place for now.
|
||||
skip(t)
|
||||
@@ -1575,7 +1657,7 @@ func TestJetStreamClusterStreamPerf(t *testing.T) {
|
||||
var jsClusterTempl = `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: {max_mem_store: 16GB, max_file_store: 10TB, store_dir: "%s"}
|
||||
jetstream: {max_mem_store: 2GB, max_file_store: 1GB, store_dir: "%s"}
|
||||
cluster {
|
||||
name: %s
|
||||
listen: 127.0.0.1:%d
|
||||
@@ -1632,6 +1714,20 @@ func (c *cluster) addInNewServer() *server.Server {
|
||||
return s
|
||||
}
|
||||
|
||||
// Adjust limits for the given account.
|
||||
func (c *cluster) updateLimits(account string, newLimits *server.JetStreamAccountLimits) {
|
||||
c.t.Helper()
|
||||
for _, s := range c.servers {
|
||||
acc, err := s.LookupAccount(account)
|
||||
if err != nil {
|
||||
c.t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if err := acc.UpdateJetStreamLimits(newLimits); err != nil {
|
||||
c.t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Hack for staticcheck
|
||||
var skip = func(t *testing.T) {
|
||||
t.SkipNow()
|
||||
|
||||
Reference in New Issue
Block a user