Files
nats-server/server/jetstream.go
Waldemar Quevedo d338e9ae30 Disable JS on restart if on invalid state
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
2021-03-10 01:40:32 -08:00

1750 lines
44 KiB
Go

// Copyright 2019-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/minio/highwayhash"
"github.com/nats-io/nats-server/v2/server/sysmem"
"github.com/nats-io/nuid"
)
// JetStreamConfig determines this server's JetStream configuration.
// MaxMemory and MaxStore are in bytes.
type JetStreamConfig struct {
// MaxMemory is the memory limit in bytes. Server.EnableJetStream treats a
// value <= 0 as a request to generate the configuration dynamically.
MaxMemory int64 `json:"max_memory"`
// MaxStore is the disk storage limit in bytes. Server.EnableJetStream treats
// a value <= 0 as a request to generate the configuration dynamically.
MaxStore int64 `json:"max_storage"`
// StoreDir is the root storage directory. When empty, Server.EnableJetStream
// falls back to a JetStreamStoreDir directory under os.TempDir().
StoreDir string `json:"store_dir,omitempty"`
}
// JetStreamStats holds server-wide JetStream statistics. It is produced by
// jetStream.usageStats, which sums the per-account totals.
type JetStreamStats struct {
// Memory is the sum of the accounts' memory totals.
Memory uint64 `json:"memory"`
// Store is the sum of the accounts' storage totals.
Store uint64 `json:"storage"`
// Accounts is the number of JetStream accounts that were tallied.
Accounts int `json:"accounts,omitempty"`
// API aggregates the accounts' API call and error counters.
API JetStreamAPIStats `json:"api"`
}
// JetStreamAccountLimits describes the JetStream resource limits of an account.
// The limit checks in this file only enforce a given limit when it is > 0;
// dynamicAccountLimits uses -1 for MaxStreams and MaxConsumers.
type JetStreamAccountLimits struct {
// MaxMemory is the memory limit for the account.
MaxMemory int64 `json:"max_memory"`
// MaxStore is the storage limit for the account.
MaxStore int64 `json:"max_storage"`
// MaxStreams is the maximum number of streams (see checkLimits).
MaxStreams int `json:"max_streams"`
// MaxConsumers is the upper bound a stream's MaxConsumers may request (see checkLimits).
MaxConsumers int `json:"max_consumers"`
}
// JetStreamAccountStats holds the current statistics about an account's
// JetStream usage. It is what Account.JetStreamUsage returns.
type JetStreamAccountStats struct {
// Memory is the account's total memory usage.
Memory uint64 `json:"memory"`
// Store is the account's total storage usage.
Store uint64 `json:"storage"`
// Streams is the number of streams for the account.
Streams int `json:"streams"`
// Consumers is the number of consumers across those streams.
Consumers int `json:"consumers"`
// API carries the account's API call and error counters.
API JetStreamAPIStats `json:"api"`
// Limits is a copy of the account's configured limits.
Limits JetStreamAccountLimits `json:"limits"`
}
// JetStreamAPIStats holds JetStream API counters.
type JetStreamAPIStats struct {
// Total is the total number of API calls.
Total uint64 `json:"total"`
// Errors is the number of API calls that resulted in an error.
Errors uint64 `json:"errors"`
}
// jetStream is the internal accounting state for JetStream on this server.
type jetStream struct {
// mu is taken around reads/writes of the fields below in this file.
mu sync.RWMutex
// srv is the owning server (set in enableJetStream).
srv *Server
// config is the configuration JetStream was enabled with.
config JetStreamConfig
// cluster holds clustering state; code in this file treats nil as "not clustered".
cluster *jetStreamCluster
// accounts maps each JetStream enabled account to its jsAccount.
accounts map[*Account]*jsAccount
// memReserved and storeReserved are the sums of the account limits
// handed out via reserveResources / releaseResources.
memReserved int64
storeReserved int64
// NOTE(review): apiCalls is not touched in the visible part of this file.
apiCalls int64
// apiSubs is created with NewSublistNoCache in enableJetStream.
apiSubs *Sublist
// disabled is set by setJetStreamDisabled and consulted by Server.JetStreamEnabled.
disabled bool
}
// This represents a jetstream enabled account.
// Worth noting that we include the js ptr, this is because
// in general we want to be very efficient when receiving messages on
// an internal sub for a msgSet, so we will direct link to the msgSet
// and walk backwards as needed vs multiple hash lookups and locks, etc.
type jsAccount struct {
mu sync.RWMutex
// js points back at the server-wide JetStream state.
js *jetStream
// account is the account this state belongs to.
account *Account
// limits are the limits currently in effect for the account.
limits JetStreamAccountLimits
// memReserved and storeReserved are compared against the limits in checkBytesLimits.
memReserved int64
storeReserved int64
// memTotal and storeTotal combine local usage with the usage reported by
// remote servers (see updateUsage and remoteUpdateUsage).
memTotal int64
storeTotal int64
// apiTotal and apiErrors are the combined API counters.
apiTotal uint64
apiErrors uint64
// usage is this server's own usage; rusage tracks the last reported usage
// per remote node, keyed by the node token taken from the update subject.
usage jsaUsage
rusage map[string]*jsaUsage
// storeDir is the account's directory under the JetStream store directory.
storeDir string
// streams holds the account's streams keyed by name.
streams map[string]*stream
// templates holds the account's stream templates.
templates map[string]*streamTemplate
// NOTE(review): store is not used in the visible part of this file.
store TemplateStore
// Cluster support
// updatesPub is the subject usage updates are published on,
// updatesSub the subscription receiving remote usage updates.
updatesPub string
updatesSub *subscription
// From server
// sendq is the server's system send queue used to publish usage updates.
sendq chan *pubMsg
// lupdate is the time of the last usage update, used for rate limiting.
lupdate time.Time
// utimer periodically fires sendClusterUsageUpdateTimer.
utimer *time.Timer
}
// jsaUsage tracks general usage for an account, either for this server or
// for a single remote node. The four fields are exactly what is exchanged
// in a cluster usage update (see sendClusterUsageUpdate).
type jsaUsage struct {
// mem is the memory usage.
mem int64
// store is the storage usage.
store int64
// api is the number of API calls.
api uint64
// err is the number of API errors.
err uint64
}
// EnableJetStream turns on JetStream support for this server using the
// supplied configuration. When config is nil, or either of its limits is
// not positive, a dynamic configuration is generated instead (keeping the
// caller's StoreDir and MaxStore as hints). An empty store directory falls
// back to a JetStreamStoreDir directory under the OS temp directory.
// The caller's config is never modified.
func (s *Server) EnableJetStream(config *JetStreamConfig) error {
	if s.JetStreamEnabled() {
		return fmt.Errorf("jetstream already enabled")
	}

	s.Noticef("Starting JetStream")

	useDynamic := config == nil || config.MaxMemory <= 0 || config.MaxStore <= 0
	if useDynamic {
		var (
			dirHint   string
			storeHint int64
		)
		if config != nil {
			dirHint, storeHint = config.StoreDir, config.MaxStore
		}
		config = s.dynJetStreamConfig(dirHint, storeHint)
		s.Debugf("JetStream creating dynamic configuration - %s memory, %s disk", friendlyBytes(config.MaxMemory), friendlyBytes(config.MaxStore))
	}

	// Work on a private copy so the caller's value stays untouched.
	effective := *config
	if len(effective.StoreDir) == 0 {
		effective.StoreDir = filepath.Join(os.TempDir(), JetStreamStoreDir)
	}

	return s.enableJetStream(effective)
}
// enableJetStream will start up the JetStream subsystem.
//
// It installs a fresh jetStream on the server, makes sure the store
// directory exists (creating it if needed) or is a writable directory,
// guarantees a system account, sets up the internal subscriptions and
// exports, enables the accounts and finally, when not in standalone mode,
// starts clustering. The first failing step is returned as an error; note
// that s.js has already been set by then.
func (s *Server) enableJetStream(cfg JetStreamConfig) error {
	s.mu.Lock()
	s.js = &jetStream{srv: s, config: cfg, accounts: make(map[*Account]*jsAccount), apiSubs: NewSublistNoCache()}
	s.mu.Unlock()

	// FIXME(dlc) - Allow memory only operation?
	if stat, err := os.Stat(cfg.StoreDir); os.IsNotExist(err) {
		if err := os.MkdirAll(cfg.StoreDir, 0755); err != nil {
			return fmt.Errorf("could not create storage directory - %v", err)
		}
	} else {
		// Make sure its a directory and that we can write to it.
		if stat == nil || !stat.IsDir() {
			return fmt.Errorf("storage directory is not a directory")
		}
		tmpfile, err := ioutil.TempFile(cfg.StoreDir, "_test_")
		if err != nil {
			return fmt.Errorf("storage directory is not writable")
		}
		// The probe file only exists to prove writability. Close it before
		// removing it so we do not leak the descriptor (and so the removal
		// can succeed on platforms that refuse to delete open files).
		tmpfile.Close()
		os.Remove(tmpfile.Name())
	}

	// JetStream is an internal service so we need to make sure we have a system account.
	// This system account will export the JetStream service endpoints.
	if s.SystemAccount() == nil {
		s.SetDefaultSystemAccount()
	}

	s.Warnf(" _ ___ _____ ___ _____ ___ ___ _ __ __")
	s.Warnf(" _ | | __|_ _/ __|_ _| _ \\ __| /_\\ | \\/ |")
	s.Warnf("| || | _| | | \\__ \\ | | | / _| / _ \\| |\\/| |")
	s.Warnf(" \\__/|___| |_| |___/ |_| |_|_\\___/_/ \\_\\_| |_|")
	s.Warnf("")
	s.Warnf(" https://github.com/nats-io/jetstream")
	s.Noticef("")
	s.Noticef("---------------- JETSTREAM ----------------")
	s.Noticef(" Max Memory: %s", friendlyBytes(cfg.MaxMemory))
	s.Noticef(" Max Storage: %s", friendlyBytes(cfg.MaxStore))
	s.Noticef(" Store Directory: %q", cfg.StoreDir)
	s.Noticef("-------------------------------------------")

	// Setup our internal subscriptions.
	if err := s.setJetStreamExportSubs(); err != nil {
		return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err)
	}

	// Setup our internal system exports.
	s.Debugf(" Exports:")
	s.Debugf(" %s", jsAllApi)
	s.setupJetStreamExports()

	// Enable accounts and restore state before starting clustering.
	if err := s.enableJetStreamAccounts(); err != nil {
		return err
	}

	// If we are in clustered mode go ahead and start the meta controller.
	if !s.standAloneMode() {
		if err := s.enableJetStreamClustering(); err != nil {
			return err
		}
	}

	return nil
}
// restartJetStream attempts to bring JetStream back up during a reload after
// it was disabled at runtime. The configuration is rebuilt from the current
// server options. If enabling fails, a warning is logged and JetStream is
// disabled again; the result of that disable call is what gets returned.
func (s *Server) restartJetStream() error {
	o := s.getOpts()
	jsCfg := JetStreamConfig{
		MaxMemory: o.JetStreamMaxMemory,
		MaxStore:  o.JetStreamMaxStore,
		StoreDir:  o.StoreDir,
	}

	s.Noticef("Restarting JetStream")

	if err := s.enableJetStream(jsCfg); err != nil {
		s.Warnf("Can't start JetStream: %v", err)
		return s.DisableJetStream()
	}
	return nil
}
// checkJetStreamExports verifies that the system account carries the
// JetStream service export and sets it up when it is missing. Nothing
// happens when there is no system account.
func (s *Server) checkJetStreamExports() {
	sysAcc := s.SystemAccount()
	if sysAcc == nil {
		return
	}
	if sysAcc.getServiceExport(jsAllApi) != nil {
		// Already exported.
		return
	}
	s.setupJetStreamExports()
}
// setupJetStreamExports adds the JetStream service export to the system
// account. A failure is logged as a warning and otherwise ignored.
func (s *Server) setupJetStreamExports() {
	// Setup our internal system export.
	err := s.SystemAccount().AddServiceExport(jsAllApi, nil)
	if err == nil {
		return
	}
	s.Warnf("Error setting up jetstream service exports: %v", err)
}
// setJetStreamDisabled flags the server's JetStream state as disabled so
// that JetStreamEnabled reports false from here on. It is a no-op when
// JetStream has no state on this server, e.g. because a concurrent
// shutdown already cleared s.js.
func (s *Server) setJetStreamDisabled() {
	s.mu.Lock()
	js := s.js
	s.mu.Unlock()

	// s.js can be set to nil by shutdownJetStream between the caller's
	// enabled check and this call, so guard against a nil dereference.
	if js == nil {
		return
	}

	js.mu.Lock()
	js.disabled = true
	js.mu.Unlock()
}
// handleOutOfSpace reacts to a stream running out of storage space. When
// JetStream is enabled it logs the condition, disables JetStream in the
// background and publishes a server out-of-storage advisory naming the
// affected stream. It does nothing when JetStream is not enabled.
func (s *Server) handleOutOfSpace(stream string) {
	if !s.JetStreamEnabled() {
		return
	}

	s.Errorf("JetStream out of space, will be DISABLED")
	// Disabling can block (leader transfer, sleeps), so run it in the background.
	go s.DisableJetStream()

	event := TypedEvent{
		Type: JSServerOutOfStorageAdvisoryType,
		ID:   nuid.Next(),
		Time: time.Now().UTC(),
	}
	advisory := &JSServerOutOfSpaceAdvisory{
		TypedEvent: event,
		Server:     s.Name(),
		ServerID:   s.ID(),
		Stream:     stream,
		Cluster:    s.cachedClusterName(),
	}
	s.publishAdvisory(nil, JSAdvisoryServerOutOfStorage, advisory)
}
// DisableJetStream will turn off JetStream and signals in clustered mode
// to have the metacontroller remove us from the peer list.
// It returns nil immediately when JetStream is not enabled; every path
// visible here returns nil.
func (s *Server) DisableJetStream() error {
if !s.JetStreamEnabled() {
return nil
}
// Flag as disabled first so JetStreamEnabled reports false from now on.
s.setJetStreamDisabled()
if s.JetStreamIsClustered() {
// Capture leadership before any teardown happens.
isLeader := s.JetStreamIsLeader()
js, cc := s.getJetStreamCluster()
if js == nil {
// No JetStream state to work with, just run the shutdown path.
s.shutdownJetStream()
return nil
}
js.mu.RLock()
meta := cc.meta
js.mu.RUnlock()
if meta != nil {
if isLeader {
// NOTE(review): remapStreams is defined elsewhere; presumably it moves
// stream assignments off this peer - confirm against its implementation.
js.remapStreams(meta.ID())
s.Warnf("JetStream initiating meta leader transfer")
// Wait up to 2 seconds, bailing out early if the server is quitting.
select {
case <-s.quitCh:
return nil
case <-time.After(2 * time.Second):
}
if !s.JetStreamIsCurrent() {
s.Warnf("JetStream timeout waiting for meta leader transfer")
}
}
// Once here we can forward our proposal to remove ourselves.
meta.ProposeRemovePeer(meta.ID())
// Brief pause before deleting the meta node.
// NOTE(review): presumably to let the proposal leave the server - confirm.
time.Sleep(250 * time.Millisecond)
meta.Delete()
}
}
// Normal shutdown.
s.shutdownJetStream()
return nil
}
// enableJetStreamAccounts enables JetStream for the server's accounts.
// When only the global account is in use it is enabled with dynamic limits
// (nil); otherwise every configured account is processed. Any failure is
// returned wrapped with context, including the underlying cause.
func (s *Server) enableJetStreamAccounts() error {
	// If we have no configured accounts setup then setup imports on global account.
	if s.globalAccountOnly() {
		if err := s.GlobalAccount().EnableJetStream(nil); err != nil {
			// Include the cause; without it the failure cannot be diagnosed.
			return fmt.Errorf("Error enabling jetstream on the global account: %v", err)
		}
	} else if err := s.configAllJetStreamAccounts(); err != nil {
		return fmt.Errorf("Error enabling jetstream on configured accounts: %v", err)
	}
	return nil
}
// enableAllJetStreamServiceImports makes sure this account imports the
// JetStream API service from the system account. It is a no-op when the
// import already exists and fails when the account is not registered
// with a server.
func (a *Account) enableAllJetStreamServiceImports() error {
	a.mu.RLock()
	srv := a.srv
	a.mu.RUnlock()

	if srv == nil {
		return fmt.Errorf("jetstream account not registered")
	}
	if a.serviceImportExists(jsAllApi) {
		return nil
	}

	err := a.AddServiceImport(srv.SystemAccount(), jsAllApi, _EMPTY_)
	if err != nil {
		return fmt.Errorf("Error setting up jetstream service imports for account: %v", err)
	}
	return nil
}
// enableJetStreamInfoServiceImportOnly sets up the service import used to
// respond to JetStream requests for this account, unless the account info
// subject is already shadowed by another import. That can happen when an
// account imports JS access from another account.
// Should we do them all regardless?
func (a *Account) enableJetStreamInfoServiceImportOnly() error {
	if !a.serviceImportShadowed(JSApiAccountInfo) {
		return a.enableAllJetStreamServiceImports()
	}
	// Shadowed: nothing for us to add.
	return nil
}
// configJetStream applies the pending JetStream configuration of a single
// account.
//
// With pending limits (acc.jsLimits != nil) an already enabled account gets
// its service imports checked and limits updated (the reload case), while
// a not yet enabled account is enabled and, when gateways are on, switched
// to interest mode. The pending limits are cleared afterwards.
//
// Without pending limits, any account other than the system account has
// JetStream disabled if it was on, and gets the basic service import so
// requests are still answered.
func (s *Server) configJetStream(acc *Account) error {
	if acc.jsLimits == nil {
		if acc == s.SystemAccount() {
			return nil
		}
		if acc.JetStreamEnabled() {
			acc.DisableJetStream()
		}
		// We will setup basic service imports to respond to
		// requests if JS is enabled for this account.
		return acc.enableJetStreamInfoServiceImportOnly()
	}

	if acc.JetStreamEnabled() {
		// Already enabled. This can be during a reload.
		if err := acc.enableAllJetStreamServiceImports(); err != nil {
			return err
		}
		if err := acc.UpdateJetStreamLimits(acc.jsLimits); err != nil {
			return err
		}
	} else {
		if err := acc.EnableJetStream(acc.jsLimits); err != nil {
			return err
		}
		if s.gateway.enabled {
			s.switchAccountToInterestMode(acc.GetName())
		}
	}

	acc.jsLimits = nil
	return nil
}
// configAllJetStreamAccounts walks all registered accounts and applies their
// JetStream configuration, stopping at the first error. It returns nil
// without touching any account when JetStream is not set up on the server.
func (s *Server) configAllJetStreamAccounts() error {
	// Check to see if system account has been enabled. We could arrive here via reload and
	// a non-default system account.
	s.checkJetStreamExports()

	// Snapshot the accounts under the server lock. Might not be needed.
	var snapshot []*Account
	s.mu.Lock()
	if s.js == nil {
		// Bail if server not enabled. If it was enabled and a reload turns it off
		// that will be handled elsewhere.
		s.mu.Unlock()
		return nil
	}
	s.accounts.Range(func(_, val interface{}) bool {
		snapshot = append(snapshot, val.(*Account))
		return true
	})
	s.mu.Unlock()

	// Process any jetstream enabled accounts here.
	for i := range snapshot {
		if err := s.configJetStream(snapshot[i]); err != nil {
			return err
		}
	}
	return nil
}
// JetStreamEnabled reports if jetstream is enabled, i.e. JetStream state
// exists on this server and has not been flagged as disabled.
func (s *Server) JetStreamEnabled() bool {
	s.mu.Lock()
	js := s.js
	s.mu.Unlock()

	if js == nil {
		return false
	}

	// The disabled flag is written under js.mu (see setJetStreamDisabled),
	// so it has to be read under js.mu as well; the server lock does not
	// protect it and reading it there is a data race.
	js.mu.RLock()
	enabled := !js.disabled
	js.mu.RUnlock()

	return enabled
}
// Will migrate off ephemerals if possible.
// This means parent stream needs to be replicated.
// Only consumers whose group consists of this server alone, and whose parent
// stream group has more than one peer including this server, are considered.
func (s *Server) migrateEphemerals() {
js, cc := s.getJetStreamCluster()
// Make sure JetStream is enabled and we are clustered.
if js == nil || cc == nil {
return
}
// Consumer assignments selected for migration.
var consumers []*consumerAssignment
js.mu.Lock()
ourID := cc.meta.ID()
// Walk all stream assignments; the outer map value is itself a collection of assignments.
for _, asa := range cc.streams {
for _, sa := range asa {
// Stream must be replicated (more than one peer), include us, and have consumers.
if rg := sa.Group; rg != nil && len(rg.Peers) > 1 && rg.isMember(ourID) && len(sa.consumers) > 0 {
for _, ca := range sa.consumers {
// Consumer must live on exactly one peer and that peer must be us.
if ca.Group != nil && len(ca.Group.Peers) == 1 && ca.Group.isMember(ourID) {
// Need to select possible new peer from parent stream.
// The first stream peer that is not us is chosen; the assignment is updated in place.
for _, p := range rg.Peers {
if p != ourID {
ca.Group.Peers = []string{p}
ca.Group.Preferred = p
consumers = append(consumers, ca)
break
}
}
}
}
}
}
}
js.mu.Unlock()
// Process the consumers.
for _, ca := range consumers {
// Locate the consumer itself.
if acc, err := s.LookupAccount(ca.Client.Account); err == nil && acc != nil {
if mset, err := acc.lookupStream(ca.Stream); err == nil && mset != nil {
if o := mset.lookupConsumer(ca.Name); o != nil {
// Capture the stored state before removing the local consumer.
state := o.readStoreState()
o.deleteWithoutAdvisory()
js.mu.Lock()
// Delete old one.
cc.meta.ForwardProposal(encodeDeleteConsumerAssignment(ca))
// Encode state and new name.
ca.State = state
ca.Name = createConsumerName()
addEntry := encodeAddConsumerAssignmentCompressed(ca)
cc.meta.ForwardProposal(addEntry)
js.mu.Unlock()
}
}
}
}
// Give time for migration information to make it out of our server.
if len(consumers) > 0 {
time.Sleep(50 * time.Millisecond)
}
}
// Shutdown jetstream for this server.
// Removes JetStream from every tracked account, clears the server's
// JetStream pointer and then tears down the remaining cluster state.
// It is a no-op when JetStream has no state on this server.
func (s *Server) shutdownJetStream() {
s.mu.Lock()
js := s.js
s.mu.Unlock()
if js == nil {
return
}
s.Noticef("Initiating JetStream Shutdown...")
defer s.Noticef("JetStream Shutdown")
// Fixed-size backing array for the account snapshot; append grows past it if needed.
var _a [512]*Account
accounts := _a[:0]
js.mu.RLock()
// Collect accounts.
for _, jsa := range js.accounts {
if a := jsa.acc(); a != nil {
accounts = append(accounts, a)
}
}
js.mu.RUnlock()
// Done outside of the js lock since removal takes js.mu itself.
// This must also happen before s.js is cleared below: removeJetStream
// looks the state up through the server and bails if it is already nil.
for _, a := range accounts {
a.removeJetStream()
}
s.mu.Lock()
s.js = nil
s.mu.Unlock()
js.mu.Lock()
js.accounts = nil
if cc := js.cluster; cc != nil {
js.stopUpdatesSub()
// Close the cluster's internal client connection, if any.
if cc.c != nil {
cc.c.closeConnection(ClientClosed)
cc.c = nil
}
cc.meta = nil
}
js.mu.Unlock()
}
// JetStreamConfig returns a copy of the configuration currently in use,
// which is handy when the server generated a dynamic one. It returns nil
// when JetStream has no state on this server.
func (s *Server) JetStreamConfig() *JetStreamConfig {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.js == nil {
		return nil
	}
	// Hand out a copy so callers cannot alter our configuration.
	cfg := s.js.config
	return &cfg
}
// StoreDir returns the JetStream store directory in use, or the empty
// string when JetStream has no state on this server.
func (s *Server) StoreDir() string {
	dir := _EMPTY_

	s.mu.Lock()
	if s.js != nil {
		dir = s.js.config.StoreDir
	}
	s.mu.Unlock()

	return dir
}
// JetStreamNumAccounts returns how many JetStream enabled accounts this
// server is tracking, or 0 when JetStream has no state on this server.
func (s *Server) JetStreamNumAccounts() int {
	js := s.getJetStream()
	if js == nil {
		return 0
	}

	js.mu.Lock()
	num := len(js.accounts)
	js.mu.Unlock()

	return num
}
// JetStreamReservedResources returns the reserved memory and storage, in
// that order. When JetStream has no state on this server it returns
// -1, -1 and ErrJetStreamNotEnabled.
func (s *Server) JetStreamReservedResources() (int64, int64, error) {
	js := s.getJetStream()
	if js == nil {
		return -1, -1, ErrJetStreamNotEnabled
	}

	js.mu.RLock()
	mem, store := js.memReserved, js.storeReserved
	js.mu.RUnlock()

	return mem, store, nil
}
// getJetStream returns the server's JetStream state under the server lock.
// The result is nil when JetStream has not been set up.
func (s *Server) getJetStream() *jetStream {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.js
}
// EnableJetStream will enable JetStream on this account with the defined limits.
// This is a helper for JetStreamEnableAccount.
//
// A nil limits value means the limits are chosen dynamically. The call
// fails when the account is not registered with a server, JetStream is not
// set up on the server, the account is the system account, JetStream is
// already enabled for the account, or the server lacks the resources for
// the requested limits.
//
// Once enabled, state found in the account's store directory is recovered:
// stream templates first, then streams and their consumers. Recovery is
// best effort: an item that cannot be read, verified or recreated is
// logged as a warning and skipped.
func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
	a.mu.RLock()
	s := a.srv
	a.mu.RUnlock()
	if s == nil {
		return fmt.Errorf("jetstream account not registered")
	}

	s.mu.Lock()
	sendq := s.sys.sendq
	s.mu.Unlock()

	js := s.getJetStream()
	if js == nil {
		return ErrJetStreamNotEnabled
	}
	if s.SystemAccount() == a {
		return fmt.Errorf("jetstream can not be enabled on the system account")
	}

	// No limits means we dynamically set up limits.
	if limits == nil {
		limits = js.dynamicAccountLimits()
	}

	js.mu.Lock()
	// Check the limits against existing reservations.
	if _, ok := js.accounts[a]; ok {
		js.mu.Unlock()
		return fmt.Errorf("jetstream already enabled for account")
	}
	if err := js.sufficientResources(limits); err != nil {
		js.mu.Unlock()
		return err
	}
	jsa := &jsAccount{js: js, account: a, limits: *limits, streams: make(map[string]*stream), sendq: sendq}
	jsa.utimer = time.AfterFunc(usageTick, jsa.sendClusterUsageUpdateTimer)
	jsa.storeDir = path.Join(js.config.StoreDir, a.Name)
	js.accounts[a] = jsa
	js.reserveResources(limits)
	js.mu.Unlock()

	sysNode := s.Node()

	// Cluster mode updates to resource usages, but we always will turn on. System internal prevents echos.
	jsa.mu.Lock()
	jsa.updatesPub = fmt.Sprintf(jsaUpdatesPubT, a.Name, sysNode)
	jsa.updatesSub, _ = s.sysSubscribe(fmt.Sprintf(jsaUpdatesSubT, a.Name), jsa.remoteUpdateUsage)
	jsa.mu.Unlock()

	// Stamp inside account as well.
	a.mu.Lock()
	a.js = jsa
	a.mu.Unlock()

	// Create the proper imports here.
	if err := a.enableAllJetStreamServiceImports(); err != nil {
		return err
	}

	s.Debugf("Enabled JetStream for account %q", a.Name)
	s.Debugf(" Max Memory: %s", friendlyBytes(limits.MaxMemory))
	s.Debugf(" Max Storage: %s", friendlyBytes(limits.MaxStore))

	sdir := path.Join(jsa.storeDir, streamsDir)
	if _, err := os.Stat(sdir); os.IsNotExist(err) {
		if err := os.MkdirAll(sdir, 0755); err != nil {
			return fmt.Errorf("could not create storage streams directory - %v", err)
		}
	}

	// Restore any state here.
	s.Debugf("Recovering JetStream state for account %q", a.Name)

	// Check templates first since message sets will need proper ownership.
	// FIXME(dlc) - Make this consistent.
	tdir := path.Join(jsa.storeDir, tmplsDir)
	if stat, err := os.Stat(tdir); err == nil && stat.IsDir() {
		key := sha256.Sum256([]byte("templates"))
		hh, err := highwayhash.New64(key[:])
		if err != nil {
			return err
		}
		fis, _ := ioutil.ReadDir(tdir)
		for _, fi := range fis {
			metafile := path.Join(tdir, fi.Name(), JetStreamMetaFile)
			metasum := path.Join(tdir, fi.Name(), JetStreamMetaFileSum)
			buf, err := ioutil.ReadFile(metafile)
			if err != nil {
				// Report the file we actually failed to read.
				s.Warnf(" Error reading StreamTemplate metafile %q: %v", metafile, err)
				continue
			}
			if _, err := os.Stat(metasum); os.IsNotExist(err) {
				s.Warnf(" Missing StreamTemplate checksum for %q", metasum)
				continue
			}
			sum, err := ioutil.ReadFile(metasum)
			if err != nil {
				s.Warnf(" Error reading StreamTemplate checksum %q: %v", metasum, err)
				continue
			}
			// The hasher is shared across templates, so reset it per file.
			hh.Reset()
			hh.Write(buf)
			checksum := hex.EncodeToString(hh.Sum(nil))
			if checksum != string(sum) {
				s.Warnf(" StreamTemplate checksums do not match %q vs %q", sum, checksum)
				continue
			}
			var cfg StreamTemplateConfig
			if err := json.Unmarshal(buf, &cfg); err != nil {
				s.Warnf(" Error unmarshalling StreamTemplate metafile: %v", err)
				continue
			}
			cfg.Config.Name = _EMPTY_
			if _, err := a.addStreamTemplate(&cfg); err != nil {
				s.Warnf(" Error recreating StreamTemplate %q: %v", cfg.Name, err)
				continue
			}
		}
	}

	// Now recover the streams.
	fis, _ := ioutil.ReadDir(sdir)
	for _, fi := range fis {
		mdir := path.Join(sdir, fi.Name())
		// The checksum key is derived from the stream's directory name.
		key := sha256.Sum256([]byte(fi.Name()))
		hh, err := highwayhash.New64(key[:])
		if err != nil {
			return err
		}
		metafile := path.Join(mdir, JetStreamMetaFile)
		metasum := path.Join(mdir, JetStreamMetaFileSum)
		if _, err := os.Stat(metafile); os.IsNotExist(err) {
			s.Warnf(" Missing Stream metafile for %q", metafile)
			continue
		}
		buf, err := ioutil.ReadFile(metafile)
		if err != nil {
			// Report the file we actually failed to read.
			s.Warnf(" Error reading metafile %q: %v", metafile, err)
			continue
		}
		if _, err := os.Stat(metasum); os.IsNotExist(err) {
			s.Warnf(" Missing Stream checksum for %q", metasum)
			continue
		}
		sum, err := ioutil.ReadFile(metasum)
		if err != nil {
			s.Warnf(" Error reading Stream metafile checksum %q: %v", metasum, err)
			continue
		}
		hh.Write(buf)
		checksum := hex.EncodeToString(hh.Sum(nil))
		if checksum != string(sum) {
			s.Warnf(" Stream metafile checksums do not match %q vs %q", sum, checksum)
			continue
		}

		var cfg FileStreamInfo
		if err := json.Unmarshal(buf, &cfg); err != nil {
			s.Warnf(" Error unmarshalling Stream metafile: %v", err)
			continue
		}

		if cfg.Template != _EMPTY_ {
			// A failure here is only logged; the stream is still recreated below.
			if err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name); err != nil {
				s.Warnf(" Error adding Stream %q to Template %q: %v", cfg.Name, cfg.Template, err)
			}
		}
		mset, err := a.addStream(&cfg.StreamConfig)
		if err != nil {
			s.Warnf(" Error recreating Stream %q: %v", cfg.Name, err)
			continue
		}
		if !cfg.Created.IsZero() {
			mset.setCreatedTime(cfg.Created)
		}

		state := mset.state()
		s.Noticef(" Restored %s messages for Stream %q", comma(int64(state.Msgs)), fi.Name())

		// Now do the consumers.
		odir := path.Join(sdir, fi.Name(), consumerDir)
		ofis, _ := ioutil.ReadDir(odir)
		if len(ofis) > 0 {
			s.Noticef(" Recovering %d Consumers for Stream - %q", len(ofis), fi.Name())
		}
		for _, ofi := range ofis {
			metafile := path.Join(odir, ofi.Name(), JetStreamMetaFile)
			metasum := path.Join(odir, ofi.Name(), JetStreamMetaFileSum)
			if _, err := os.Stat(metafile); os.IsNotExist(err) {
				s.Warnf(" Missing Consumer Metafile %q", metafile)
				continue
			}
			buf, err := ioutil.ReadFile(metafile)
			if err != nil {
				// Report the file we actually failed to read.
				s.Warnf(" Error reading consumer metafile %q: %v", metafile, err)
				continue
			}
			if _, err := os.Stat(metasum); os.IsNotExist(err) {
				s.Warnf(" Missing Consumer checksum for %q", metasum)
				continue
			}
			var cfg FileConsumerInfo
			if err := json.Unmarshal(buf, &cfg); err != nil {
				s.Warnf(" Error unmarshalling Consumer metafile: %v", err)
				continue
			}
			isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig)
			if isEphemeral {
				// This is an ephemeral consumer and this could fail on restart until
				// the consumer can reconnect. We will create it as a durable and switch it.
				cfg.ConsumerConfig.Durable = ofi.Name()
			}
			obs, err := mset.addConsumer(&cfg.ConsumerConfig)
			if err != nil {
				s.Warnf(" Error adding Consumer: %v", err)
				continue
			}
			if isEphemeral {
				obs.switchToEphemeral()
			}
			if !cfg.Created.IsZero() {
				obs.setCreatedTime(cfg.Created)
			}
			obs.mu.Lock()
			err = obs.readStoredState()
			obs.mu.Unlock()
			if err != nil {
				s.Warnf(" Error restoring Consumer state: %v", err)
			}
		}
	}

	// Make sure to cleanup any old remaining snapshots.
	os.RemoveAll(path.Join(jsa.storeDir, snapsDir))

	s.Debugf("JetStream state for account %q recovered", a.Name)

	return nil
}
// numStreams returns how many streams this account has, or 0 when
// JetStream is not enabled for the account.
func (a *Account) numStreams() int {
	a.mu.RLock()
	jsa := a.js
	a.mu.RUnlock()

	if jsa == nil {
		return 0
	}

	jsa.mu.Lock()
	defer jsa.mu.Unlock()
	return len(jsa.streams)
}
// streams returns all streams known to this account. It is shorthand for
// filteredStreams with an empty filter.
func (a *Account) streams() []*stream {
	const noFilter = _EMPTY_
	return a.filteredStreams(noFilter)
}
// filteredStreams returns the account's streams that have at least one
// configured subject colliding with filter. An empty filter selects every
// stream. The result is nil when JetStream is not enabled for the account.
func (a *Account) filteredStreams(filter string) []*stream {
	a.mu.RLock()
	jsa := a.js
	a.mu.RUnlock()

	if jsa == nil {
		return nil
	}

	jsa.mu.Lock()
	defer jsa.mu.Unlock()

	var matched []*stream
	for _, mset := range jsa.streams {
		if filter == _EMPTY_ {
			matched = append(matched, mset)
			continue
		}
		// One colliding subject is enough to select the stream.
		for _, subject := range mset.cfg.Subjects {
			if SubjectsCollide(filter, subject) {
				matched = append(matched, mset)
				break
			}
		}
	}
	return matched
}
// lookupStream finds one of the account's streams by name. It returns
// ErrJetStreamNotEnabled when JetStream is not enabled for the account and
// ErrJetStreamStreamNotFound when no stream has that name.
func (a *Account) lookupStream(name string) (*stream, error) {
	a.mu.RLock()
	jsa := a.js
	a.mu.RUnlock()

	if jsa == nil {
		return nil, ErrJetStreamNotEnabled
	}

	jsa.mu.Lock()
	defer jsa.mu.Unlock()

	if mset, found := jsa.streams[name]; found {
		return mset, nil
	}
	return nil, ErrJetStreamStreamNotFound
}
// UpdateJetStreamLimits changes the limits of a JetStream enabled account.
// A nil limits value selects dynamic limits. The server must have enough
// resources for the difference between the current and the new limits;
// if so, the old reservation is released and the new one is made.
func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error {
	a.mu.RLock()
	srv, jsa := a.srv, a.js
	a.mu.RUnlock()

	if srv == nil {
		return fmt.Errorf("jetstream account not registered")
	}
	js := srv.getJetStream()
	switch {
	case js == nil:
		return ErrJetStreamNotEnabled
	case jsa == nil:
		return ErrJetStreamNotEnabledForAccount
	}

	if limits == nil {
		limits = js.dynamicAccountLimits()
	}

	// Calculate the delta between what we have and what we want.
	jsa.mu.Lock()
	current := jsa.limits
	jsa.mu.Unlock()
	delta := diffCheckedLimits(&current, limits)

	js.mu.Lock()
	// Check the limits against existing reservations.
	if err := js.sufficientResources(&delta); err != nil {
		js.mu.Unlock()
		return err
	}
	// FIXME(dlc) - If we drop and are over the max on memory or store, do we delete??
	js.releaseResources(&current)
	js.reserveResources(limits)
	js.mu.Unlock()

	// Record the new limits on the account.
	jsa.mu.Lock()
	jsa.limits = *limits
	jsa.mu.Unlock()

	return nil
}
// diffCheckedLimits returns how much b's memory and storage limits exceed
// a's (b - a, which may be negative). The stream and consumer limits are
// not part of the difference and are left at zero.
func diffCheckedLimits(a, b *JetStreamAccountLimits) JetStreamAccountLimits {
	var delta JetStreamAccountLimits
	delta.MaxMemory = b.MaxMemory - a.MaxMemory
	delta.MaxStore = b.MaxStore - a.MaxStore
	return delta
}
// JetStreamUsage reports on JetStream usage and limits for an account.
// A zero-valued JetStreamAccountStats is returned when JetStream is not
// enabled for the account.
func (a *Account) JetStreamUsage() JetStreamAccountStats {
a.mu.RLock()
jsa, aname := a.js, a.Name
a.mu.RUnlock()
var stats JetStreamAccountStats
if jsa != nil {
js := jsa.js
// The account lock is held for the whole collection below.
jsa.mu.RLock()
stats.Memory = uint64(jsa.memTotal)
stats.Store = uint64(jsa.storeTotal)
stats.API = JetStreamAPIStats{
Total: jsa.apiTotal,
Errors: jsa.apiErrors,
}
if cc := jsa.js.cluster; cc != nil {
// With cluster state present, count from the stream assignments for this
// account name rather than from the local streams. js.mu is taken while
// jsa.mu is still held.
js.mu.RLock()
sas := cc.streams[aname]
stats.Streams = len(sas)
for _, sa := range sas {
stats.Consumers += len(sa.consumers)
}
js.mu.RUnlock()
} else {
// No cluster state: count the local streams and their consumers.
stats.Streams = len(jsa.streams)
for _, mset := range jsa.streams {
stats.Consumers += mset.numConsumers()
}
}
stats.Limits = jsa.limits
jsa.mu.RUnlock()
}
return stats
}
// DisableJetStream turns JetStream off for this account: the account's
// JetStream pointer is cleared, the JetStream service imports are removed
// and the account's JetStream resources are released. Note the pointer is
// cleared even when one of the error paths below is taken.
func (a *Account) DisableJetStream() error {
	a.mu.Lock()
	srv := a.srv
	a.js = nil
	a.mu.Unlock()

	if srv == nil {
		return fmt.Errorf("jetstream account not registered")
	}

	js := srv.getJetStream()
	if js == nil {
		return ErrJetStreamNotEnabled
	}

	// Remove service imports.
	for _, subject := range allJsExports {
		a.removeServiceImport(subject)
	}

	jsa := js.lookupAccount(a)
	return js.disableJetStream(jsa)
}
// removeJetStream is called when JetStream has been disabled for this
// server. It clears the account's JetStream pointer and releases the
// account's JetStream resources; unlike DisableJetStream it leaves the
// service imports in place.
func (a *Account) removeJetStream() error {
	a.mu.Lock()
	srv := a.srv
	a.js = nil
	a.mu.Unlock()

	if srv == nil {
		return fmt.Errorf("jetstream account not registered")
	}

	js := srv.getJetStream()
	if js == nil {
		return ErrJetStreamNotEnabled
	}

	jsa := js.lookupAccount(a)
	return js.disableJetStream(jsa)
}
// disableJetStream stops tracking the given account, releases the resources
// reserved for its limits and deletes its JetStream resources. It returns
// ErrJetStreamNotEnabledForAccount when jsa is nil.
func (js *jetStream) disableJetStream(jsa *jsAccount) error {
	if jsa == nil {
		return ErrJetStreamNotEnabledForAccount
	}

	js.mu.Lock()
	delete(js.accounts, jsa.account)
	js.releaseResources(&jsa.limits)
	js.mu.Unlock()

	// Tear down the account's streams and templates outside of our lock.
	jsa.delete()

	return nil
}
// JetStreamEnabled reports whether JetStream is enabled for the account.
// It is safe to call on a nil account, for which it reports false.
func (a *Account) JetStreamEnabled() bool {
	if a == nil {
		return false
	}
	a.mu.RLock()
	defer a.mu.RUnlock()
	return a.js != nil
}
// remoteUpdateUsage handles a usage update received from a remote server.
// The payload carries four little-endian 64-bit values: memory used, storage
// used, API total and API errors. The remote node is the last token of the
// subject. The values are absolute, so the node's previously recorded
// contribution is subtracted from the account totals before the new one is
// added. Short payloads and subjects without a node token are ignored with
// a warning.
func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, subject, _ string, msg []byte) {
	const usageSize = 32

	jsa.mu.Lock()
	srv := jsa.js.srv

	if len(msg) < usageSize {
		jsa.mu.Unlock()
		srv.Warnf("Ignoring remote usage update with size too short")
		return
	}

	var remote string
	if idx := strings.LastIndexByte(subject, btsep); idx > 0 && idx < len(subject) {
		remote = subject[idx+1:]
	}
	if remote == _EMPTY_ {
		jsa.mu.Unlock()
		srv.Warnf("Received remote usage update with no remote node")
		return
	}

	order := binary.LittleEndian
	mem := int64(order.Uint64(msg[0:]))
	store := int64(order.Uint64(msg[8:]))
	api := order.Uint64(msg[16:])
	errs := order.Uint64(msg[24:])

	if jsa.rusage == nil {
		jsa.rusage = make(map[string]*jsaUsage)
	}

	// Update the usage for this remote.
	if prev := jsa.rusage[remote]; prev == nil {
		jsa.rusage[remote] = &jsaUsage{mem: mem, store: store, api: api, err: errs}
	} else {
		// Take the old contribution back out of the totals.
		jsa.memTotal -= prev.mem
		jsa.storeTotal -= prev.store
		jsa.apiTotal -= prev.api
		jsa.apiErrors -= prev.err
		prev.mem, prev.store, prev.api, prev.err = mem, store, api, errs
	}

	jsa.memTotal += mem
	jsa.storeTotal += store
	jsa.apiTotal += api
	jsa.apiErrors += errs
	jsa.mu.Unlock()
}
// updateUsage adjusts the accounting of in-use memory or storage by delta,
// for both this server's own usage and the account totals. It is called
// locally by the lower storage layers. When cluster state is present the
// new usage is also sent out (subject to rate limiting).
func (jsa *jsAccount) updateUsage(storeType StorageType, delta int64) {
	jsa.mu.Lock()

	switch storeType {
	case MemoryStorage:
		jsa.usage.mem += delta
		jsa.memTotal += delta
	default:
		jsa.usage.store += delta
		jsa.storeTotal += delta
	}

	// Publish our local updates if in clustered mode.
	if js := jsa.js; js != nil && js.cluster != nil {
		jsa.sendClusterUsageUpdate()
	}

	jsa.mu.Unlock()
}
// usageTick is the interval (1.5s) at which an account's usage update timer fires.
const usageTick = time.Second + 500*time.Millisecond
// sendClusterUsageUpdateTimer is the timer callback that sends out the
// account's usage and re-arms the timer for the next tick, unless the timer
// has been cleared in the meantime (see delete).
func (jsa *jsAccount) sendClusterUsageUpdateTimer() {
	jsa.mu.Lock()
	defer jsa.mu.Unlock()

	jsa.sendClusterUsageUpdate()

	if timer := jsa.utimer; timer != nil {
		timer.Reset(usageTick)
	}
}
// sendClusterUsageUpdate sends this server's usage for the account as four
// little-endian 64-bit values: memory, storage, API calls and API errors.
// Updates are limited to one per 250ms; this is fine since the values are
// absolute. Nothing is queued when there is no send queue.
// Lock should be held.
func (jsa *jsAccount) sendClusterUsageUpdate() {
	if jsa.js == nil || jsa.js.srv == nil {
		return
	}

	// These values are absolute so we can limit send rates.
	now := time.Now()
	if now.Sub(jsa.lupdate) < 250*time.Millisecond {
		return
	}
	jsa.lupdate = now

	fields := [...]uint64{
		uint64(jsa.usage.mem),
		uint64(jsa.usage.store),
		uint64(jsa.usage.api),
		uint64(jsa.usage.err),
	}
	payload := make([]byte, 32)
	for i, v := range fields {
		binary.LittleEndian.PutUint64(payload[i*8:], v)
	}

	if jsa.sendq != nil {
		jsa.sendq <- &pubMsg{nil, jsa.updatesPub, _EMPTY_, nil, payload, false}
	}
}
// limitsExceeded reports whether the account's total usage for the given
// storage type is above its limit. A limit that is not positive is never
// considered exceeded. Any type other than MemoryStorage is checked against
// the storage limit.
func (jsa *jsAccount) limitsExceeded(storeType StorageType) bool {
	jsa.mu.RLock()
	defer jsa.mu.RUnlock()

	used, limit := jsa.storeTotal, jsa.limits.MaxStore
	if storeType == MemoryStorage {
		used, limit = jsa.memTotal, jsa.limits.MaxMemory
	}
	return limit > 0 && used > limit
}
// checkLimits checks if a newly proposed stream would exceed our account
// limits: the number of streams, the requested maximum number of consumers
// and, when the stream sets MaxBytes, the bytes it needs across all of its
// replicas.
// Lock should be held.
func (jsa *jsAccount) checkLimits(config *StreamConfig) error {
	if limit := jsa.limits.MaxStreams; limit > 0 && len(jsa.streams) >= limit {
		return fmt.Errorf("maximum number of streams reached")
	}

	// Check MaxConsumers
	wanted, allowed := config.MaxConsumers, jsa.limits.MaxConsumers
	if wanted > 0 && allowed > 0 && wanted > allowed {
		return fmt.Errorf("maximum consumers exceeds account limit")
	}

	// Check storage, memory or disk.
	if config.MaxBytes <= 0 {
		return nil
	}
	return jsa.checkBytesLimits(config.MaxBytes*int64(config.Replicas), config.Storage)
}
// checkBytesLimits checks if reserving addBytes more of the given storage
// type would exceed the account's limit for it. The amount should already
// account for replicas. Storage types other than memory and file are not
// checked.
// Lock should be held.
func (jsa *jsAccount) checkBytesLimits(addBytes int64, storage StorageType) error {
	if storage == MemoryStorage {
		if jsa.memReserved+addBytes > jsa.limits.MaxMemory {
			return fmt.Errorf("insufficient memory resources available")
		}
	} else if storage == FileStorage {
		if jsa.storeReserved+addBytes > jsa.limits.MaxStore {
			return fmt.Errorf("insufficient storage resources available")
		}
	}
	return nil
}
// acc returns the account this JetStream account state belongs to.
func (jsa *jsAccount) acc() *Account {
	jsa.mu.RLock()
	defer jsa.mu.RUnlock()
	return jsa.account
}
// delete releases the account's JetStream resources: the usage timer is
// stopped, the usage updates subscription is removed, all streams are
// stopped and all stream templates are deleted. Streams and templates are
// processed after the account lock has been released.
func (jsa *jsAccount) delete() {
	jsa.mu.Lock()

	if timer := jsa.utimer; timer != nil {
		timer.Stop()
		jsa.utimer = nil
	}

	if jsa.updatesSub != nil && jsa.js.srv != nil {
		jsa.js.srv.sysUnsubscribe(jsa.updatesSub)
		jsa.updatesSub = nil
	}

	// Snapshot what needs to be torn down once we let go of the lock.
	msets := make([]*stream, 0, len(jsa.streams))
	for _, mset := range jsa.streams {
		msets = append(msets, mset)
	}
	account := jsa.account
	names := make([]string, 0, len(jsa.templates))
	for _, tmpl := range jsa.templates {
		names = append(names, tmpl.Name)
	}
	jsa.templates = nil

	jsa.mu.Unlock()

	for _, mset := range msets {
		mset.stop(false, false)
	}
	for _, name := range names {
		account.deleteStreamTemplate(name)
	}
}
// lookupAccount returns the JetStream account state tracked for the given
// account, or nil when the account is not tracked.
func (js *jetStream) lookupAccount(a *Account) *jsAccount {
	js.mu.RLock()
	defer js.mu.RUnlock()
	return js.accounts[a]
}
// dynamicAccountLimits creates limits for an account dynamically. For now
// this hands out all of the server's memory and storage, with -1 for the
// stream and consumer counts. Mostly meant for $G in non-account mode.
func (js *jetStream) dynamicAccountLimits() *JetStreamAccountLimits {
	js.mu.RLock()
	defer js.mu.RUnlock()

	return &JetStreamAccountLimits{
		MaxMemory:    js.config.MaxMemory,
		MaxStore:     js.config.MaxStore,
		MaxStreams:   -1,
		MaxConsumers: -1,
	}
}
// usageStats reports on JetStream stats and usage for the whole server by
// summing the memory, storage and API totals of every tracked account.
func (js *jetStream) usageStats() *JetStreamStats {
	// Snapshot the accounts so each one can be locked without holding js.mu.
	var backing [512]*jsAccount
	tracked := backing[:0]

	js.mu.RLock()
	for _, jsa := range js.accounts {
		tracked = append(tracked, jsa)
	}
	js.mu.RUnlock()

	stats := &JetStreamStats{Accounts: len(tracked)}

	// Collect account information.
	for _, jsa := range tracked {
		jsa.mu.RLock()
		stats.Memory += uint64(jsa.memTotal)
		stats.Store += uint64(jsa.storeTotal)
		stats.API.Total += jsa.apiTotal
		stats.API.Errors += jsa.apiErrors
		jsa.mu.RUnlock()
	}

	return stats
}
// sufficientResources reports an error when reserving the given limits on top
// of what is already reserved would exceed the configured memory or store.
// Nil limits always pass.
// Lock should be held.
func (js *jetStream) sufficientResources(limits *JetStreamAccountLimits) error {
	switch {
	case limits == nil:
		return nil
	case js.memReserved+limits.MaxMemory > js.config.MaxMemory:
		return fmt.Errorf("insufficient memory resources available")
	case js.storeReserved+limits.MaxStore > js.config.MaxStore:
		return fmt.Errorf("insufficient storage resources available")
	}
	return nil
}
// reserveResources will (blindly) reserve the resources requested: positive
// memory and store limits are added to the reserved totals without checking
// them against the configuration. It always returns nil.
// Lock should be held.
func (js *jetStream) reserveResources(limits *JetStreamAccountLimits) error {
	if limits != nil {
		if mem := limits.MaxMemory; mem > 0 {
			js.memReserved += mem
		}
		if store := limits.MaxStore; store > 0 {
			js.storeReserved += store
		}
	}
	return nil
}
// releaseResources subtracts the positive memory and store limits from the
// reserved totals, mirroring reserveResources. It always returns nil.
// Lock should be held.
func (js *jetStream) releaseResources(limits *JetStreamAccountLimits) error {
	if limits != nil {
		if mem := limits.MaxMemory; mem > 0 {
			js.memReserved -= mem
		}
		if store := limits.MaxStore; store > 0 {
			js.storeReserved -= store
		}
	}
	return nil
}
// clearResources resets the memory and store reservations to zero.
// Mostly for reload of a config. Safe to call on a nil receiver.
func (js *jetStream) clearResources() {
	if js != nil {
		js.mu.Lock()
		js.memReserved, js.storeReserved = 0, 0
		js.mu.Unlock()
	}
}
const (
	// JetStreamStoreDir is the directory name appended to the storage
	// location when building the JetStream store path.
	JetStreamStoreDir = "jetstream"
	// JetStreamMaxStoreDefault is the default disk storage limit. 1TB
	JetStreamMaxStoreDefault = 1 << 40
	// JetStreamMaxMemDefault is only used when we can't determine system memory. 256MB
	JetStreamMaxMemDefault = 256 << 20
)
// dynJetStreamConfig dynamically creates a config. The store directory is
// storeDir/jetstream, or a repeatable location under the OS temp directory
// when storeDir is empty. MaxStore is maxStore when positive, otherwise what
// diskAvailable reports for the store directory. MaxMemory is 75% of system
// memory when it can be determined, otherwise JetStreamMaxMemDefault.
func (s *Server) dynJetStreamConfig(storeDir string, maxStore int64) *JetStreamConfig {
	var dir string
	if storeDir == _EMPTY_ {
		// Create one in tmp directory, but make it consistent for restarts.
		dir = filepath.Join(os.TempDir(), "nats", JetStreamStoreDir)
	} else {
		dir = filepath.Join(storeDir, JetStreamStoreDir)
	}

	store := maxStore
	if store <= 0 {
		store = diskAvailable(dir)
	}

	// Estimate to 75% of total memory if we can determine system memory.
	var mem int64 = JetStreamMaxMemDefault
	if sysMem := sysmem.Memory(); sysMem > 0 {
		mem = sysMem / 4 * 3
	}

	return &JetStreamConfig{
		MaxMemory: mem,
		MaxStore:  store,
		StoreDir:  dir,
	}
}
// checkForJetStream returns the server and JetStream account state attached
// to this account, or ErrJetStreamNotEnabledForAccount when either is missing.
func (a *Account) checkForJetStream() (*Server, *jsAccount, error) {
	a.mu.RLock()
	srv, jsa := a.srv, a.js
	a.mu.RUnlock()

	if srv != nil && jsa != nil {
		return srv, jsa, nil
	}
	return nil, nil, ErrJetStreamNotEnabledForAccount
}
// StreamTemplateConfig allows a configuration to auto-create streams based on this template when a message
// is received that matches. Each new stream will use the config as the template config to create them.
type StreamTemplateConfig struct {
// Name is the template name; addStreamTemplate rejects names longer than JSMaxNameLen.
Name string `json:"name"`
// Config is the stream configuration copied for each created stream. Its own
// Name must be empty when the template is added.
Config *StreamConfig `json:"config"`
// MaxStreams caps how many streams the template will create.
MaxStreams uint32 `json:"max_streams"`
}
// StreamTemplateInfo pairs a template's configuration with a list of stream
// names — presumably the streams created from that template; confirm against
// the code that populates it.
type StreamTemplateInfo struct {
Config *StreamTemplateConfig `json:"config"`
Streams []string `json:"streams"`
}
// streamTemplate is the runtime state of a stream template registered on a
// JetStream account.
type streamTemplate struct {
// mu is taken around accesses to tc and streams in delete,
// processInboundTemplateMsg, validateStreams and addStreamNameToTemplate.
mu sync.Mutex
// tc is the internal client used for the template's subscriptions; delete
// sets it to nil and closes it.
tc *client
// jsa is the JetStream account state this template was added to.
jsa *jsAccount
*StreamTemplateConfig
// streams holds the names of the streams recorded against this template.
streams []string
}
// deepCopy returns a copy of the template config with its own copy of the
// embedded stream Config, so changes to the copy's Config fields do not touch
// the original. Fields inside Config are copied by plain assignment.
func (t *StreamTemplateConfig) deepCopy() *StreamTemplateConfig {
	streamCfg := *t.Config
	dup := *t
	dup.Config = &streamCfg
	return &dup
}
// addStreamTemplate will add a stream template to this account that allows auto-creation of streams.
//
// The supplied config is copied and validated with checkStreamCfg (using a
// placeholder stream name), an internal client is created and registered with
// the account, the template is recorded on the JetStream account, its
// subscriptions are created and it is finally written to the template store.
//
// It returns an error when JetStream is not enabled for the account, when tc
// or tc.Config is nil, when tc.Config.Name is set, when the template name is
// longer than JSMaxNameLen, when a template with the same name already exists,
// or when validation, subscription setup or storing fails. On failure after
// the internal client has been created, the client and any registration are
// cleaned up.
func (a *Account) addStreamTemplate(tc *StreamTemplateConfig) (*streamTemplate, error) {
	s, jsa, err := a.checkForJetStream()
	if err != nil {
		return nil, err
	}
	// A request that omits the stream config would otherwise panic on the
	// dereferences below.
	if tc == nil || tc.Config == nil {
		return nil, fmt.Errorf("template config required")
	}
	if tc.Config.Name != "" {
		return nil, fmt.Errorf("template config name should be empty")
	}
	if len(tc.Name) > JSMaxNameLen {
		return nil, fmt.Errorf("template name is too long, maximum allowed is %d", JSMaxNameLen)
	}

	// FIXME(dlc) - Hacky
	// checkStreamCfg is run on a copy that carries a placeholder stream name.
	tcopy := tc.deepCopy()
	tcopy.Config.Name = "_"
	cfg, err := checkStreamCfg(tcopy.Config)
	if err != nil {
		return nil, err
	}
	tcopy.Config = &cfg
	t := &streamTemplate{
		StreamTemplateConfig: tcopy,
		tc:                   s.createInternalJetStreamClient(),
		jsa:                  jsa,
	}
	t.tc.registerWithAccount(a)

	jsa.mu.Lock()
	if jsa.templates == nil {
		jsa.templates = make(map[string]*streamTemplate)
		// Create the appropriate store
		if cfg.Storage == FileStorage {
			jsa.store = newTemplateFileStore(jsa.storeDir)
		} else {
			jsa.store = newTemplateMemStore()
		}
	} else if _, ok := jsa.templates[tcopy.Name]; ok {
		jsa.mu.Unlock()
		// Do not leak the internal client that was just created and registered.
		t.tc.closeConnection(ClientClosed)
		return nil, fmt.Errorf("template with name %q already exists", tcopy.Name)
	}
	jsa.templates[tcopy.Name] = t
	jsa.mu.Unlock()

	// FIXME(dlc) - we can not overlap subjects between templates. Need to have test.

	// Setup the internal subscriptions to trap the messages.
	if err := t.createTemplateSubscriptions(); err != nil {
		// Best-effort cleanup so a failed add leaves neither a registered
		// template nor an open internal client behind. The template may
		// already have been removed by createTemplateSubscriptions, in which
		// case delete reports "template not found", which is ignored here.
		t.delete()
		return nil, err
	}
	if err := jsa.store.Store(t); err != nil {
		// Best-effort cleanup; the store error is the one reported.
		t.delete()
		return nil, err
	}
	return t, nil
}
// createTemplateSubscriptions subscribes the template's internal client to
// every subject in the template config, routing matches to
// processInboundTemplateMsg. Subscription ids are 1, 2, 3, ... in subject
// order. If a subscription fails the template is deleted from the account and
// the error is returned. It returns ErrNoSysAccount when server events are
// not enabled.
func (t *streamTemplate) createTemplateSubscriptions() error {
	switch {
	case t == nil:
		return fmt.Errorf("no template")
	case t.tc == nil:
		return fmt.Errorf("template not enabled")
	}

	c := t.tc
	if !c.srv.eventsEnabled() {
		return ErrNoSysAccount
	}

	for i, subject := range t.Config.Subjects {
		// Now create the subscription
		sid := strconv.Itoa(i + 1)
		if _, err := c.processSub([]byte(subject), nil, []byte(sid), t.processInboundTemplateMsg, false); err != nil {
			c.acc.deleteStreamTemplate(t.Name)
			return err
		}
	}
	return nil
}
// processInboundTemplateMsg handles a message received on one of the
// template's subscriptions. When no stream named after the subject exists yet
// and the template is below MaxStreams, it creates a stream from the template
// config restricted to the literal subject and hands the message to it.
func (t *streamTemplate) processInboundTemplateMsg(_ *subscription, pc *client, subject, reply string, msg []byte) {
	if t == nil || t.jsa == nil {
		return
	}
	jsa := t.jsa
	cn := canonicalName(subject)

	jsa.mu.Lock()
	_, registered := jsa.streams[cn]
	acc := jsa.account
	jsa.mu.Unlock()
	// If we already are registered then we can just return here.
	if registered {
		return
	}

	// Check if we are at the maximum and grab some variables.
	t.mu.Lock()
	c := t.tc
	cfg := *t.Config
	cfg.Template = t.Name
	hasRoom := len(t.streams) < int(t.MaxStreams)
	if hasRoom {
		t.streams = append(t.streams, cn)
	}
	t.mu.Unlock()

	if !hasRoom {
		c.Warnf("JetStream could not create stream for account %q on subject %q, at limit", acc.Name, subject)
		return
	}

	// We need to create the stream here.
	// Change the config from the template and only use literal subject.
	cfg.Name, cfg.Subjects = cn, []string{subject}
	mset, err := acc.addStream(&cfg)
	if err != nil {
		// The name was recorded above; prune names that have no stream.
		acc.validateStreams(t)
		c.Warnf("JetStream could not create stream for account %q on subject %q", acc.Name, subject)
		return
	}

	// Process this message directly by invoking mset.
	mset.processInboundJetStreamMsg(nil, pc, subject, reply, msg)
}
// lookupStreamTemplate looks up the named stream template.
// It returns an error when JetStream is not enabled for the account or when
// no template with that name is registered.
func (a *Account) lookupStreamTemplate(name string) (*streamTemplate, error) {
	_, jsa, err := a.checkForJetStream()
	if err != nil {
		return nil, err
	}

	jsa.mu.Lock()
	// Indexing a nil map is safe and simply reports that the key is absent.
	t, found := jsa.templates[name]
	jsa.mu.Unlock()

	if !found {
		return nil, fmt.Errorf("template not found")
	}
	return t, nil
}
// validateStreams rebuilds the template's stream list, keeping only the names
// for which lookupStream on this account succeeds.
func (a *Account) validateStreams(t *streamTemplate) {
	t.mu.Lock()
	defer t.mu.Unlock()

	var kept []string
	for _, name := range t.streams {
		if _, err := a.lookupStream(name); err != nil {
			continue
		}
		kept = append(kept, name)
	}
	t.streams = kept
}
// delete removes the template from its JetStream account, deletes it from the
// template store and deletes every stream recorded against it. The template's
// internal client, if any, is closed when the function returns, whatever the
// outcome. When stream deletions fail, the last such error is returned.
func (t *streamTemplate) delete() error {
	if t == nil {
		return fmt.Errorf("nil stream template")
	}

	// Detach the internal client under the lock; it is closed on return.
	t.mu.Lock()
	jsa, c := t.jsa, t.tc
	t.tc = nil
	t.mu.Unlock()
	if c != nil {
		defer c.closeConnection(ClientClosed)
	}

	if jsa == nil {
		return ErrJetStreamNotEnabled
	}

	jsa.mu.Lock()
	// A nil templates map reports the name as absent as well.
	if _, ok := jsa.templates[t.Name]; !ok {
		jsa.mu.Unlock()
		return fmt.Errorf("template not found")
	}
	delete(jsa.templates, t.Name)
	acc := jsa.account
	jsa.mu.Unlock()

	// Remove streams associated with this template.
	t.mu.Lock()
	var owned []*stream
	for _, name := range t.streams {
		if mset, err := acc.lookupStream(name); err == nil {
			owned = append(owned, mset)
		}
	}
	t.mu.Unlock()

	if jsa.store != nil {
		if err := jsa.store.Delete(t); err != nil {
			return fmt.Errorf("error deleting template from store: %v", err)
		}
	}

	var lastErr error
	for _, mset := range owned {
		if err := mset.delete(); err != nil {
			lastErr = err
		}
	}
	return lastErr
}
// deleteStreamTemplate looks up the named template on this account and
// deletes it, returning the lookup error or the result of the delete.
func (a *Account) deleteStreamTemplate(name string) error {
	t, err := a.lookupStreamTemplate(name)
	if err == nil {
		err = t.delete()
	}
	return err
}
// templates returns the stream templates registered on this account, or nil
// when JetStream is not enabled for it or no templates exist. The returned
// slice holds the live template pointers, not copies.
func (a *Account) templates() []*streamTemplate {
	_, jsa, err := a.checkForJetStream()
	if err != nil {
		return nil
	}

	jsa.mu.Lock()
	defer jsa.mu.Unlock()

	var all []*streamTemplate
	for _, tmpl := range jsa.templates {
		// FIXME(dlc) - Copy?
		all = append(all, tmpl)
	}
	return all
}
// addStreamNameToTemplate appends the stream name mname to the template
// named tname, this is for recovery. It returns an error when no such
// template is registered.
func (jsa *jsAccount) addStreamNameToTemplate(tname, mname string) error {
	// Indexing a nil map is safe and reports the key as absent.
	t, found := jsa.templates[tname]
	if !found {
		return fmt.Errorf("template not found")
	}
	// We found template.
	t.mu.Lock()
	defer t.mu.Unlock()
	t.streams = append(t.streams, mname)
	return nil
}
// checkTemplateOwnership reports whether the template named tname exists and
// lists sname among its streams.
// jsAccount lock should be held
func (jsa *jsAccount) checkTemplateOwnership(tname, sname string) bool {
	// Indexing a nil map is safe and reports the key as absent.
	t, found := jsa.templates[tname]
	if !found {
		return false
	}
	// We found template, make sure we are in streams.
	for i := range t.streams {
		if t.streams[i] == sname {
			return true
		}
	}
	return false
}
// friendlyBytes returns a string with the given bytes int64
// represented as a size using 1024-based units, such as "512 B",
// "1.00 KB" or "10.00 MB". Values below 1024 (including negative
// values) are printed as a plain byte count.
func friendlyBytes(bytes int64) string {
	const (
		unit     = 1024.0
		prefixes = "KMGTPE"
	)
	size := float64(bytes)
	if size < unit {
		return fmt.Sprintf("%v B", size)
	}
	// exp is the power of 1024 that the value is scaled by.
	exp := int(math.Log(size) / math.Log(unit))
	scaled := size / math.Pow(unit, float64(exp))
	return fmt.Sprintf("%.2f %cB", scaled, prefixes[exp-1])
}
// isValidName reports whether name is non-empty and contains none of the
// characters '.', '*' or '>'.
func isValidName(name string) bool {
	return name != "" && strings.IndexAny(name, ".*>") < 0
}
// canonicalName will replace all token separators '.' with '_'.
// This can be used when naming streams or consumers with multi-token subjects.
func canonicalName(name string) string {
	return strings.Replace(name, ".", "_", -1)
}