mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
@@ -511,6 +511,7 @@ func (a *Account) AddServiceExportWithResponse(subject string, respType ServiceR
|
||||
}
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
if a.exports.services == nil {
|
||||
a.exports.services = make(map[string]*serviceExport)
|
||||
}
|
||||
@@ -1279,8 +1280,10 @@ func (a *Account) AddStreamExport(subject string, accounts []*Account) error {
|
||||
if a == nil {
|
||||
return ErrMissingAccount
|
||||
}
|
||||
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
if a.exports.streams == nil {
|
||||
a.exports.streams = make(map[string]*streamExport)
|
||||
}
|
||||
|
||||
@@ -194,7 +194,7 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error {
|
||||
s.Noticef("----------------------------------------")
|
||||
|
||||
// If we have no configured accounts setup then setup imports on global account.
|
||||
if s.globalAccountOnly() && !s.inOperatorMode() {
|
||||
if s.globalAccountOnly() {
|
||||
if err := s.GlobalAccount().EnableJetStream(nil); err != nil {
|
||||
return fmt.Errorf("Error enabling jetstream on the global account")
|
||||
}
|
||||
@@ -224,20 +224,45 @@ func (s *Server) JetStreamConfig() *JetStreamConfig {
|
||||
return c
|
||||
}
|
||||
|
||||
// EnableJetStream will enable JetStream on this account.
|
||||
// JetStreamNumAccounts returns the number of enabled accounts this server is tracking.
|
||||
func (s *Server) JetStreamNumAccounts() int {
|
||||
js := s.getJetStream()
|
||||
if js == nil {
|
||||
return 0
|
||||
}
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
return len(js.accounts)
|
||||
}
|
||||
|
||||
// JetStreamReservedResources returns the reserved resources if JetStream is enabled.
|
||||
func (s *Server) JetStreamReservedResources() (int64, int64, error) {
|
||||
js := s.getJetStream()
|
||||
if js == nil {
|
||||
return -1, -1, fmt.Errorf("jetstream not enabled")
|
||||
}
|
||||
js.mu.RLock()
|
||||
defer js.mu.RUnlock()
|
||||
return js.memReserved, js.storeReserved, nil
|
||||
}
|
||||
|
||||
func (s *Server) getJetStream() *jetStream {
|
||||
s.mu.Lock()
|
||||
js := s.js
|
||||
s.mu.Unlock()
|
||||
return js
|
||||
}
|
||||
|
||||
// EnableJetStream will enable JetStream on this account with the defined limits.
|
||||
// This is a helper for JetStreamEnableAccount.
|
||||
func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
a.mu.RLock()
|
||||
s := a.srv
|
||||
a.mu.RUnlock()
|
||||
if s == nil {
|
||||
return fmt.Errorf("jetstream unknown account")
|
||||
return fmt.Errorf("jetstream account not registered")
|
||||
}
|
||||
return s.JetStreamEnableAccount(a, limits)
|
||||
}
|
||||
|
||||
// JetStreamEnableAccount enables jetstream capabilties for the given account with the defined limits.
|
||||
func (s *Server) JetStreamEnableAccount(a *Account, limits *JetStreamAccountLimits) error {
|
||||
// FIXME(dlc) - cluster mode
|
||||
js := s.getJetStream()
|
||||
if js == nil {
|
||||
return fmt.Errorf("jetstream not enabled")
|
||||
@@ -294,7 +319,7 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error {
|
||||
s := a.srv
|
||||
a.mu.RUnlock()
|
||||
if s == nil {
|
||||
return fmt.Errorf("jetstream unknown account")
|
||||
return fmt.Errorf("jetstream account not registered")
|
||||
}
|
||||
|
||||
js := s.getJetStream()
|
||||
@@ -367,12 +392,9 @@ func (a *Account) DisableJetStream() error {
|
||||
s := a.srv
|
||||
a.mu.RUnlock()
|
||||
if s == nil {
|
||||
return fmt.Errorf("jetstream unknown account")
|
||||
return fmt.Errorf("jetstream account not registered")
|
||||
}
|
||||
return s.JetStreamDisableAccount(a)
|
||||
}
|
||||
|
||||
func (s *Server) JetStreamDisableAccount(a *Account) error {
|
||||
js := s.getJetStream()
|
||||
if js == nil {
|
||||
return fmt.Errorf("jetstream not enabled")
|
||||
@@ -419,35 +441,6 @@ func (a *Account) JetStreamEnabled() bool {
|
||||
return enabled
|
||||
}
|
||||
|
||||
// JetStreamNumAccounts returns the number of enabled accounts this server is tracking.
|
||||
func (s *Server) JetStreamNumAccounts() int {
|
||||
js := s.getJetStream()
|
||||
if js == nil {
|
||||
return 0
|
||||
}
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
return len(js.accounts)
|
||||
}
|
||||
|
||||
// JetStreamReservedResources returns the reserved resources if JetStream is enabled.
|
||||
func (s *Server) JetStreamReservedResources() (int64, int64, error) {
|
||||
js := s.getJetStream()
|
||||
if js == nil {
|
||||
return -1, -1, fmt.Errorf("jetstream not enabled")
|
||||
}
|
||||
js.mu.RLock()
|
||||
defer js.mu.RUnlock()
|
||||
return js.memReserved, js.storeReserved, nil
|
||||
}
|
||||
|
||||
func (s *Server) getJetStream() *jetStream {
|
||||
s.mu.Lock()
|
||||
js := s.js
|
||||
s.mu.Unlock()
|
||||
return js
|
||||
}
|
||||
|
||||
// Updates accounting on in use memory and storage.
|
||||
func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) {
|
||||
// TODO(dlc) - atomics? snapshot limits?
|
||||
@@ -624,3 +617,10 @@ func FriendlyBytes(bytes int64) string {
|
||||
index := exp - 1
|
||||
return fmt.Sprintf("%.2f %sB", fbytes/math.Pow(float64(base), float64(exp)), pre[index])
|
||||
}
|
||||
|
||||
func isValidName(name string) bool {
|
||||
if name == "" {
|
||||
return false
|
||||
}
|
||||
return !strings.ContainsAny(name, ".*>")
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ package server
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
@@ -69,14 +68,22 @@ const (
|
||||
MsgSetMaxReplicas = 8
|
||||
)
|
||||
|
||||
// JetStreamAddMsgSet adds a message set for the given account.
|
||||
func (s *Server) JetStreamAddMsgSet(a *Account, config *MsgSetConfig) (*MsgSet, error) {
|
||||
// AddMsgSet adds a JetStream message set for the given account.
|
||||
func (a *Account) AddMsgSet(config *MsgSetConfig) (*MsgSet, error) {
|
||||
a.mu.RLock()
|
||||
s := a.srv
|
||||
a.mu.RUnlock()
|
||||
if s == nil {
|
||||
return nil, fmt.Errorf("jetstream account not registered")
|
||||
}
|
||||
|
||||
// FIXME(dlc) - Change for clustering.
|
||||
js := s.getJetStream()
|
||||
if js == nil {
|
||||
return nil, fmt.Errorf("jetstream not enabled")
|
||||
}
|
||||
|
||||
jsa := js.lookupAccount(a)
|
||||
jsa := a.js
|
||||
if jsa == nil {
|
||||
return nil, fmt.Errorf("jetstream not enabled for account")
|
||||
}
|
||||
@@ -128,9 +135,14 @@ func (s *Server) JetStreamAddMsgSet(a *Account, config *MsgSetConfig) (*MsgSet,
|
||||
}
|
||||
|
||||
func checkMsgSetCfg(config *MsgSetConfig) (MsgSetConfig, error) {
|
||||
if config == nil || config.Name == _EMPTY_ {
|
||||
if config == nil {
|
||||
return MsgSetConfig{}, fmt.Errorf("message set configuration invalid")
|
||||
}
|
||||
|
||||
if !isValidName(config.Name) {
|
||||
return MsgSetConfig{}, fmt.Errorf("message set name required, can not contain '.', '*', '>'")
|
||||
}
|
||||
|
||||
cfg := *config
|
||||
|
||||
// TODO(dlc) - check config for conflicts, e.g replicas > 1 in single server mode.
|
||||
@@ -149,12 +161,7 @@ func checkMsgSetCfg(config *MsgSetConfig) (MsgSetConfig, error) {
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// JetStreamDeleteMsgSet will delete a message set.
|
||||
func (s *Server) JetStreamDeleteMsgSet(mset *MsgSet) error {
|
||||
return mset.Delete()
|
||||
}
|
||||
|
||||
// Delete deletes a message set.
|
||||
// Delete deletes a message set from the owning account.
|
||||
func (mset *MsgSet) Delete() error {
|
||||
mset.mu.Lock()
|
||||
jsa := mset.jsa
|
||||
@@ -242,6 +249,7 @@ func (mset *MsgSet) setupStore() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// processInboundJetStreamMsg handles processing messages bound for a message set.
|
||||
func (mset *MsgSet) processInboundJetStreamMsg(_ *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
mset.mu.Lock()
|
||||
store := mset.store
|
||||
@@ -390,13 +398,6 @@ func (mset *MsgSet) delete() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns a name that can be used as a single token for subscriptions.
|
||||
// Really only need to replace token separators.
|
||||
// Lock should be held
|
||||
func (mset *MsgSet) cleanName() string {
|
||||
return strings.Replace(mset.config.Name, tsep, "-", -1)
|
||||
}
|
||||
|
||||
// NumObservables reports on number of active observables for this message set.
|
||||
func (mset *MsgSet) NumObservables() int {
|
||||
mset.mu.Lock()
|
||||
@@ -420,15 +421,17 @@ func (mset *MsgSet) Stats() MsgSetStats {
|
||||
// waitForMsgs will have the message set wait for the arrival of new messages.
|
||||
func (mset *MsgSet) waitForMsgs() {
|
||||
mset.mu.Lock()
|
||||
defer mset.mu.Unlock()
|
||||
|
||||
if mset.client == nil {
|
||||
mset.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
mset.sgw++
|
||||
mset.sg.Wait()
|
||||
mset.sgw--
|
||||
|
||||
mset.mu.Unlock()
|
||||
}
|
||||
|
||||
// Determines if the new proposed partition is unique amongst all observables.
|
||||
|
||||
@@ -203,6 +203,11 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
|
||||
o.name = createObservableName()
|
||||
}
|
||||
|
||||
if !isValidName(o.name) {
|
||||
mset.mu.Unlock()
|
||||
return nil, fmt.Errorf("durable name can not contain '.', '*', '>'")
|
||||
}
|
||||
|
||||
// Select starting sequence number
|
||||
o.selectStartingSeqNo()
|
||||
|
||||
@@ -235,9 +240,9 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
|
||||
// Set up the ack subscription for this observable. Will use wildcard for all acks.
|
||||
// We will remember the template to generate replaies with sequence numbers and use
|
||||
// that to scanf them back in.
|
||||
cn := mset.cleanName()
|
||||
o.ackReplyT = fmt.Sprintf("%s.%s.%s.%%d.%%d.%%d", JetStreamAckPre, cn, o.name)
|
||||
ackSubj := fmt.Sprintf("%s.%s.%s.*.*.*", JetStreamAckPre, cn, o.name)
|
||||
mn := mset.config.Name
|
||||
o.ackReplyT = fmt.Sprintf("%s.%s.%s.%%d.%%d.%%d", JetStreamAckPre, mn, o.name)
|
||||
ackSubj := fmt.Sprintf("%s.%s.%s.*.*.*", JetStreamAckPre, mn, o.name)
|
||||
if sub, err := mset.subscribeInternal(ackSubj, o.processAck); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
@@ -245,7 +250,7 @@ func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error)
|
||||
}
|
||||
// Setup the internal sub for next message requests.
|
||||
if !o.isPushMode() {
|
||||
o.nextMsgSubj = fmt.Sprintf("%s.%s.%s", JetStreamRequestNextPre, cn, o.name)
|
||||
o.nextMsgSubj = fmt.Sprintf("%s.%s.%s", JetStreamRequestNextPre, mn, o.name)
|
||||
if sub, err := mset.subscribeInternal(o.nextMsgSubj, o.processNextMsgReq); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
@@ -615,13 +620,11 @@ func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
|
||||
// Deliver all the msgs we have now, once done or on a condition, we wait for new ones.
|
||||
for {
|
||||
var (
|
||||
mset *MsgSet
|
||||
seq uint64
|
||||
dcnt uint64
|
||||
subj string
|
||||
dsubj string
|
||||
msg []byte
|
||||
err error
|
||||
mset *MsgSet
|
||||
seq, dcnt uint64
|
||||
subj, dsubj string
|
||||
msg []byte
|
||||
err error
|
||||
)
|
||||
|
||||
o.mu.Lock()
|
||||
@@ -702,6 +705,7 @@ func (o *Observable) trackPending(seq uint64) {
|
||||
o.pending[seq] = time.Now().UnixNano()
|
||||
}
|
||||
|
||||
// This will check if a registered delivery subject still has interest, e.g. subscriptions.
|
||||
func (o *Observable) checkActive() {
|
||||
o.mu.Lock()
|
||||
mset := o.mset
|
||||
|
||||
@@ -598,12 +598,6 @@ func (s *Server) generateRouteInfoJSON() {
|
||||
s.routeInfoJSON = bytes.Join(pcs, []byte(" "))
|
||||
}
|
||||
|
||||
// Determines if we are in operator mode.
|
||||
// Uses opts instead of server lock.
|
||||
func (s *Server) inOperatorMode() bool {
|
||||
return len(s.getOpts().TrustedOperators) > 0
|
||||
}
|
||||
|
||||
// Determines if we are in pre NATS 2.0 setup with no accounts.
|
||||
func (s *Server) globalAccountOnly() bool {
|
||||
var hasOthers bool
|
||||
|
||||
Reference in New Issue
Block a user