diff --git a/server/events.go b/server/events.go
index 2a76192a..b0b918a5 100644
--- a/server/events.go
+++ b/server/events.go
@@ -578,6 +578,16 @@ func getHash(name string) []byte {
 	return getHashSize(name, sysHashLen)
 }
 
+// Returns the node name for this server which is a hash of the server name.
+func (s *Server) Node() string {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.sys != nil {
+		return s.sys.shash
+	}
+	return _EMPTY_
+}
+
 // This will setup our system wide tracking subs.
 // For now we will setup one wildcard subscription to
 // monitor all accounts for changes in number of connections.
diff --git a/server/jetstream.go b/server/jetstream.go
index d815356b..8a1d3514 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -15,6 +15,7 @@ package server
 
 import (
 	"crypto/sha256"
+	"encoding/binary"
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
@@ -77,13 +78,24 @@ type jsAccount struct {
 	account       *Account
 	limits        JetStreamAccountLimits
 	memReserved   int64
-	memUsed       int64
 	storeReserved int64
-	storeUsed     int64
+	memTotal      int64
+	storeTotal    int64
+	usage         jsaUsage
+	rusage        map[string]*jsaUsage
 	storeDir      string
 	streams       map[string]*Stream
 	templates     map[string]*StreamTemplate
 	store         TemplateStore
+
+	// Cluster support
+	updatesPub string
+	updatesSub *subscription
+}
+
+type jsaUsage struct {
+	mem   int64
+	store int64
 }
 
 // EnableJetStream will enable JetStream support on this server with the given configuration.
@@ -418,7 +430,6 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
 	if s == nil {
 		return fmt.Errorf("jetstream account not registered")
 	}
-	// FIXME(dlc) - cluster mode
 	js := s.getJetStream()
 	if js == nil {
 		return ErrJetStreamNotEnabled
@@ -448,6 +459,14 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
 	js.reserveResources(limits)
 	js.mu.Unlock()
 
+	sysNode := s.Node()
+
+	// Cluster mode updates to resource usages, but we always will turn on. System internal prevents echoes.
+	jsa.mu.Lock()
+	jsa.updatesPub = fmt.Sprintf(jsaUpdatesPubT, a.Name, sysNode)
+	jsa.updatesSub, _ = s.sysSubscribe(fmt.Sprintf(jsaUpdatesSubT, a.Name), jsa.remoteUpdateUsage)
+	jsa.mu.Unlock()
+
 	// Stamp inside account as well.
 	a.mu.Lock()
 	a.js = jsa
@@ -763,17 +782,23 @@ func diffCheckedLimits(a, b *JetStreamAccountLimits) JetStreamAccountLimits {
 // JetStreamUsage reports on JetStream usage and limits for an account.
 func (a *Account) JetStreamUsage() JetStreamAccountStats {
 	a.mu.RLock()
-	jsa := a.js
+	jsa, aname := a.js, a.Name
 	a.mu.RUnlock()
 
 	var stats JetStreamAccountStats
 	if jsa != nil {
-		jsa.mu.Lock()
-		stats.Memory = uint64(jsa.memUsed)
-		stats.Store = uint64(jsa.storeUsed)
-		stats.Streams = len(jsa.streams)
+		jsa.mu.RLock()
+		stats.Memory = uint64(jsa.memTotal)
+		stats.Store = uint64(jsa.storeTotal)
+		if cc := jsa.js.cluster; cc != nil {
+			jsa.js.mu.RLock()
+			stats.Streams = len(cc.streams[aname])
+			jsa.js.mu.RUnlock()
+		} else {
+			stats.Streams = len(jsa.streams)
+		}
 		stats.Limits = jsa.limits
-		jsa.mu.Unlock()
+		jsa.mu.RUnlock()
 	}
 	return stats
 }
@@ -829,32 +854,83 @@ func (a *Account) JetStreamEnabled() bool {
 	return enabled
 }
 
-// Updates accounting on in use memory and storage.
+func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, subject, _ string, msg []byte) {
+	const usageSize = 16
+
+	jsa.mu.Lock()
+	s := jsa.js.srv
+	if len(msg) != usageSize {
+		jsa.mu.Unlock()
+		s.Warnf("Received remote usage update that is wrong size: %d vs %d", len(msg), usageSize)
+		return
+	}
+	var rnode string
+	if li := strings.LastIndexByte(subject, btsep); li > 0 && li < len(subject) {
+		rnode = subject[li+1:]
+	}
+	if rnode == _EMPTY_ {
+		jsa.mu.Unlock()
+		s.Warnf("Received remote usage update with no remote node")
+		return
+	}
+	var le = binary.LittleEndian
+	memUsed, storeUsed := int64(le.Uint64(msg[0:])), int64(le.Uint64(msg[8:]))
+
+	if jsa.rusage == nil {
+		jsa.rusage = make(map[string]*jsaUsage)
+	}
+	// Update the usage for this remote.
+	if usage := jsa.rusage[rnode]; usage != nil {
+		// Decrement our old values.
+		jsa.memTotal -= usage.mem
+		jsa.storeTotal -= usage.store
+		usage.mem, usage.store = memUsed, storeUsed
+	} else {
+		jsa.rusage[rnode] = &jsaUsage{memUsed, storeUsed}
+	}
+	jsa.memTotal += memUsed
+	jsa.storeTotal += storeUsed
+
+	jsa.mu.Unlock()
+}
+
+// Updates accounting on in use memory and storage. This is called locally
+// by the lower storage layers.
 func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) {
-	// TODO(dlc) - atomics? snapshot limits?
 	jsa.mu.Lock()
 	if storeType == MemoryStorage {
-		jsa.memUsed += delta
+		jsa.usage.mem += delta
+		jsa.memTotal += delta
 	} else {
-		jsa.storeUsed += delta
+		jsa.usage.store += delta
+		jsa.storeTotal += delta
+	}
+	// Publish our local updates if in clustered mode.
+	if jsa.js != nil && jsa.js.cluster != nil && jsa.js.srv != nil {
+		s, b := jsa.js.srv, make([]byte, 16)
+		var le = binary.LittleEndian
+		le.PutUint64(b[0:], uint64(jsa.usage.mem))
+		le.PutUint64(b[8:], uint64(jsa.usage.store))
+		s.sendInternalMsgLocked(jsa.updatesPub, _EMPTY_, nil, b)
 	}
 	jsa.mu.Unlock()
 }
 
 func (jsa *jsAccount) limitsExceeded(storeType StorageType) bool {
-	var exceeded bool
-	jsa.mu.Lock()
+	jsa.mu.RLock()
+	defer jsa.mu.RUnlock()
+
 	if storeType == MemoryStorage {
-		if jsa.limits.MaxMemory > 0 && jsa.memUsed > jsa.limits.MaxMemory {
-			exceeded = true
+		if jsa.limits.MaxMemory > 0 && jsa.memTotal > jsa.limits.MaxMemory {
+			return true
 		}
 	} else {
-		if jsa.limits.MaxStore > 0 && jsa.storeUsed > jsa.limits.MaxStore {
-			exceeded = true
+		if jsa.limits.MaxStore > 0 && jsa.storeTotal > jsa.limits.MaxStore {
+			return true
 		}
 	}
-	jsa.mu.Unlock()
-	return exceeded
+
+	return false
 }
 
 // Check if a new proposed msg set while exceed our account limits.
@@ -905,6 +981,13 @@ func (jsa *jsAccount) delete() {
 	var ts []string
 
 	jsa.mu.Lock()
+
+	if jsa.updatesSub != nil && jsa.js.srv != nil {
+		s := jsa.js.srv
+		s.sysUnsubscribe(jsa.updatesSub)
+		jsa.updatesSub = nil
+	}
+
 	for _, ms := range jsa.streams {
 		streams = append(streams, ms)
 	}
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index c1de2a48..33bf2e05 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2008,9 +2008,6 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, subject, reply string,
 		return
 	}
 
-	js.mu.Lock()
-	defer js.mu.Unlock()
-
 	var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
 	acc, err := s.LookupAccount(ci.Account)
 	if err != nil {
@@ -2019,6 +2016,30 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, subject, reply string,
 		return
 	}
 
+	js.mu.RLock()
+	numStreams := len(cc.streams[ci.Account])
+	js.mu.RUnlock()
+
+	// Grab our jetstream account info.
+	acc.mu.RLock()
+	jsa := acc.js
+	acc.mu.RUnlock()
+
+	// Check for stream limits here before proposing.
+	jsa.mu.RLock()
+	exceeded := jsa.limits.MaxStreams > 0 && numStreams >= jsa.limits.MaxStreams
+	jsa.mu.RUnlock()
+
+	if exceeded {
+		resp.Error = jsError(fmt.Errorf("maximum number of streams reached"))
+		s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
+		return
+	}
+
+	// Now process the request and proposal.
+	js.mu.Lock()
+	defer js.mu.Unlock()
+
 	if sa := js.streamAssignment(ci.Account, cfg.Name); sa != nil {
 		resp.Error = jsError(ErrJetStreamStreamAlreadyUsed)
 		s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
@@ -2608,12 +2629,46 @@ func (mset *Stream) snapshot() []byte {
 
 // processClusteredMsg will propose the inbound message to the underlying raft group.
 func (mset *Stream) processClusteredInboundMsg(subject, reply string, hdr, msg []byte) error {
-	mset.mu.Lock()
-
 	// For possible error response.
 	var response []byte
-	canRespond := !mset.config.NoAck && len(reply) > 0 && mset.isLeader()
-	sendq := mset.sendq
+
+	mset.mu.RLock()
+	canRespond := !mset.config.NoAck && len(reply) > 0
+	s, jsa, st, rf, sendq := mset.srv, mset.jsa, mset.config.Storage, mset.config.Replicas, mset.sendq
+	mset.mu.RUnlock()
+
+	// Check here pre-emptively if we have exceeded our account limits.
+	var exceeded bool
+	jsa.mu.RLock()
+	if st == MemoryStorage {
+		total := jsa.storeTotal + int64(memStoreMsgSize(subject, hdr, msg)*uint64(rf))
+		if jsa.limits.MaxMemory > 0 && total > jsa.limits.MaxMemory {
+			exceeded = true
+		}
+	} else {
+		total := jsa.storeTotal + int64(fileStoreMsgSize(subject, hdr, msg)*uint64(rf))
+		if jsa.limits.MaxStore > 0 && total > jsa.limits.MaxStore {
+			exceeded = true
+		}
+	}
+	jsa.mu.RUnlock()
+
+	// If we have exceeded our account limits go ahead and return.
+	if exceeded {
+		err := fmt.Errorf("JetStream resource limits exceeded for account: %q", jsa.acc().Name)
+		s.Warnf(err.Error())
+		if canRespond {
+			var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.Name()}}
+			resp.Error = &ApiError{Code: 400, Description: "resource limits exceeded for account"}
+			response, _ = json.Marshal(resp)
+			sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
+		}
+		return err
+	}
+
+	// Proceed with proposing this message.
+	mset.mu.Lock()
 
 	// We only use mset.nlseq for clustering and in case we run ahead of actual commits.
 	// Check if we need to set initial value here
@@ -2634,7 +2689,7 @@ func (mset *Stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 	mset.mu.Unlock()
 
 	// If we errored out respond here.
-	if err != nil && len(response) > 0 {
+	if err != nil && canRespond {
 		sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
 	}
 
@@ -2994,5 +3049,9 @@ func syncSubject(pre string) string {
 	return sb.String()
 }
 
-const clusterStreamInfoT = "$JSC.SI.%s.%s"
-const clusterConsumerInfoT = "$JSC.CI.%s.%s.%s"
+const (
+	clusterStreamInfoT   = "$JSC.SI.%s.%s"
+	clusterConsumerInfoT = "$JSC.CI.%s.%s.%s"
+	jsaUpdatesSubT       = "$JSC.ARU.%s.*"
+	jsaUpdatesPubT       = "$JSC.ARU.%s.%s"
+)
diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go
index be3cd9cc..724a2f81 100644
--- a/test/jetstream_cluster_test.go
+++ b/test/jetstream_cluster_test.go
@@ -1507,6 +1507,88 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
 	})
 }
 
+func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 5)
+	defer c.shutdown()
+
+	// Adjust our limits.
+	c.updateLimits("$G", &server.JetStreamAccountLimits{
+		MaxMemory:    1024,
+		MaxStore:     8000,
+		MaxStreams:   3,
+		MaxConsumers: 1,
+	})
+
+	// Client based API
+	s := c.randomServer()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 1}); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if _, err := js.AddStream(&nats.StreamConfig{Name: "bar", Replicas: 2}); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if _, err := js.AddStream(&nats.StreamConfig{Name: "baz", Replicas: 3}); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	sendBatch := func(subject string, n int) {
+		t.Helper()
+		for i := 0; i < n; i++ {
+			if _, err := js.Publish(subject, []byte("JSC-OK")); err != nil {
+				t.Fatalf("Unexpected publish error: %v", err)
+			}
+		}
+	}
+
+	sendBatch("foo", 25)
+	sendBatch("bar", 75)
+	sendBatch("baz", 10)
+
+	accountStats := func() *server.JetStreamAccountStats {
+		t.Helper()
+		resp, err := nc.Request(server.JSApiAccountInfo, nil, time.Second)
+		if err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		var info server.JSApiAccountInfoResponse
+		if err := json.Unmarshal(resp.Data, &info); err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		if info.Error != nil {
+			t.Fatalf("Unexpected error: %+v", info.Error)
+		}
+		if info.JetStreamAccountStats == nil {
+			t.Fatalf("AccountStats missing")
+		}
+		return info.JetStreamAccountStats
+	}
+
+	// If subject is not 3 letters or payload not 2 this needs to change.
+	const msgSize = uint64(22 + 3 + 6 + 8)
+
+	stats := accountStats()
+	if stats.Streams != 3 {
+		t.Fatalf("Should have been tracking 3 streams, found %d", stats.Streams)
+	}
+	expectedSize := 25*msgSize + 75*msgSize*2 + 10*msgSize*3
+	if stats.Store != expectedSize {
+		t.Fatalf("Expected store size to be %d, got %+v\n", expectedSize, stats)
+	}
+
+	// Check limit enforcement.
+	if _, err := js.AddStream(&nats.StreamConfig{Name: "fail", Replicas: 3}); err == nil {
+		t.Fatalf("Expected an error but got none")
+	}
+
+	// We should be at 7995 at the moment with a limit of 8000, so any message will go over.
+	if _, err := js.Publish("baz", []byte("JSC-NOT-OK")); err == nil {
+		t.Fatalf("Expected publish error but got none")
+	}
+}
+
 func TestJetStreamClusterStreamPerf(t *testing.T) {
 	// Comment out to run, holding place for now.
 	skip(t)
@@ -1575,7 +1657,7 @@ func TestJetStreamClusterStreamPerf(t *testing.T) {
 var jsClusterTempl = `
 	listen: 127.0.0.1:-1
 	server_name: %s
-	jetstream: {max_mem_store: 16GB, max_file_store: 10TB, store_dir: "%s"}
+	jetstream: {max_mem_store: 2GB, max_file_store: 1GB, store_dir: "%s"}
 	cluster {
 		name: %s
 		listen: 127.0.0.1:%d
@@ -1632,6 +1714,20 @@ func (c *cluster) addInNewServer() *server.Server {
 	return s
 }
 
+// Adjust limits for the given account.
+func (c *cluster) updateLimits(account string, newLimits *server.JetStreamAccountLimits) {
+	c.t.Helper()
+	for _, s := range c.servers {
+		acc, err := s.LookupAccount(account)
+		if err != nil {
+			c.t.Fatalf("Unexpected error: %v", err)
+		}
+		if err := acc.UpdateJetStreamLimits(newLimits); err != nil {
+			c.t.Fatalf("Unexpected error: %v", err)
+		}
+	}
+}
+
 // Hack for staticcheck
 var skip = func(t *testing.T) {
 	t.SkipNow()