Track API calls per account. Track success and errors.

These tracking data are ephemeral per server. so on restart they reset.
That should be ok since these will most likely be used more for rates.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-01-28 16:29:46 -08:00
parent 6480a45fc4
commit 0a3124e27d
4 changed files with 313 additions and 158 deletions

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2020 The NATS Authors
// Copyright 2019-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -40,7 +40,6 @@ type JetStreamConfig struct {
StoreDir string
}
// TODO(dlc) - need to track and rollup against server limits, etc.
type JetStreamAccountLimits struct {
MaxMemory int64 `json:"max_memory"`
MaxStore int64 `json:"max_storage"`
@@ -50,10 +49,17 @@ type JetStreamAccountLimits struct {
// JetStreamAccountStats returns current statistics about the account's JetStream usage.
type JetStreamAccountStats struct {
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
Streams int `json:"streams"`
Limits JetStreamAccountLimits `json:"limits"`
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
API JetStreamAPIStats `json:"api"`
Limits JetStreamAccountLimits `json:"limits"`
}
type JetStreamAPIStats struct {
Ok uint64
Err uint64
}
// This is for internal accounting for JetStream for this server.
@@ -81,6 +87,8 @@ type jsAccount struct {
storeReserved int64
memTotal int64
storeTotal int64
apiOk uint64
apiErr uint64
usage jsaUsage
rusage map[string]*jsaUsage
storeDir string
@@ -93,9 +101,12 @@ type jsAccount struct {
updatesSub *subscription
}
// Track general usage for this account.
type jsaUsage struct {
mem int64
store int64
api uint64
err uint64
}
// EnableJetStream will enable JetStream support on this server with the given configuration.
@@ -787,15 +798,27 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats {
var stats JetStreamAccountStats
if jsa != nil {
js := jsa.js
jsa.mu.RLock()
stats.Memory = uint64(jsa.memTotal)
stats.Store = uint64(jsa.storeTotal)
stats.API = JetStreamAPIStats{
Ok: jsa.apiOk,
Err: jsa.apiErr,
}
if cc := jsa.js.cluster; cc != nil {
jsa.js.mu.RLock()
stats.Streams = len(cc.streams[aname])
jsa.js.mu.RUnlock()
js.mu.RLock()
sas := cc.streams[aname]
stats.Streams = len(sas)
for _, sa := range sas {
stats.Consumers += len(sa.consumers)
}
js.mu.RUnlock()
} else {
stats.Streams = len(jsa.streams)
for _, mset := range jsa.streams {
stats.Consumers += mset.NumConsumers()
}
}
stats.Limits = jsa.limits
jsa.mu.RUnlock()
@@ -855,13 +878,13 @@ func (a *Account) JetStreamEnabled() bool {
}
func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, subject, _ string, msg []byte) {
const usageSize = 16
const usageSize = 32
jsa.mu.Lock()
s := jsa.js.srv
if len(msg) != usageSize {
if len(msg) < usageSize {
jsa.mu.Unlock()
s.Warnf("Received remote usage update that is wrong size: %d vs %d", len(msg), usageSize)
s.Warnf("Ignoring remote usage update with size too short")
return
}
var rnode string
@@ -875,6 +898,7 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, subject, _
}
var le = binary.LittleEndian
memUsed, storeUsed := int64(le.Uint64(msg[0:])), int64(le.Uint64(msg[8:]))
apiOk, apiErr := le.Uint64(msg[16:]), le.Uint64(msg[24:])
if jsa.rusage == nil {
jsa.rusage = make(map[string]*jsaUsage)
@@ -884,13 +908,17 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, subject, _
// Decrement our old values.
jsa.memTotal -= usage.mem
jsa.storeTotal -= usage.store
jsa.apiOk -= usage.api
jsa.apiErr -= usage.err
usage.mem, usage.store = memUsed, storeUsed
usage.api, usage.err = apiOk, apiErr
} else {
jsa.rusage[rnode] = &jsaUsage{memUsed, storeUsed}
jsa.rusage[rnode] = &jsaUsage{memUsed, storeUsed, apiOk, apiErr}
}
jsa.memTotal += memUsed
jsa.storeTotal += storeUsed
jsa.apiOk += apiOk
jsa.apiErr += apiErr
jsa.mu.Unlock()
}
@@ -906,16 +934,27 @@ func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) {
jsa.storeTotal += delta
}
// Publish our local updates if in clustered mode.
if jsa.js != nil && jsa.js.cluster != nil && jsa.js.srv != nil {
s, b := jsa.js.srv, make([]byte, 16)
var le = binary.LittleEndian
le.PutUint64(b[0:], uint64(jsa.usage.mem))
le.PutUint64(b[8:], uint64(jsa.usage.store))
s.sendInternalMsgLocked(jsa.updatesPub, _EMPTY_, nil, b)
if jsa.js != nil && jsa.js.cluster != nil {
jsa.sendClusterUsageUpdate()
}
jsa.mu.Unlock()
}
// Send updates to our account usage for this server.
// Lock should be held.
func (jsa *jsAccount) sendClusterUsageUpdate() {
if jsa.js == nil || jsa.js.srv == nil {
return
}
s, b := jsa.js.srv, make([]byte, 32)
var le = binary.LittleEndian
le.PutUint64(b[0:], uint64(jsa.usage.mem))
le.PutUint64(b[8:], uint64(jsa.usage.store))
le.PutUint64(b[16:], uint64(jsa.usage.api))
le.PutUint64(b[24:], uint64(jsa.usage.err))
s.sendInternalMsgLocked(jsa.updatesPub, _EMPTY_, nil, b)
}
func (jsa *jsAccount) limitsExceeded(storeType StorageType) bool {
jsa.mu.RLock()
defer jsa.mu.RUnlock()

View File

@@ -485,6 +485,7 @@ var (
jsNotEmptyRequestErr = &ApiError{Code: 400, Description: "expected an empty request payload"}
jsInvalidJSONErr = &ApiError{Code: 400, Description: "invalid JSON request"}
jsInsufficientErr = &ApiError{Code: 503, Description: "insufficient Resources"}
jsNoAccountErr = &ApiError{Code: 404, Description: "account not found"}
jsNoConsumerErr = &ApiError{Code: 404, Description: "consumer not found"}
jsStreamMismatchErr = &ApiError{Code: 400, Description: "stream name in subject does not match request"}
jsNoClusterSupportErr = &ApiError{Code: 503, Description: "not currently supported in clustered mode"}
@@ -555,6 +556,13 @@ func (s *Server) setJetStreamExportSubs() error {
}
func (s *Server) sendAPIResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
acc.trackAPI()
s.sendInternalAccountMsg(nil, reply, response)
s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
}
func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
acc.trackAPIErr()
s.sendInternalAccountMsg(nil, reply, response)
s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
}
@@ -581,6 +589,32 @@ func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Ac
return &ci, acc, hdr, msg, nil
}
func (a *Account) trackAPI() {
a.mu.RLock()
jsa := a.js
a.mu.RUnlock()
if jsa != nil {
jsa.mu.Lock()
jsa.usage.api++
jsa.apiOk++
jsa.sendClusterUsageUpdate()
jsa.mu.Unlock()
}
}
func (a *Account) trackAPIErr() {
a.mu.RLock()
jsa := a.js
a.mu.RUnlock()
if jsa != nil {
jsa.mu.Lock()
jsa.usage.err++
jsa.apiErr++
jsa.sendClusterUsageUpdate()
jsa.mu.Unlock()
}
}
const badAPIRequestT = "Malformed JetStream API Request: %q"
// Request for current usage and limits for this account.
@@ -604,7 +638,7 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, subject, rep
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
@@ -653,34 +687,34 @@ func (s *Server) jsTemplateCreateRequest(sub *subscription, c *client, subject,
var resp = JSApiStreamTemplateCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateCreateResponseType}}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Not supported for now.
if s.JetStreamIsClustered() {
resp.Error = jsNoClusterSupportErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var cfg StreamTemplateConfig
if err := json.Unmarshal(msg, &cfg); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
templateName := templateNameFromSubject(subject)
if templateName != cfg.Name {
resp.Error = &ApiError{Code: 400, Description: "template name in subject does not match request"}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
t, err := acc.AddStreamTemplate(&cfg)
if err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
t.mu.Lock()
@@ -708,14 +742,14 @@ func (s *Server) jsTemplateNamesRequest(sub *subscription, c *client, subject, r
var resp = JSApiStreamTemplateNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateNamesResponseType}}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Not supported for now.
if s.JetStreamIsClustered() {
resp.Error = jsNoClusterSupportErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -724,7 +758,7 @@ func (s *Server) jsTemplateNamesRequest(sub *subscription, c *client, subject, r
var req JSApiStreamTemplatesRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
offset = req.Offset
@@ -772,19 +806,19 @@ func (s *Server) jsTemplateInfoRequest(sub *subscription, c *client, subject, re
var resp = JSApiStreamTemplateInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateInfoResponseType}}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !isEmptyRequest(msg) {
resp.Error = jsNotEmptyRequestErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
name := templateNameFromSubject(subject)
t, err := acc.LookupStreamTemplate(name)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
t.mu.Lock()
@@ -813,19 +847,19 @@ func (s *Server) jsTemplateDeleteRequest(sub *subscription, c *client, subject,
var resp = JSApiStreamTemplateDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateDeleteResponseType}}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !isEmptyRequest(msg) {
resp.Error = jsNotEmptyRequestErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
name := templateNameFromSubject(subject)
err = acc.DeleteStreamTemplate(name)
if err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.Success = true
@@ -876,7 +910,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, subject, re
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
@@ -887,19 +921,19 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, subject, re
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var cfg StreamConfig
if err := json.Unmarshal(msg, &cfg); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
streamName := streamNameFromSubject(subject)
if streamName != cfg.Name {
resp.Error = jsStreamMismatchErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -911,7 +945,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, subject, re
mset, err := acc.AddStream(&cfg)
if err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config()}
@@ -940,7 +974,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, re
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
@@ -951,31 +985,31 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, subject, re
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var cfg StreamConfig
if err := json.Unmarshal(msg, &cfg); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
streamName := streamNameFromSubject(subject)
if streamName != cfg.Name {
resp.Error = jsStreamMismatchErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
mset, err := acc.LookupStream(streamName)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if err := mset.Update(&cfg); err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1004,7 +1038,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
@@ -1015,7 +1049,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1026,7 +1060,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, subject, rep
var req JSApiStreamNamesRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
offset = req.Offset
@@ -1128,7 +1162,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
@@ -1139,7 +1173,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1148,7 +1182,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, subject, repl
var req JSApiStreamNamesRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
offset = req.Offset
@@ -1211,7 +1245,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1223,12 +1257,12 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
// We can't find the stream, so mimic what would be the errors below.
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// No stream present.
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
} else if sa == nil {
return
@@ -1237,7 +1271,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
// Check to see if we are a member of the group and if the group has no leader.
if js.isGroupLeaderless(sa.Group) {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1249,19 +1283,19 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !isEmptyRequest(msg) {
resp.Error = jsNotEmptyRequestErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
mset, err := acc.LookupStream(name)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
config := mset.Config()
@@ -1315,7 +1349,7 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
@@ -1326,12 +1360,12 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !isEmptyRequest(msg) {
resp.Error = jsNotEmptyRequestErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
stream := streamNameFromSubject(subject)
@@ -1345,18 +1379,18 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re
mset, err := acc.LookupStream(stream)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if mset.Config().internal {
resp.Error = &ApiError{Code: 403, Description: "not allowed to delete internal stream"}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if err := mset.Delete(); err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.Success = true
@@ -1389,7 +1423,7 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1401,12 +1435,12 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply
// We can't find the stream, so mimic what would be the errors below.
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// No stream present.
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
} else if sa == nil {
return
@@ -1415,7 +1449,7 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply
// Check to see if we are a member of the group and if the group has no leader.
if js.isGroupLeaderless(sa.Group) {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1427,25 +1461,25 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if isEmptyRequest(msg) {
resp.Error = jsBadRequestErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var req JSApiMsgDeleteRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
mset, err := acc.LookupStream(stream)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1479,18 +1513,18 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st
var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if isEmptyRequest(msg) {
resp.Error = jsBadRequestErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var req JSApiMsgGetRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1498,14 +1532,14 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st
mset, err := acc.LookupStream(stream)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
subj, hdr, msg, ts, err := mset.store.LoadMsg(req.Seq)
if err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.Message = &StoredMsg{
@@ -1543,7 +1577,7 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1555,12 +1589,12 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep
// We can't find the stream, so mimic what would be the errors below.
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// No stream present.
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
} else if sa == nil {
return
@@ -1569,7 +1603,7 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep
// Check to see if we are a member of the group and if the group has no leader.
if js.isGroupLeaderless(sa.Group) {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1581,18 +1615,18 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !isEmptyRequest(msg) {
resp.Error = jsNotEmptyRequestErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
mset, err := acc.LookupStream(stream)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1625,19 +1659,19 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if isEmptyRequest(msg) {
resp.Error = jsBadRequestErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var req JSApiStreamRestoreRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1650,7 +1684,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
if _, err := acc.LookupStream(stream); err == nil {
resp.Error = jsError(ErrJetStreamStreamAlreadyUsed)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -1665,7 +1699,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, stream, subj
tfile, err := ioutil.TempFile("", "jetstream-restore-")
if err != nil {
resp.Error = &ApiError{Code: 500, Description: "JetStream unable to open temp storage for restore"}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return nil
}
@@ -1740,13 +1774,13 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, stream, subj
sub, err := acc.subscribeInternal(restoreSubj, processChunk)
if err != nil {
resp.Error = &ApiError{Code: 500, Description: "JetStream unable to subscribe to restore snapshot"}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return nil
}
// Mark the subject so the end user knows where to send the snapshot chunks.
resp.DeliverSubject = restoreSubj
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
doneCh := make(chan error, 1)
@@ -1842,31 +1876,31 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
var resp = JSApiStreamSnapshotResponse{ApiResponse: ApiResponse{Type: JSApiStreamSnapshotResponseType}}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
return
}
if isEmptyRequest(msg) {
resp.Error = jsBadRequestErr
s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
return
}
mset, err := acc.LookupStream(stream)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
return
}
var req JSApiStreamSnapshotRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
return
}
if !IsValidSubject(req.DeliverSubject) {
resp.Error = &ApiError{Code: 400, Description: "deliver subject not valid"}
s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
return
}
@@ -1885,7 +1919,7 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
if err != nil {
s.Warnf("Snapshot of stream '%s > %s' failed: %v", mset.jsa.account.Name, mset.Name(), err)
resp.Error = jsError(err)
s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
return
}
@@ -2045,7 +2079,7 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
@@ -2063,48 +2097,48 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var req CreateConsumerRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if streamName != req.Stream {
resp.Error = jsStreamMismatchErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if expectDurable {
if numTokens(subject) != 7 {
resp.Error = &ApiError{Code: 400, Description: "consumer expected to be durable but no durable name set in subject"}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Now check on requirements for durable request.
if req.Config.Durable == _EMPTY_ {
resp.Error = &ApiError{Code: 400, Description: "consumer expected to be durable but a durable name was not set"}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
consumerName := tokenAt(subject, 7)
if consumerName != req.Config.Durable {
resp.Error = &ApiError{Code: 400, Description: "consumer name in subject does not match durable name in request"}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
} else {
if numTokens(subject) != 5 {
resp.Error = &ApiError{Code: 400, Description: "consumer expected to be ephemeral but detected a durable name set in subject"}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if req.Config.Durable != _EMPTY_ {
resp.Error = &ApiError{Code: 400, Description: "consumer expected to be ephemeral but a durable name was set in request"}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}
@@ -2117,14 +2151,14 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
stream, err := acc.LookupStream(req.Stream)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
o, err := stream.AddConsumer(&req.Config)
if err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.ConsumerInfo = o.Info()
@@ -2155,7 +2189,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
@@ -2166,7 +2200,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -2175,7 +2209,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
var req JSApiConsumersRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
offset = req.Offset
@@ -2195,14 +2229,14 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
if sas == nil {
js.mu.RUnlock()
resp.Error = jsNotFoundError(ErrJetStreamNotEnabled)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
sa := sas[streamName]
if sa == nil || sa.err != nil {
js.mu.RUnlock()
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
for consumer := range sa.consumers {
@@ -2222,7 +2256,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
mset, err := acc.LookupStream(streamName)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -2274,7 +2308,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
@@ -2285,7 +2319,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -2294,7 +2328,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
var req JSApiConsumersRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
offset = req.Offset
@@ -2314,7 +2348,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
mset, err := acc.LookupStream(streamName)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -2366,7 +2400,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -2378,17 +2412,17 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re
// We can't find the consumer, so mimic what would be the errors below.
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if sa == nil {
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are here the consumer is not present.
resp.Error = jsNoConsumerErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
} else if ca == nil {
return
@@ -2397,7 +2431,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re
// Check to see if we are a member of the group and if the group has no leader.
if js.isGroupLeaderless(ca.Group) {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
@@ -2409,26 +2443,26 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !isEmptyRequest(msg) {
resp.Error = jsNotEmptyRequestErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
mset, err := acc.LookupStream(stream)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
obs := mset.LookupConsumer(consumer)
if obs == nil {
resp.Error = jsNoConsumerErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.ConsumerInfo = obs.Info()
@@ -2456,7 +2490,7 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject,
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
@@ -2467,12 +2501,12 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject,
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if !isEmptyRequest(msg) {
resp.Error = jsNotEmptyRequestErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
stream := streamNameFromSubject(subject)
@@ -2486,24 +2520,24 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject,
mset, err := acc.LookupStream(stream)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if mset.Config().internal {
resp.Error = &ApiError{Code: 403, Description: "not allowed to delete consumer of internal stream"}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
obs := mset.LookupConsumer(consumer)
if obs == nil {
resp.Error = jsNoConsumerErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if err := obs.Delete(); err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.Success = true

View File

@@ -1203,12 +1203,14 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry) (bool,
var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
if err != nil {
resp.Error = jsError(err)
s.sendAPIErrResponse(md.Client, mset.account(), _EMPTY_, md.Reply, _EMPTY_, s.jsonResponse(resp))
} else if !removed {
resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("sequence [%d] not found", md.Seq)}
s.sendAPIErrResponse(md.Client, mset.account(), _EMPTY_, md.Reply, _EMPTY_, s.jsonResponse(resp))
} else {
resp.Success = true
s.sendAPIResponse(md.Client, mset.account(), _EMPTY_, md.Reply, _EMPTY_, s.jsonResponse(resp))
}
s.sendAPIResponse(md.Client, mset.account(), _EMPTY_, md.Reply, _EMPTY_, s.jsonResponse(resp))
}
case purgeStreamOp:
sp, err := decodeStreamPurge(buf[1:])
@@ -1227,11 +1229,12 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry) (bool,
var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
if err != nil {
resp.Error = jsError(err)
s.sendAPIErrResponse(sp.Client, mset.account(), _EMPTY_, sp.Reply, _EMPTY_, s.jsonResponse(resp))
} else {
resp.Purged = purged
resp.Success = true
s.sendAPIResponse(sp.Client, mset.account(), _EMPTY_, sp.Reply, _EMPTY_, s.jsonResponse(resp))
}
s.sendAPIResponse(sp.Client, mset.account(), _EMPTY_, sp.Reply, _EMPTY_, s.jsonResponse(resp))
}
default:
panic("JetStream Cluster Unknown group entry op type!")
@@ -1310,10 +1313,11 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
if err != nil {
resp.Error = jsError(err)
s.sendAPIErrResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: s.clusterInfo(nil)}
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
}
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
}
// Will lookup a stream assignment.
@@ -1604,10 +1608,11 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
if resp.Error == nil {
resp.Error = jsError(err)
}
s.sendAPIErrResponse(sa.Client, acc, _EMPTY_, sa.Reply, _EMPTY_, s.jsonResponse(resp))
} else {
resp.Success = true
s.sendAPIResponse(sa.Client, acc, _EMPTY_, sa.Reply, _EMPTY_, s.jsonResponse(resp))
}
s.sendAPIResponse(sa.Client, acc, _EMPTY_, sa.Reply, _EMPTY_, s.jsonResponse(resp))
}
// processConsumerAssignment is called when followers have replicated an assignment for a consumer.
@@ -1826,11 +1831,11 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
if resp.Error == nil {
resp.Error = jsError(err)
}
s.sendAPIErrResponse(ca.Client, acc, _EMPTY_, ca.Reply, _EMPTY_, s.jsonResponse(resp))
} else {
resp.Success = true
s.sendAPIResponse(ca.Client, acc, _EMPTY_, ca.Reply, _EMPTY_, s.jsonResponse(resp))
}
s.sendAPIResponse(ca.Client, acc, _EMPTY_, ca.Reply, _EMPTY_, s.jsonResponse(resp))
}
// Returns the consumer assignment, or nil if not present.
@@ -2074,10 +2079,11 @@ func (js *jetStream) processConsumerLeaderChange(o *Consumer, ca *consumerAssign
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
if err != nil {
resp.Error = jsError(err)
s.sendAPIErrResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
resp.ConsumerInfo = o.Info()
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
}
s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp))
}
type streamAssignmentResult struct {
@@ -2114,7 +2120,7 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client
} else if result.Restore != nil {
resp = s.jsonResponse(result.Restore)
}
js.srv.sendAPIResponse(sa.Client, acc, _EMPTY_, sa.Reply, _EMPTY_, resp)
js.srv.sendAPIErrResponse(sa.Client, acc, _EMPTY_, sa.Reply, _EMPTY_, resp)
sa.responded = true
// TODO(dlc) - Could have mixed results, should track per peer.
// Set sa.err while we are deleting so we will not respond to list/names requests.
@@ -2142,7 +2148,7 @@ func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *clie
if sa := js.streamAssignment(result.Account, result.Stream); sa != nil && sa.consumers != nil {
if ca := sa.consumers[result.Consumer]; ca != nil && !ca.responded {
js.srv.sendAPIResponse(ca.Client, acc, _EMPTY_, ca.Reply, _EMPTY_, s.jsonResponse(result.Response))
js.srv.sendAPIErrResponse(ca.Client, acc, _EMPTY_, ca.Reply, _EMPTY_, s.jsonResponse(result.Response))
ca.responded = true
// Check if this failed.
// TODO(dlc) - Could have mixed results, should track per peer.
@@ -2266,7 +2272,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, subject, reply string,
acc, err := s.LookupAccount(ci.Account)
if err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
@@ -2286,7 +2292,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, subject, reply string,
if exceeded {
resp.Error = jsError(fmt.Errorf("maximum number of streams reached"))
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
@@ -2296,7 +2302,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, subject, reply string,
if sa := js.streamAssignment(ci.Account, cfg.Name); sa != nil {
resp.Error = jsError(ErrJetStreamStreamAlreadyUsed)
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
@@ -2304,7 +2310,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, subject, reply string,
rg := cc.createGroupForStream(cfg)
if rg == nil {
resp.Error = jsInsufficientErr
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
// Pick a preferred leader.
@@ -2329,7 +2335,7 @@ func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, stream, reply st
if err == nil {
var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIResponse(ci, acc, _EMPTY_, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, _EMPTY_, reply, string(rmsg), s.jsonResponse(&resp))
}
return
}
@@ -2361,7 +2367,7 @@ func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, stream, subject,
} else {
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
}
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
@@ -2384,7 +2390,7 @@ func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, r
if sa := js.streamAssignment(ci.Account, cfg.Name); sa != nil {
resp.Error = jsError(ErrJetStreamStreamAlreadyUsed)
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
@@ -2392,7 +2398,7 @@ func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, r
rg := cc.createGroupForStream(cfg)
if rg == nil {
resp.Error = jsInsufficientErr
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
// Pick a preferred leader.
@@ -2636,8 +2642,8 @@ func (s *Server) jsClusteredConsumerDeleteRequest(ci *ClientInfo, stream, consum
acc, err := s.LookupAccount(ci.Account)
if err == nil {
var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
resp.Error = jsNoConsumerErr
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
resp.Error = jsNoAccountErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
}
return
}
@@ -2719,7 +2725,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, subject, reply strin
acc, err := s.LookupAccount(ci.Account)
if err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
@@ -2727,14 +2733,14 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, subject, reply strin
sa := js.streamAssignment(ci.Account, stream)
if sa == nil {
resp.Error = jsError(ErrJetStreamStreamNotFound)
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
rg := cc.createGroupForConsumer(sa)
if rg == nil {
resp.Error = jsInsufficientErr
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
// Pick a preferred leader.
@@ -2756,7 +2762,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, subject, reply strin
oname = cfg.Durable
if sa.consumers[oname] != nil {
resp.Error = jsError(ErrJetStreamConsumerAlreadyUsed)
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
}

View File

@@ -2276,6 +2276,82 @@ func TestJetStreamClusterStreamTemplates(t *testing.T) {
}
}
func TestJetStreamClusterExtendedAccountInfo(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
sendBatch := func(subject string, n int) {
t.Helper()
for i := 0; i < n; i++ {
if _, err := js.Publish(subject, []byte("JSC-OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
}
// Add in some streams with msgs and consumers.
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST-1", Replicas: 2}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.SubscribeSync("TEST-1"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sendBatch("TEST-1", 25)
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST-2", Replicas: 2}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.SubscribeSync("TEST-2"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sendBatch("TEST-2", 50)
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST-3", Replicas: 3, Storage: nats.MemoryStorage}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.SubscribeSync("TEST-3"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sendBatch("TEST-3", 100)
// Go client will lag so use direct for now.
getAccountInfo := func() *server.JetStreamAccountStats {
resp, err := nc.Request(server.JSApiAccountInfo, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var info server.JSApiAccountInfoResponse
if err := json.Unmarshal(resp.Data, &info); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return info.JetStreamAccountStats
}
ai := getAccountInfo()
if ai.Streams != 3 || ai.Consumers != 3 {
t.Fatalf("AccountInfo not correct: %+v", ai)
}
if ai.API.Ok < 10 {
t.Fatalf("Expected at least 10 API calls to be ok, got %d", ai.API.Ok)
}
// Now do a failure to make sure we track API errors.
js.StreamInfo("NO-STREAM")
js.ConsumerInfo("TEST-1", "NO-CONSUMER")
js.ConsumerInfo("TEST-2", "NO-CONSUMER")
js.ConsumerInfo("TEST-3", "NO-CONSUMER")
ai = getAccountInfo()
if ai.API.Err != 4 {
t.Fatalf("Expected 4 API calls to be errors, got %d", ai.API.Err)
}
}
func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()