Files
nats-server/server/stream.go
R.I.Pienaar 0d71d35e43 do not log at Error level for some store failures
Some of these are quite generic errors that can happen a lot
in normal circumstances so no need to be too noisy about them

Fixes one missed old style Api Error

Signed-off-by: R.I.Pienaar <rip@devco.net>
2021-06-28 10:18:16 +02:00

3435 lines
87 KiB
Go

// Copyright 2019-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"archive/tar"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/klauspost/compress/s2"
"github.com/nats-io/nuid"
)
// StreamConfig will determine the name, subjects and retention policy
// for a given stream. If subjects is empty the name will be used.
type StreamConfig struct {
Name string `json:"name"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPer int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Discard DiscardPolicy `json:"discard"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
}
// JSPubAckResponse is a formal response to a publish operation.
type JSPubAckResponse struct {
Error *ApiError `json:"error,omitempty"`
*PubAck
}
// ToError checks if the response has a error and if it does converts it to an error avoiding the pitfalls described by https://yourbasic.org/golang/gotcha-why-nil-error-not-equal-nil/
func (r *JSPubAckResponse) ToError() error {
if r.Error == nil {
return nil
}
return r.Error
}
// PubAck is the detail you get back from a publish to a stream that was successful.
// e.g. +OK {"stream": "Orders", "seq": 22}
type PubAck struct {
Stream string `json:"stream"`
Sequence uint64 `json:"seq"`
Duplicate bool `json:"duplicate,omitempty"`
}
// StreamInfo shows config and current state for this stream.
type StreamInfo struct {
Config StreamConfig `json:"config"`
Created time.Time `json:"created"`
State StreamState `json:"state"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
Mirror *StreamSourceInfo `json:"mirror,omitempty"`
Sources []*StreamSourceInfo `json:"sources,omitempty"`
}
// ClusterInfo shows information about the underlying set of servers
// that make up the stream or consumer.
type ClusterInfo struct {
Name string `json:"name,omitempty"`
Leader string `json:"leader,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
}
// PeerInfo shows information about all the peers in the cluster that
// are supporting the stream or consumer.
type PeerInfo struct {
Name string `json:"name"`
Current bool `json:"current"`
Offline bool `json:"offline,omitempty"`
Active time.Duration `json:"active"`
Lag uint64 `json:"lag,omitempty"`
}
// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
External *ExternalStream `json:"external,omitempty"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Error *ApiError `json:"error,omitempty"`
}
// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
External *ExternalStream `json:"external,omitempty"`
// Internal
iname string // For indexing when stream names are the same for multiple sources.
}
// ExternalStream allows you to qualify access to a stream source in another account.
type ExternalStream struct {
ApiPrefix string `json:"api"`
DeliverPrefix string `json:"deliver"`
}
// Stream is a jetstream stream of messages. When we receive a message internally destined
// for a Stream we will direct link from the client to this structure.
type stream struct {
mu sync.RWMutex
js *jetStream
jsa *jsAccount
acc *Account
srv *Server
client *client
sysc *client
sid int
pubAck []byte
outq *jsOutQ
msgs *inbound
store StreamStore
amch chan uint64
lseq uint64
lmsgId string
consumers map[string]*consumer
numFilter int
cfg StreamConfig
created time.Time
stype StorageType
ddmap map[string]*ddentry
ddarr []*ddentry
ddindex int
ddtmr *time.Timer
qch chan struct{}
active bool
// Mirror
mirror *sourceInfo
// Sources
sources map[string]*sourceInfo
// Indicates we have direct consumers.
directs int
// For flowcontrol processing for source and mirror internal consumers.
fcr map[uint64]string
// TODO(dlc) - Hide everything below behind two pointers.
// Clustered mode.
sa *streamAssignment
node RaftNode
catchup bool
syncSub *subscription
infoSub *subscription
clMu sync.Mutex
clseq uint64
clfs uint64
lqsent time.Time
catchups map[string]uint64
}
type sourceInfo struct {
name string
iname string
cname string
sub *subscription
msgs *inbound
sseq uint64
dseq uint64
clseq uint64
lag uint64
err *ApiError
last time.Time
lreq time.Time
qch chan struct{}
grr bool
}
// Headers for published messages.
const (
JSMsgId = "Nats-Msg-Id"
JSExpectedStream = "Nats-Expected-Stream"
JSExpectedLastSeq = "Nats-Expected-Last-Sequence"
JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id"
JSStreamSource = "Nats-Stream-Source"
JSLastConsumerSeq = "Nats-Last-Consumer"
JSLastStreamSeq = "Nats-Last-Stream"
)
// Dedupe entry
type ddentry struct {
id string
seq uint64
ts int64
}
// Replicas Range
const (
StreamMaxReplicas = 5
)
// AddStream adds a stream for the given account.
func (a *Account) addStream(config *StreamConfig) (*stream, error) {
return a.addStreamWithAssignment(config, nil, nil)
}
// AddStreamWithStore adds a stream for the given account with custome store config options.
func (a *Account) addStreamWithStore(config *StreamConfig, fsConfig *FileStoreConfig) (*stream, error) {
return a.addStreamWithAssignment(config, fsConfig, nil)
}
func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileStoreConfig, sa *streamAssignment) (*stream, error) {
s, jsa, err := a.checkForJetStream()
if err != nil {
return nil, err
}
// If we do not have the stream currently assigned to us in cluster mode we will proceed but warn.
// This can happen on startup with restored state where on meta replay we still do not have
// the assignment. Running in single server mode this always returns true.
if !jsa.streamAssigned(config.Name) {
s.Debugf("Stream '%s > %s' does not seem to be assigned to this server", a.Name, config.Name)
}
// Sensible defaults.
cfg, err := checkStreamCfg(config)
if err != nil {
return nil, ApiErrors[JSStreamInvalidConfigF].ErrOrNewT(err, "{err}", err)
}
singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode()
if singleServerMode && cfg.Replicas > 1 {
return nil, ApiErrors[JSStreamReplicasNotSupportedErr]
}
jsa.mu.Lock()
js := jsa.js
if mset, ok := jsa.streams[cfg.Name]; ok {
jsa.mu.Unlock()
// Check to see if configs are same.
ocfg := mset.config()
if reflect.DeepEqual(ocfg, cfg) {
if sa != nil {
mset.setStreamAssignment(sa)
}
return mset, nil
} else {
return nil, ApiErrors[JSStreamNameExistErr]
}
}
// Check for limits.
if err := jsa.checkLimits(&cfg); err != nil {
jsa.mu.Unlock()
return nil, err
}
// Check for template ownership if present.
if cfg.Template != _EMPTY_ && jsa.account != nil {
if !jsa.checkTemplateOwnership(cfg.Template, cfg.Name) {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream not owned by template")
}
}
// Check for mirror designation.
if cfg.Mirror != nil {
// Can't have subjects.
if len(cfg.Subjects) > 0 {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream mirrors can not also contain subjects")
}
if len(cfg.Sources) > 0 {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream mirrors can not also contain other sources")
}
if cfg.Mirror.FilterSubject != _EMPTY_ {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream mirrors can not contain filtered subjects")
}
if cfg.Mirror.OptStartSeq > 0 && cfg.Mirror.OptStartTime != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream mirrors can not have both start seq and start time configured")
}
} else if len(cfg.Subjects) == 0 && len(cfg.Sources) == 0 {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream needs at least one configured subject or mirror")
}
// Setup our internal indexed names here for sources.
if len(cfg.Sources) > 0 {
for _, ssi := range cfg.Sources {
ssi.setIndexName()
}
}
// Check for overlapping subjects. These are not allowed for now.
if jsa.subjectsOverlap(cfg.Subjects) {
jsa.mu.Unlock()
return nil, fmt.Errorf("subjects overlap with an existing stream")
}
// Setup the internal clients.
c := s.createInternalJetStreamClient()
ic := s.createInternalJetStreamClient()
mset := &stream{
acc: a,
jsa: jsa,
cfg: cfg,
js: js,
srv: s,
client: c,
sysc: ic,
stype: cfg.Storage,
consumers: make(map[string]*consumer),
msgs: &inbound{mch: make(chan struct{}, 1)},
qch: make(chan struct{}),
}
// For no-ack consumers when we are interest retention.
if cfg.Retention != LimitsPolicy {
mset.amch = make(chan uint64, 8192)
}
jsa.streams[cfg.Name] = mset
storeDir := path.Join(jsa.storeDir, streamsDir, cfg.Name)
jsa.mu.Unlock()
// Bind to the user account.
c.registerWithAccount(a)
// Bind to the system account.
ic.registerWithAccount(s.SystemAccount())
// Create the appropriate storage
fsCfg := fsConfig
if fsCfg == nil {
fsCfg = &FileStoreConfig{}
// If we are file based and not explicitly configured
// we may be able to auto-tune based on max msgs or bytes.
if cfg.Storage == FileStorage {
mset.autoTuneFileStorageBlockSize(fsCfg)
}
}
fsCfg.StoreDir = storeDir
fsCfg.AsyncFlush = false
fsCfg.SyncInterval = 2 * time.Minute
if err := mset.setupStore(fsCfg); err != nil {
mset.stop(true, false)
return nil, err
}
// Create our pubAck template here. Better than json marshal each time on success.
b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: cfg.Name, Sequence: math.MaxUint64}})
end := bytes.Index(b, []byte(strconv.FormatUint(math.MaxUint64, 10)))
// We need to force cap here to make sure this is a copy when sending a response.
mset.pubAck = b[:end:end]
// Rebuild dedupe as needed.
mset.rebuildDedupe()
// Set our stream assignment if in clustered mode.
if sa != nil {
mset.setStreamAssignment(sa)
}
// Setup our internal send go routine.
mset.setupSendCapabilities()
// Call directly to set leader if not in clustered mode.
// This can be called though before we actually setup clustering, so check both.
if singleServerMode {
if err := mset.setLeader(true); err != nil {
mset.stop(true, false)
return nil, err
}
}
// This is always true in single server mode.
mset.mu.RLock()
isLeader := mset.isLeader()
mset.mu.RUnlock()
if isLeader {
// Send advisory.
var suppress bool
if !s.standAloneMode() && sa == nil {
suppress = true
} else if sa != nil {
suppress = sa.responded
}
if !suppress {
mset.sendCreateAdvisory()
}
}
return mset, nil
}
// Sets the index name. Usually just the stream name but when the stream is external we will
// use additional information in case the stream names are the same.
func (ssi *StreamSource) setIndexName() {
if ssi.External != nil {
ssi.iname = ssi.Name + ":" + string(getHash(ssi.External.ApiPrefix))
} else {
ssi.iname = ssi.Name
}
}
func (mset *stream) streamAssignment() *streamAssignment {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.sa
}
func (mset *stream) setStreamAssignment(sa *streamAssignment) {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.sa = sa
if sa == nil {
return
}
// Set our node.
mset.node = sa.Group.node
// Setup our info sub here as well for all stream members. This is now by design.
if mset.infoSub == nil {
isubj := fmt.Sprintf(clusterStreamInfoT, mset.jsa.acc(), mset.cfg.Name)
// Note below the way we subscribe here is so that we can send requests to ourselves.
mset.infoSub, _ = mset.srv.systemSubscribe(isubj, _EMPTY_, false, mset.sysc, mset.handleClusterStreamInfoRequest)
}
}
// Lock should be held.
func (mset *stream) isLeader() bool {
if mset.node != nil {
return mset.node.Leader()
}
return true
}
// TODO(dlc) - Check to see if we can accept being the leader or we should should step down.
func (mset *stream) setLeader(isLeader bool) error {
mset.mu.Lock()
// If we are here we have a change in leader status.
if isLeader {
// Make sure we are listening for sync requests.
// TODO(dlc) - Original design was that all in sync members of the group would do DQ.
mset.startClusterSubs()
// Setup subscriptions
if err := mset.subscribeToStream(); err != nil {
mset.mu.Unlock()
return err
}
} else {
// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
mset.unsubscribeToStream()
// Clear catchup state
mset.clearAllCatchupPeers()
}
mset.mu.Unlock()
return nil
}
// Lock should be held.
func (mset *stream) startClusterSubs() {
if mset.isClustered() && mset.syncSub == nil {
mset.syncSub, _ = mset.srv.systemSubscribe(mset.sa.Sync, _EMPTY_, false, mset.sysc, mset.handleClusterSyncRequest)
}
}
// Lock should be held.
func (mset *stream) stopClusterSubs() {
if mset.syncSub != nil {
mset.srv.sysUnsubscribe(mset.syncSub)
mset.syncSub = nil
}
}
// account gets the account for this stream.
func (mset *stream) account() *Account {
mset.mu.RLock()
jsa := mset.jsa
mset.mu.RUnlock()
if jsa == nil {
return nil
}
return jsa.acc()
}
// Helper to determine the max msg size for this stream if file based.
func (mset *stream) maxMsgSize() uint64 {
maxMsgSize := mset.cfg.MaxMsgSize
if maxMsgSize <= 0 {
// Pull from the account.
if mset.jsa != nil {
if acc := mset.jsa.acc(); acc != nil {
acc.mu.RLock()
maxMsgSize = acc.mpay
acc.mu.RUnlock()
}
}
// If all else fails use default.
if maxMsgSize <= 0 {
maxMsgSize = MAX_PAYLOAD_SIZE
}
}
// Now determine an estimation for the subjects etc.
maxSubject := -1
for _, subj := range mset.cfg.Subjects {
if subjectIsLiteral(subj) {
if len(subj) > maxSubject {
maxSubject = len(subj)
}
}
}
if maxSubject < 0 {
const defaultMaxSubject = 256
maxSubject = defaultMaxSubject
}
// filestore will add in estimates for record headers, etc.
return fileStoreMsgSizeEstimate(maxSubject, int(maxMsgSize))
}
// If we are file based and the file storage config was not explicitly set
// we can autotune block sizes to better match. Our target will be to store 125%
// of the theoretical limit. We will round up to nearest 100 bytes as well.
func (mset *stream) autoTuneFileStorageBlockSize(fsCfg *FileStoreConfig) {
var totalEstSize uint64
// MaxBytes will take precedence for now.
if mset.cfg.MaxBytes > 0 {
totalEstSize = uint64(mset.cfg.MaxBytes)
} else if mset.cfg.MaxMsgs > 0 {
// Determine max message size to estimate.
totalEstSize = mset.maxMsgSize() * uint64(mset.cfg.MaxMsgs)
} else {
// If nothing set will let underlying filestore determine blkSize.
return
}
blkSize := (totalEstSize / 4) + 1 // (25% overhead)
// Round up to nearest 100
if m := blkSize % 100; m != 0 {
blkSize += 100 - m
}
if blkSize < FileStoreMinBlkSize {
blkSize = FileStoreMinBlkSize
}
if blkSize > FileStoreMaxBlkSize {
blkSize = FileStoreMaxBlkSize
}
fsCfg.BlockSize = uint64(blkSize)
}
// rebuildDedupe will rebuild any dedupe structures needed after recovery of a stream.
// TODO(dlc) - Might be good to know if this should be checked at all for streams with no
// headers and msgId in them. Would need signaling from the storage layer.
func (mset *stream) rebuildDedupe() {
state := mset.store.State()
mset.lseq = state.LastSeq
// We have some messages. Lookup starting sequence by duplicate time window.
sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates))
if sseq == 0 {
return
}
for seq := sseq; seq <= state.LastSeq; seq++ {
_, hdr, _, ts, err := mset.store.LoadMsg(seq)
var msgId string
if err == nil && len(hdr) > 0 {
if msgId = getMsgId(hdr); msgId != _EMPTY_ {
mset.storeMsgId(&ddentry{msgId, seq, ts})
}
}
if seq == state.LastSeq {
mset.lmsgId = msgId
}
}
}
func (mset *stream) lastSeq() uint64 {
mset.mu.RLock()
lseq := mset.lseq
mset.mu.RUnlock()
return lseq
}
func (mset *stream) setLastSeq(lseq uint64) {
mset.mu.Lock()
mset.lseq = lseq
mset.mu.Unlock()
}
func (mset *stream) sendCreateAdvisory() {
mset.mu.Lock()
name := mset.cfg.Name
template := mset.cfg.Template
outq := mset.outq
srv := mset.srv
mset.mu.Unlock()
if outq == nil {
return
}
// finally send an event that this stream was created
m := JSStreamActionAdvisory{
TypedEvent: TypedEvent{
Type: JSStreamActionAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: name,
Action: CreateEvent,
Template: template,
Domain: srv.getOpts().JetStreamDomain,
}
j, err := json.Marshal(m)
if err != nil {
return
}
subj := JSAdvisoryStreamCreatedPre + "." + name
outq.send(&jsPubMsg{subj, _EMPTY_, _EMPTY_, nil, j, nil, 0, nil})
}
func (mset *stream) sendDeleteAdvisoryLocked() {
if mset.outq == nil {
return
}
m := JSStreamActionAdvisory{
TypedEvent: TypedEvent{
Type: JSStreamActionAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: mset.cfg.Name,
Action: DeleteEvent,
Template: mset.cfg.Template,
Domain: mset.srv.getOpts().JetStreamDomain,
}
j, err := json.Marshal(m)
if err == nil {
subj := JSAdvisoryStreamDeletedPre + "." + mset.cfg.Name
mset.outq.send(&jsPubMsg{subj, _EMPTY_, _EMPTY_, nil, j, nil, 0, nil})
}
}
func (mset *stream) sendUpdateAdvisoryLocked() {
if mset.outq == nil {
return
}
m := JSStreamActionAdvisory{
TypedEvent: TypedEvent{
Type: JSStreamActionAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: mset.cfg.Name,
Action: ModifyEvent,
Domain: mset.srv.getOpts().JetStreamDomain,
}
j, err := json.Marshal(m)
if err == nil {
subj := JSAdvisoryStreamUpdatedPre + "." + mset.cfg.Name
mset.outq.send(&jsPubMsg{subj, _EMPTY_, _EMPTY_, nil, j, nil, 0, nil})
}
}
// Created returns created time.
func (mset *stream) createdTime() time.Time {
mset.mu.RLock()
created := mset.created
mset.mu.RUnlock()
return created
}
// Internal to allow creation time to be restored.
func (mset *stream) setCreatedTime(created time.Time) {
mset.mu.Lock()
mset.created = created
mset.mu.Unlock()
}
// Check to see if these subjects overlap with existing subjects.
// Lock should be held.
func (jsa *jsAccount) subjectsOverlap(subjects []string) bool {
for _, mset := range jsa.streams {
for _, subj := range mset.cfg.Subjects {
for _, tsubj := range subjects {
if SubjectsCollide(tsubj, subj) {
return true
}
}
}
}
return false
}
// StreamDefaultDuplicatesWindow default duplicates window.
const StreamDefaultDuplicatesWindow = 2 * time.Minute
func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
if config == nil {
return StreamConfig{}, fmt.Errorf("stream configuration invalid")
}
if !isValidName(config.Name) {
return StreamConfig{}, fmt.Errorf("stream name is required and can not contain '.', '*', '>'")
}
if len(config.Name) > JSMaxNameLen {
return StreamConfig{}, fmt.Errorf("stream name is too long, maximum allowed is %d", JSMaxNameLen)
}
cfg := *config
// Make file the default.
if cfg.Storage == 0 {
cfg.Storage = FileStorage
}
if cfg.Replicas == 0 {
cfg.Replicas = 1
}
if cfg.Replicas > StreamMaxReplicas {
return cfg, fmt.Errorf("maximum replicas is %d", StreamMaxReplicas)
}
if cfg.MaxMsgs == 0 {
cfg.MaxMsgs = -1
}
if cfg.MaxBytes == 0 {
cfg.MaxBytes = -1
}
if cfg.MaxMsgSize == 0 {
cfg.MaxMsgSize = -1
}
if cfg.MaxConsumers == 0 {
cfg.MaxConsumers = -1
}
if cfg.Duplicates == 0 {
if cfg.MaxAge != 0 && cfg.MaxAge < StreamDefaultDuplicatesWindow {
cfg.Duplicates = cfg.MaxAge
} else {
cfg.Duplicates = StreamDefaultDuplicatesWindow
}
} else if cfg.Duplicates < 0 {
return StreamConfig{}, fmt.Errorf("duplicates window can not be negative")
}
// Check that duplicates is not larger then age if set.
if cfg.MaxAge != 0 && cfg.Duplicates > cfg.MaxAge {
return StreamConfig{}, fmt.Errorf("duplicates window can not be larger then max age")
}
if len(cfg.Subjects) == 0 {
if cfg.Mirror == nil && len(cfg.Sources) == 0 {
cfg.Subjects = append(cfg.Subjects, cfg.Name)
}
} else {
// We can allow overlaps, but don't allow direct duplicates.
dset := make(map[string]struct{}, len(cfg.Subjects))
for _, subj := range cfg.Subjects {
if _, ok := dset[subj]; ok {
return StreamConfig{}, fmt.Errorf("duplicate subjects detected")
}
// Also check to make sure we do not overlap with our $JS API subjects.
if subjectIsSubsetMatch(subj, "$JS.API.>") {
return StreamConfig{}, fmt.Errorf("subjects overlap with jetstream api")
}
dset[subj] = struct{}{}
}
}
return cfg, nil
}
// Config returns the stream's configuration.
func (mset *stream) config() StreamConfig {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.cfg
}
func (mset *stream) fileStoreConfig() (FileStoreConfig, error) {
mset.mu.Lock()
defer mset.mu.Unlock()
fs, ok := mset.store.(*fileStore)
if !ok {
return FileStoreConfig{}, ErrStoreWrongType
}
return fs.fileStoreConfig(), nil
}
func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig) (*StreamConfig, error) {
cfg, err := checkStreamCfg(new)
if err != nil {
return nil, ApiErrors[JSStreamInvalidConfigF].ErrOrNewT(err, "{err}", err)
}
// Name must match.
if cfg.Name != old.Name {
return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration name must match original")
}
// Can't change MaxConsumers for now.
if cfg.MaxConsumers != old.MaxConsumers {
return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration update can not change MaxConsumers")
}
// Can't change storage types.
if cfg.Storage != old.Storage {
return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration update can not change storage type")
}
// Can't change retention.
if cfg.Retention != old.Retention {
return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration update can not change retention policy")
}
// Can not have a template owner for now.
if old.Template != _EMPTY_ {
return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration update not allowed on template owned stream")
}
if cfg.Template != _EMPTY_ {
return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration update can not be owned by a template")
}
// Check limits.
if err := jsa.checkLimits(&cfg); err != nil {
return nil, err
}
return &cfg, nil
}
// Update will allow certain configuration properties of an existing stream to be updated.
func (mset *stream) update(config *StreamConfig) error {
ocfg := mset.config()
cfg, err := mset.jsa.configUpdateCheck(&ocfg, config)
if err != nil {
return ApiErrors[JSStreamInvalidConfigF].ErrOrNewT(err, "{err}", err)
}
mset.mu.Lock()
if mset.isLeader() {
// Now check for subject interest differences.
current := make(map[string]struct{}, len(ocfg.Subjects))
for _, s := range ocfg.Subjects {
current[s] = struct{}{}
}
// Update config with new values. The store update will enforce any stricter limits.
// Now walk new subjects. All of these need to be added, but we will check
// the originals first, since if it is in there we can skip, already added.
for _, s := range cfg.Subjects {
if _, ok := current[s]; !ok {
if _, err := mset.subscribeInternal(s, mset.processInboundJetStreamMsg); err != nil {
mset.mu.Unlock()
return err
}
}
delete(current, s)
}
// What is left in current needs to be deleted.
for s := range current {
if err := mset.unsubscribeInternal(s); err != nil {
mset.mu.Unlock()
return err
}
}
// Check for the Duplicates
if cfg.Duplicates != ocfg.Duplicates && mset.ddtmr != nil {
// Let it fire right away, it will adjust properly on purge.
mset.ddtmr.Reset(time.Microsecond)
}
// Check for Sources.
if len(cfg.Sources) > 0 || len(ocfg.Sources) > 0 {
current := make(map[string]struct{})
for _, s := range ocfg.Sources {
current[s.Name] = struct{}{}
}
for _, s := range cfg.Sources {
s.setIndexName()
if _, ok := current[s.iname]; !ok {
if mset.sources == nil {
mset.sources = make(map[string]*sourceInfo)
}
mset.cfg.Sources = append(mset.cfg.Sources, s)
si := &sourceInfo{name: s.Name, iname: s.iname, msgs: &inbound{mch: make(chan struct{}, 1)}}
mset.sources[s.iname] = si
mset.setStartingSequenceForSource(s.iname)
mset.setSourceConsumer(s.iname, si.sseq+1)
}
delete(current, s.Name)
}
// What is left in current needs to be deleted.
for iname := range current {
mset.cancelSourceConsumer(iname)
delete(mset.sources, iname)
}
}
}
// Now update config and store's version of our config.
mset.cfg = *cfg
var suppress bool
if mset.isClustered() && mset.sa != nil {
suppress = mset.sa.responded
}
if mset.isLeader() && !suppress {
mset.sendUpdateAdvisoryLocked()
}
mset.mu.Unlock()
mset.store.UpdateConfig(cfg)
return nil
}
// Purge will remove all messages from the stream and underlying store.
func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) {
mset.mu.Lock()
if mset.client == nil {
mset.mu.Unlock()
return 0, errors.New("stream closed")
}
// Purge dedupe.
mset.ddmap = nil
var _obs [4]*consumer
obs := _obs[:0]
for _, o := range mset.consumers {
obs = append(obs, o)
}
mset.mu.Unlock()
if preq != nil {
purged, err = mset.store.PurgeEx(preq.Subject, preq.Sequence, preq.Keep)
} else {
purged, err = mset.store.Purge()
}
if err != nil {
return purged, err
}
var state StreamState
mset.store.FastState(&state)
for _, o := range obs {
o.purge(state.FirstSeq)
}
return purged, nil
}
// RemoveMsg will remove a message from a stream.
// FIXME(dlc) - Should pick one and be consistent.
func (mset *stream) removeMsg(seq uint64) (bool, error) {
return mset.deleteMsg(seq)
}
// DeleteMsg will remove a message from a stream.
func (mset *stream) deleteMsg(seq uint64) (bool, error) {
mset.mu.RLock()
if mset.client == nil {
mset.mu.RUnlock()
return false, fmt.Errorf("invalid stream")
}
mset.mu.RUnlock()
return mset.store.RemoveMsg(seq)
}
// EraseMsg will securely remove a message and rewrite the data with random data.
func (mset *stream) eraseMsg(seq uint64) (bool, error) {
mset.mu.RLock()
if mset.client == nil {
mset.mu.RUnlock()
return false, fmt.Errorf("invalid stream")
}
mset.mu.RUnlock()
return mset.store.EraseMsg(seq)
}
// Are we a mirror?
func (mset *stream) isMirror() bool {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.cfg.Mirror != nil
}
func (mset *stream) hasSources() bool {
mset.mu.RLock()
defer mset.mu.RUnlock()
return len(mset.sources) > 0
}
func (mset *stream) sourcesInfo() (sis []*StreamSourceInfo) {
mset.mu.RLock()
defer mset.mu.RUnlock()
for _, si := range mset.sources {
sis = append(sis, mset.sourceInfo(si))
}
return sis
}
func (mset *stream) allSubjects() ([]string, bool) {
subjects, cfg, acc := mset.subjects(), mset.config(), mset.account()
var hasExt bool
var seen map[string]bool
if cfg.Mirror != nil {
var subjs []string
seen = make(map[string]bool)
subjs, hasExt = acc.streamSourceSubjects(cfg.Mirror, seen)
if len(subjs) > 0 {
subjects = append(subjects, subjs...)
}
} else if len(cfg.Sources) > 0 {
var subjs []string
seen = make(map[string]bool)
for _, si := range cfg.Sources {
subjs, hasExt = acc.streamSourceSubjects(si, seen)
if len(subjs) > 0 {
subjects = append(subjects, subjs...)
}
}
}
return subjects, hasExt
}
// Return the subjects for a stream source.
func (a *Account) streamSourceSubjects(ss *StreamSource, seen map[string]bool) (subjects []string, hasExt bool) {
if ss != nil && ss.External != nil {
return nil, true
}
s, js, _ := a.getJetStreamFromAccount()
if !s.JetStreamIsClustered() {
return a.streamSourceSubjectsNotClustered(ss.Name, seen)
} else {
return js.streamSourceSubjectsClustered(a.Name, ss.Name, seen)
}
}
func (js *jetStream) streamSourceSubjectsClustered(accountName, streamName string, seen map[string]bool) (subjects []string, hasExt bool) {
if seen[streamName] {
return nil, false
}
// We are clustered here so need to work through stream assignments.
sa := js.streamAssignment(accountName, streamName)
if sa == nil {
return nil, false
}
seen[streamName] = true
js.mu.RLock()
cfg := sa.Config
if len(cfg.Subjects) > 0 {
subjects = append(subjects, cfg.Subjects...)
}
// Check if we need to keep going.
var sources []*StreamSource
if cfg.Mirror != nil {
sources = append(sources, cfg.Mirror)
} else if len(cfg.Sources) > 0 {
sources = append(sources, cfg.Sources...)
}
js.mu.RUnlock()
if len(sources) > 0 {
var subjs []string
if acc, err := js.srv.lookupAccount(accountName); err == nil {
for _, ss := range sources {
subjs, hasExt = acc.streamSourceSubjects(ss, seen)
if len(subjs) > 0 {
subjects = append(subjects, subjs...)
}
if hasExt {
break
}
}
}
}
return subjects, hasExt
}
func (a *Account) streamSourceSubjectsNotClustered(streamName string, seen map[string]bool) (subjects []string, hasExt bool) {
if seen[streamName] {
return nil, false
}
mset, err := a.lookupStream(streamName)
if err != nil {
return nil, false
}
seen[streamName] = true
cfg := mset.config()
if len(cfg.Subjects) > 0 {
subjects = append(subjects, cfg.Subjects...)
}
var subjs []string
if cfg.Mirror != nil {
subjs, hasExt = a.streamSourceSubjects(cfg.Mirror, seen)
if len(subjs) > 0 {
subjects = append(subjects, subjs...)
}
} else if len(cfg.Sources) > 0 {
for _, si := range cfg.Sources {
subjs, hasExt = a.streamSourceSubjects(si, seen)
if len(subjs) > 0 {
subjects = append(subjects, subjs...)
}
if hasExt {
break
}
}
}
return subjects, hasExt
}
// Lock should be held
func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
if si == nil {
return nil
}
ssi := &StreamSourceInfo{Name: si.name, Lag: si.lag, Active: time.Since(si.last), Error: si.err}
var ext *ExternalStream
if mset.cfg.Mirror != nil {
ext = mset.cfg.Mirror.External
} else if ss := mset.streamSource(si.iname); ss != nil && ss.External != nil {
ext = ss.External
}
if ext != nil {
ssi.External = &ExternalStream{
ApiPrefix: ext.ApiPrefix,
DeliverPrefix: ext.DeliverPrefix,
}
}
return ssi
}
// Return our source info for our mirror.
func (mset *stream) mirrorInfo() *StreamSourceInfo {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.sourceInfo(mset.mirror)
}
const sourceHealthCheckInterval = 2 * time.Second
// Will run as a Go routine to process mirror consumer messages.
func (mset *stream) processMirrorMsgs() {
s := mset.srv
defer s.grWG.Done()
defer func() {
mset.mu.Lock()
if mset.mirror != nil {
mset.mirror.grr = false
if mset.mirror.qch != nil {
close(mset.mirror.qch)
mset.mirror.qch = nil
}
}
mset.mu.Unlock()
}()
// Grab stream quit channel.
mset.mu.Lock()
if mset.mirror == nil {
mset.mu.Unlock()
return
}
msgs, mch, qch, siqch := mset.mirror.msgs, mset.mirror.msgs.mch, mset.qch, mset.mirror.qch
// Set the last seen as now so that we don't fail at the first check.
mset.mirror.last = time.Now()
mset.mu.Unlock()
t := time.NewTicker(sourceHealthCheckInterval)
defer t.Stop()
for {
select {
case <-s.quitCh:
return
case <-qch:
return
case <-siqch:
return
case <-mch:
for im := mset.pending(msgs); im != nil; im = im.next {
if !mset.processInboundMirrorMsg(im) {
break
}
}
case <-t.C:
mset.mu.RLock()
isLeader := mset.isLeader()
stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*sourceHealthCheckInterval
mset.mu.RUnlock()
// No longer leader.
if !isLeader {
mset.cancelMirrorConsumer()
return
}
// We are stalled.
if stalled {
mset.retryMirrorConsumer()
}
}
}
}
// Checks that the message is from our current direct consumer. We can not depend on sub comparison
// since cross account imports break.
func (si *sourceInfo) isCurrentSub(reply string) bool {
return si.cname != _EMPTY_ && strings.HasPrefix(reply, jsAckPre) && si.cname == tokenAt(reply, 4)
}
// processInboundMirrorMsg handles processing messages bound for a stream.
func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
mset.mu.Lock()
if mset.mirror == nil {
mset.mu.Unlock()
return false
}
if !mset.isLeader() {
mset.mu.Unlock()
mset.cancelMirrorConsumer()
return false
}
isControl := m.isControlMsg()
// Ignore from old subscriptions.
// The reason we can not just compare subs is that on cross account imports they will not match.
if !mset.mirror.isCurrentSub(m.rply) && !isControl {
mset.mu.Unlock()
return false
}
mset.mirror.last = time.Now()
node := mset.node
// Check for heartbeats and flow control messages.
if isControl {
var needsRetry bool
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
mset.handleFlowControl(mset.mirror, m)
} else {
// For idle heartbeats make sure we did not miss anything.
if ldseq := parseInt64(getHeader(JSLastConsumerSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != mset.mirror.dseq {
needsRetry = true
}
}
mset.mu.Unlock()
if needsRetry {
mset.retryMirrorConsumer()
}
return !needsRetry
}
sseq, dseq, dc, ts, pending := replyInfo(m.rply)
if dc > 1 {
mset.mu.Unlock()
return false
}
// Mirror info tracking.
olag, osseq, odseq, oclseq := mset.mirror.lag, mset.mirror.sseq, mset.mirror.dseq, mset.mirror.clseq
if sseq == mset.mirror.sseq+1 {
mset.mirror.dseq = dseq
mset.mirror.sseq++
} else if sseq <= mset.mirror.sseq {
// Ignore older messages.
mset.mu.Unlock()
return true
} else if mset.mirror.cname == _EMPTY_ {
mset.mirror.cname = tokenAt(m.rply, 4)
mset.mirror.dseq, mset.mirror.sseq = dseq, sseq
} else {
// If the deliver sequence matches then the upstream stream has expired or deleted messages.
if dseq == mset.mirror.dseq+1 {
mset.skipMsgs(mset.mirror.sseq+1, sseq-1)
mset.mirror.dseq++
mset.mirror.sseq = sseq
} else {
mset.mu.Unlock()
mset.retryMirrorConsumer()
return false
}
}
if pending == 0 {
mset.mirror.lag = 0
} else {
mset.mirror.lag = pending - 1
}
mset.mirror.clseq = sseq - 1
js, stype := mset.js, mset.cfg.Storage
mset.mu.Unlock()
s := mset.srv
var err error
if node != nil {
if js.limitsExceeded(stype) {
s.resourcesExeededError()
err = ApiErrors[JSInsufficientResourcesErr]
} else {
err = node.Propose(encodeStreamMsg(m.subj, _EMPTY_, m.hdr, m.msg, sseq-1, ts))
}
} else {
err = mset.processJetStreamMsg(m.subj, _EMPTY_, m.hdr, m.msg, sseq-1, ts)
}
if err != nil {
if err == errLastSeqMismatch {
// We may have missed messages, restart.
if sseq <= mset.lastSeq() {
mset.mu.Lock()
mset.mirror.lag = olag
mset.mirror.sseq = osseq
mset.mirror.dseq = odseq
mset.mirror.clseq = oclseq
mset.mu.Unlock()
return false
} else {
mset.mu.Lock()
mset.mirror.dseq = odseq
mset.mirror.sseq = osseq
mset.mu.Unlock()
mset.retryMirrorConsumer()
}
} else {
s.Warnf("Got error processing JetStream mirror msg: %v", err)
}
if strings.Contains(err.Error(), "no space left") {
s.Errorf("JetStream out of space, will be DISABLED")
s.DisableJetStream()
}
}
return err == nil
}
func (mset *stream) setMirrorErr(err *ApiError) {
mset.mu.Lock()
if mset.mirror != nil {
mset.mirror.err = err
}
mset.mu.Unlock()
}
func (mset *stream) cancelMirrorConsumer() {
mset.mu.Lock()
defer mset.mu.Unlock()
if mset.mirror == nil {
return
}
if mset.mirror.sub != nil {
mset.unsubscribe(mset.mirror.sub)
mset.mirror.sub = nil
}
mset.removeInternalConsumer(mset.mirror)
// If the go routine is still running close the quit chan.
if mset.mirror.qch != nil {
close(mset.mirror.qch)
mset.mirror.qch = nil
}
}
func (mset *stream) retryMirrorConsumer() error {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.srv.Debugf("Retrying mirror consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name)
return mset.setupMirrorConsumer()
}
// Lock should be held.
func (mset *stream) skipMsgs(start, end uint64) {
node, store := mset.node, mset.store
var entries []*Entry
for seq := start; seq <= end; seq++ {
if node != nil {
entries = append(entries, &Entry{EntryNormal, encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq-1, 0)})
// So a single message does not get too big.
if len(entries) > 10_000 {
node.ProposeDirect(entries)
entries = entries[:0]
}
} else {
mset.lseq = store.SkipMsg()
}
}
// Send all at once.
if node != nil && len(entries) > 0 {
node.ProposeDirect(entries)
}
}
// Setup our mirror consumer.
// Lock should be held.
func (mset *stream) setupMirrorConsumer() error {
if mset.outq == nil {
return errors.New("outq required")
}
isReset := mset.mirror != nil
// Reset
if isReset {
if mset.mirror.sub != nil {
mset.unsubscribe(mset.mirror.sub)
mset.mirror.sub = nil
mset.mirror.dseq = 0
mset.mirror.sseq = mset.lseq
}
// Make sure to delete any prior consumers if we know about them.
mset.removeInternalConsumer(mset.mirror)
// If we are no longer the leader stop trying.
if !mset.isLeader() {
return nil
}
}
// Determine subjects etc.
var deliverSubject string
ext := mset.cfg.Mirror.External
if ext != nil && ext.DeliverPrefix != _EMPTY_ {
deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".M"), "..", ".")
} else {
deliverSubject = syncSubject("$JS.M")
}
if !isReset {
mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: &inbound{mch: make(chan struct{}, 1)}}
}
if !mset.mirror.grr {
mset.mirror.grr = true
mset.mirror.qch = make(chan struct{})
mset.srv.startGoRoutine(func() { mset.processMirrorMsgs() })
}
// We want to throttle here in terms of how fast we request new consumers.
if time.Since(mset.mirror.lreq) < 2*time.Second {
return nil
}
mset.mirror.lreq = time.Now()
// Now send off request to create/update our consumer. This will be all API based even in single server mode.
// We calculate durable names apriori so we do not need to save them off.
var state StreamState
mset.store.FastState(&state)
req := &CreateConsumerRequest{
Stream: mset.cfg.Mirror.Name,
Config: ConsumerConfig{
DeliverSubject: deliverSubject,
DeliverPolicy: DeliverByStartSequence,
OptStartSeq: state.LastSeq + 1,
AckPolicy: AckNone,
AckWait: 22 * time.Hour,
MaxDeliver: 1,
Heartbeat: sourceHealthCheckInterval,
FlowControl: true,
Direct: true,
},
}
// Only use start optionals on first time.
if state.Msgs == 0 && state.FirstSeq == 0 {
req.Config.OptStartSeq = 0
if mset.cfg.Mirror.OptStartSeq > 0 {
req.Config.OptStartSeq = mset.cfg.Mirror.OptStartSeq
} else if mset.cfg.Mirror.OptStartTime != nil {
req.Config.OptStartTime = mset.cfg.Mirror.OptStartTime
req.Config.DeliverPolicy = DeliverByStartTime
}
}
if req.Config.OptStartSeq == 0 && req.Config.OptStartTime == nil {
// If starting out and lastSeq is 0.
req.Config.DeliverPolicy = DeliverAll
}
respCh := make(chan *JSApiConsumerCreateResponse, 1)
reply := infoReplySubject()
crSub, _ := mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
mset.unsubscribeUnlocked(sub)
_, msg := c.msgParts(rmsg)
var ccr JSApiConsumerCreateResponse
if err := json.Unmarshal(msg, &ccr); err != nil {
c.Warnf("JetStream bad mirror consumer create response: %q", msg)
mset.cancelMirrorConsumer()
mset.setMirrorErr(ApiErrors[JSInvalidJSONErr])
return
}
respCh <- &ccr
})
b, _ := json.Marshal(req)
subject := fmt.Sprintf(JSApiConsumerCreateT, mset.cfg.Mirror.Name)
if ext != nil {
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
subject = strings.ReplaceAll(subject, "..", ".")
}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil})
go func() {
select {
case ccr := <-respCh:
if ccr.Error != nil || ccr.ConsumerInfo == nil {
mset.cancelMirrorConsumer()
} else {
mset.mu.Lock()
// Mirror config has been removed.
if mset.mirror == nil {
mset.mu.Unlock()
mset.cancelMirrorConsumer()
return
}
// When an upstream stream expires messages or in general has messages that we want
// that are no longer available we need to adjust here.
var state StreamState
mset.store.FastState(&state)
// Check if we need to skip messages.
if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream {
mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream)
}
// Capture consumer name.
mset.mirror.cname = ccr.ConsumerInfo.Name
msgs := mset.mirror.msgs
// Process inbound mirror messages from the wire.
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
mset.queueInbound(msgs, subject, reply, hdr, msg)
})
if err != nil {
mset.mirror.err = ApiErrors[JSMirrorConsumerSetupFailedErrF].ErrOrNewT(err, "{err}", err)
mset.mirror.sub = nil
mset.mirror.cname = _EMPTY_
} else {
mset.mirror.err = nil
mset.mirror.sub = sub
mset.mirror.last = time.Now()
mset.mirror.dseq = 0
mset.mirror.sseq = ccr.ConsumerInfo.Delivered.Stream
}
mset.mu.Unlock()
}
mset.setMirrorErr(ccr.Error)
case <-time.After(10 * time.Second):
mset.unsubscribeUnlocked(crSub)
return
}
}()
return nil
}
func (mset *stream) streamSource(iname string) *StreamSource {
for _, ssi := range mset.cfg.Sources {
if ssi.iname == iname {
return ssi
}
}
return nil
}
func (mset *stream) retrySourceConsumer(sname string) {
mset.mu.Lock()
defer mset.mu.Unlock()
si := mset.sources[sname]
if si == nil {
return
}
mset.setStartingSequenceForSource(sname)
mset.retrySourceConsumerAtSeq(sname, si.sseq+1)
}
// Lock should be held.
func (mset *stream) retrySourceConsumerAtSeq(sname string, seq uint64) {
if mset.client == nil {
return
}
s := mset.srv
s.Debugf("Retrying source consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name)
// No longer configured.
if si := mset.sources[sname]; si == nil {
return
}
mset.setSourceConsumer(sname, seq)
}
// Lock should be held.
func (mset *stream) cancelSourceConsumer(sname string) {
if si := mset.sources[sname]; si != nil && si.sub != nil {
mset.unsubscribe(si.sub)
si.sub = nil
si.sseq, si.dseq = 0, 0
mset.removeInternalConsumer(si)
// If the go routine is still running close the quit chan.
if si.qch != nil {
close(si.qch)
si.qch = nil
}
}
}
// Lock should be held.
func (mset *stream) setSourceConsumer(iname string, seq uint64) {
si := mset.sources[iname]
if si == nil {
return
}
if si.sub != nil {
mset.unsubscribe(si.sub)
si.sub = nil
}
// Need to delete the old one.
mset.removeInternalConsumer(si)
si.sseq, si.dseq = seq, 0
si.last = time.Now()
ssi := mset.streamSource(iname)
// Determine subjects etc.
var deliverSubject string
ext := ssi.External
if ext != nil && ext.DeliverPrefix != _EMPTY_ {
deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".")
} else {
deliverSubject = syncSubject("$JS.S")
}
if !si.grr {
si.grr = true
si.qch = make(chan struct{})
mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si) })
}
// We want to throttle here in terms of how fast we request new consumers.
if time.Since(si.lreq) < 2*time.Second {
return
}
si.lreq = time.Now()
req := &CreateConsumerRequest{
Stream: si.name,
Config: ConsumerConfig{
DeliverSubject: deliverSubject,
AckPolicy: AckNone,
AckWait: 22 * time.Hour,
MaxDeliver: 1,
Heartbeat: sourceHealthCheckInterval,
FlowControl: true,
Direct: true,
},
}
// If starting, check any configs.
if seq <= 1 {
if ssi.OptStartSeq > 0 {
req.Config.OptStartSeq = ssi.OptStartSeq
req.Config.DeliverPolicy = DeliverByStartSequence
} else if ssi.OptStartTime != nil {
req.Config.OptStartTime = ssi.OptStartTime
req.Config.DeliverPolicy = DeliverByStartTime
}
} else {
req.Config.OptStartSeq = seq
req.Config.DeliverPolicy = DeliverByStartSequence
}
// Filters
if ssi.FilterSubject != _EMPTY_ {
req.Config.FilterSubject = ssi.FilterSubject
}
respCh := make(chan *JSApiConsumerCreateResponse, 1)
reply := infoReplySubject()
crSub, _ := mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
mset.unsubscribe(sub)
_, msg := c.msgParts(rmsg)
var ccr JSApiConsumerCreateResponse
if err := json.Unmarshal(msg, &ccr); err != nil {
c.Warnf("JetStream bad source consumer create response: %q", msg)
return
}
respCh <- &ccr
})
b, _ := json.Marshal(req)
subject := fmt.Sprintf(JSApiConsumerCreateT, si.name)
if ext != nil {
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
subject = strings.ReplaceAll(subject, "..", ".")
}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil})
go func() {
select {
case ccr := <-respCh:
mset.mu.Lock()
if si := mset.sources[iname]; si != nil {
si.err = nil
if ccr.Error != nil || ccr.ConsumerInfo == nil {
mset.srv.Warnf("JetStream error response for create source consumer: %+v", ccr.Error)
si.err = ccr.Error
// We will retry every 10 seconds or so
mset.cancelSourceConsumer(iname)
} else {
if si.sseq != ccr.ConsumerInfo.Delivered.Stream {
si.sseq = ccr.ConsumerInfo.Delivered.Stream + 1
}
// Capture consumer name.
si.cname = ccr.ConsumerInfo.Name
// Now create sub to receive messages.
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
mset.queueInbound(si.msgs, subject, reply, hdr, msg)
})
if err != nil {
si.err = ApiErrors[JSSourceConsumerSetupFailedErrF].ErrOrNewT(err, "{err}", err)
si.sub = nil
} else {
si.err = nil
si.sub = sub
si.last = time.Now()
}
}
}
mset.mu.Unlock()
case <-time.After(10 * time.Second):
mset.unsubscribe(crSub)
return
}
}()
}
func (mset *stream) processSourceMsgs(si *sourceInfo) {
s := mset.srv
defer s.grWG.Done()
if si == nil {
return
}
defer func() {
mset.mu.Lock()
si.grr = false
if si.qch != nil {
close(si.qch)
si.qch = nil
}
mset.mu.Unlock()
}()
// Grab stream quit channel.
mset.mu.Lock()
msgs, mch, qch, siqch := si.msgs, si.msgs.mch, mset.qch, si.qch
// Set the last seen as now so that we don't fail at the first check.
si.last = time.Now()
mset.mu.Unlock()
t := time.NewTicker(sourceHealthCheckInterval)
defer t.Stop()
for {
select {
case <-s.quitCh:
return
case <-qch:
return
case <-siqch:
return
case <-mch:
for im := mset.pending(msgs); im != nil; im = im.next {
if !mset.processInboundSourceMsg(si, im) {
break
}
}
case <-t.C:
mset.mu.RLock()
iname, isLeader := si.iname, mset.isLeader()
stalled := time.Since(si.last) > 3*sourceHealthCheckInterval
mset.mu.RUnlock()
// No longer leader.
if !isLeader {
mset.mu.Lock()
mset.cancelSourceConsumer(iname)
mset.mu.Unlock()
return
}
// We are stalled.
if stalled {
mset.retrySourceConsumer(iname)
}
}
}
}
// isControlMsg determines if this is a control message.
func (m *inMsg) isControlMsg() bool {
return len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 "))
}
// handleFlowControl will properly handle flow control messages for both R1 and R>1.
// Lock should be held.
func (mset *stream) handleFlowControl(si *sourceInfo, m *inMsg) {
// If we are clustered we want to delay signaling back the the upstream consumer.
if node := mset.node; node != nil && si.clseq > 0 {
if mset.fcr == nil {
mset.fcr = make(map[uint64]string)
}
mset.fcr[si.clseq] = m.rply
} else {
mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
}
}
// processInboundSourceMsg handles processing other stream messages bound for this stream.
func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
mset.mu.Lock()
// If we are no longer the leader cancel this subscriber.
if !mset.isLeader() {
mset.mu.Unlock()
mset.cancelSourceConsumer(si.name)
return false
}
isControl := m.isControlMsg()
// Ignore from old subscriptions.
if !si.isCurrentSub(m.rply) && !isControl {
mset.mu.Unlock()
return false
}
si.last = time.Now()
node := mset.node
// Check for heartbeats and flow control messages.
if isControl {
var needsRetry bool
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
mset.handleFlowControl(si, m)
} else {
// For idle heartbeats make sure we did not miss anything.
if ldseq := parseInt64(getHeader(JSLastConsumerSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != si.dseq {
needsRetry = true
mset.retrySourceConsumerAtSeq(si.iname, si.sseq+1)
}
}
mset.mu.Unlock()
return !needsRetry
}
sseq, dseq, dc, _, pending := replyInfo(m.rply)
if dc > 1 {
mset.mu.Unlock()
return false
}
// Tracking is done here.
if dseq == si.dseq+1 {
si.dseq++
si.sseq = sseq
} else if dseq > si.dseq {
if si.cname == _EMPTY_ {
si.cname = tokenAt(m.rply, 4)
si.dseq, si.sseq = dseq, sseq
} else {
mset.retrySourceConsumerAtSeq(si.iname, si.sseq+1)
mset.mu.Unlock()
return false
}
} else {
mset.mu.Unlock()
return false
}
if pending == 0 {
si.lag = 0
} else {
si.lag = pending - 1
}
mset.mu.Unlock()
hdr, msg := m.hdr, m.msg
// If we are daisy chained here make sure to remove the original one.
if len(hdr) > 0 {
hdr = removeHeaderIfPresent(hdr, JSStreamSource)
}
// Hold onto the origin reply which has all the metadata.
hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.rply))
var err error
var clseq uint64
// If we are clustered we need to propose this message to the underlying raft group.
if node != nil {
clseq, err = mset.processClusteredInboundMsg(m.subj, _EMPTY_, hdr, msg)
if err == nil {
mset.mu.Lock()
si.clseq = clseq
mset.mu.Unlock()
}
} else {
err = mset.processJetStreamMsg(m.subj, _EMPTY_, hdr, msg, 0, 0)
}
if err != nil {
s := mset.srv
if err == errLastSeqMismatch {
mset.cancelSourceConsumer(si.iname)
mset.retrySourceConsumer(si.iname)
} else {
s.Warnf("JetStream got an error processing inbound source msg: %v", err)
}
if strings.Contains(err.Error(), "no space left") {
s.Errorf("JetStream out of space, will be DISABLED")
s.DisableJetStream()
}
}
return true
}
// Generate a new style source header.
func (si *sourceInfo) genSourceHeader(reply string) string {
var b strings.Builder
b.WriteString(si.iname)
b.WriteByte(' ')
// Grab sequence as text here from reply subject.
var tsa [expectedNumReplyTokens]string
start, tokens := 0, tsa[:0]
for i := 0; i < len(reply); i++ {
if reply[i] == btsep {
tokens, start = append(tokens, reply[start:i]), i+1
}
}
tokens = append(tokens, reply[start:])
seq := "1" // Default
if len(tokens) == expectedNumReplyTokens && tokens[0] == "$JS" && tokens[1] == "ACK" {
seq = tokens[5]
}
b.WriteString(seq)
return b.String()
}
// Original version of header that stored ack reply direct.
func streamAndSeqFromAckReply(reply string) (string, uint64) {
tsa := [expectedNumReplyTokens]string{}
start, tokens := 0, tsa[:0]
for i := 0; i < len(reply); i++ {
if reply[i] == btsep {
tokens, start = append(tokens, reply[start:i]), i+1
}
}
tokens = append(tokens, reply[start:])
if len(tokens) != expectedNumReplyTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
return _EMPTY_, 0
}
return tokens[2], uint64(parseAckReplyNum(tokens[5]))
}
// Extract the stream (indexed name) and sequence from the source header.
func streamAndSeq(shdr string) (string, uint64) {
if strings.HasPrefix(shdr, jsAckPre) {
return streamAndSeqFromAckReply(shdr)
}
// New version which is stream index name <SPC> sequence
fields := strings.Fields(shdr)
if len(fields) != 2 {
return _EMPTY_, 0
}
return fields[0], uint64(parseAckReplyNum(fields[1]))
}
// Lock should be held.
func (mset *stream) setStartingSequenceForSource(sname string) {
si := mset.sources[sname]
if si == nil {
return
}
var state StreamState
mset.store.FastState(&state)
// Do not reset sseq here so we can remember when purge/expiration happens.
if state.Msgs == 0 {
si.dseq = 0
return
}
for seq := state.LastSeq; seq >= state.FirstSeq; seq-- {
_, hdr, _, _, err := mset.store.LoadMsg(seq)
if err != nil || len(hdr) == 0 {
continue
}
ss := getHeader(JSStreamSource, hdr)
if len(ss) == 0 {
continue
}
iname, sseq := streamAndSeq(string(ss))
if iname == sname {
si.sseq = sseq
si.dseq = 0
return
}
}
}
// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
if len(mset.cfg.Sources) == 0 {
return
}
// Always reset here.
mset.sources = make(map[string]*sourceInfo)
for _, ssi := range mset.cfg.Sources {
if ssi.iname == _EMPTY_ {
ssi.setIndexName()
}
si := &sourceInfo{name: ssi.Name, iname: ssi.iname, msgs: &inbound{mch: make(chan struct{}, 1)}}
mset.sources[ssi.iname] = si
}
var state StreamState
mset.store.FastState(&state)
if state.Msgs == 0 {
return
}
// For short circuiting return.
expected := len(mset.cfg.Sources)
seqs := make(map[string]uint64)
// Stamp our si seq records on the way out.
defer func() {
for sname, seq := range seqs {
// Ignore if not set.
if seq == 0 {
continue
}
if si := mset.sources[sname]; si != nil {
si.sseq = seq
si.dseq = 0
}
}
}()
for seq := state.LastSeq; seq >= state.FirstSeq; seq-- {
_, hdr, _, _, err := mset.store.LoadMsg(seq)
if err != nil || len(hdr) == 0 {
continue
}
ss := getHeader(JSStreamSource, hdr)
if len(ss) == 0 {
continue
}
name, sseq := streamAndSeq(string(ss))
// Only update active in case we have older ones in here that got configured out.
if si := mset.sources[name]; si != nil {
if _, ok := seqs[name]; !ok {
seqs[name] = sseq
if len(seqs) == expected {
return
}
}
}
}
}
// Setup our source consumers.
// Lock should be held.
func (mset *stream) setupSourceConsumers() error {
if mset.outq == nil {
return errors.New("outq required")
}
// Reset if needed.
for _, si := range mset.sources {
if si.sub != nil {
mset.cancelSourceConsumer(si.name)
}
}
mset.startingSequenceForSources()
// Setup our consumers at the proper starting position.
for _, ssi := range mset.cfg.Sources {
if si := mset.sources[ssi.iname]; si != nil {
mset.setSourceConsumer(ssi.iname, si.sseq+1)
}
}
return nil
}
// Will create internal subscriptions for the stream.
// Lock should be held.
func (mset *stream) subscribeToStream() error {
if mset.active {
return nil
}
for _, subject := range mset.cfg.Subjects {
if _, err := mset.subscribeInternal(subject, mset.processInboundJetStreamMsg); err != nil {
return err
}
}
// Check if we need to setup mirroring.
if mset.cfg.Mirror != nil {
if err := mset.setupMirrorConsumer(); err != nil {
return err
}
} else if len(mset.cfg.Sources) > 0 {
if err := mset.setupSourceConsumers(); err != nil {
return err
}
}
mset.active = true
return nil
}
// Stop our source consumers.
// Lock should be held.
func (mset *stream) stopSourceConsumers() {
for _, si := range mset.sources {
if si.sub != nil {
mset.unsubscribe(si.sub)
}
// Need to delete the old one.
mset.removeInternalConsumer(si)
// If the go routine is still running close the quit chan.
if si.qch != nil {
close(si.qch)
si.qch = nil
}
}
}
// Lock should be held.
func (mset *stream) removeInternalConsumer(si *sourceInfo) {
if si == nil || si.cname == _EMPTY_ {
return
}
si.cname = _EMPTY_
}
// Will unsubscribe from the stream.
// Lock should be held.
func (mset *stream) unsubscribeToStream() error {
for _, subject := range mset.cfg.Subjects {
mset.unsubscribeInternal(subject)
}
if mset.mirror != nil {
if mset.mirror.sub != nil {
mset.unsubscribe(mset.mirror.sub)
}
mset.removeInternalConsumer(mset.mirror)
// If the go routine is still running close the quit chan.
if mset.mirror.qch != nil {
close(mset.mirror.qch)
}
mset.mirror = nil
}
if len(mset.cfg.Sources) > 0 {
mset.stopSourceConsumers()
}
mset.active = false
return nil
}
// Lock should be held.
func (mset *stream) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
c := mset.client
if c == nil {
return nil, fmt.Errorf("invalid stream")
}
if !c.srv.eventsEnabled() {
return nil, ErrNoSysAccount
}
if cb == nil {
return nil, fmt.Errorf("undefined message handler")
}
mset.sid++
// Now create the subscription
return c.processSub([]byte(subject), nil, []byte(strconv.Itoa(mset.sid)), cb, false)
}
// Helper for unlocked stream.
func (mset *stream) subscribeInternalUnlocked(subject string, cb msgHandler) (*subscription, error) {
mset.mu.Lock()
defer mset.mu.Unlock()
return mset.subscribeInternal(subject, cb)
}
// This will unsubscribe us from the exact subject given.
// We do not currently track the subs so do not have the sid.
// This should be called only on an update.
// Lock should be held.
func (mset *stream) unsubscribeInternal(subject string) error {
c := mset.client
if c == nil {
return fmt.Errorf("invalid stream")
}
var sid []byte
c.mu.Lock()
for _, sub := range c.subs {
if subject == string(sub.subject) {
sid = sub.sid
break
}
}
c.mu.Unlock()
if sid != nil {
return c.processUnsub(sid)
}
return nil
}
// Lock should be held.
func (mset *stream) unsubscribe(sub *subscription) {
if sub == nil || mset.client == nil {
return
}
mset.client.processUnsub(sub.sid)
}
func (mset *stream) unsubscribeUnlocked(sub *subscription) {
mset.mu.Lock()
mset.unsubscribe(sub)
mset.mu.Unlock()
}
func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
mset.mu.Lock()
mset.created = time.Now().UTC()
switch mset.cfg.Storage {
case MemoryStorage:
ms, err := newMemStore(&mset.cfg)
if err != nil {
mset.mu.Unlock()
return err
}
mset.store = ms
case FileStorage:
s := mset.srv
fs, err := newFileStoreWithCreated(*fsCfg, mset.cfg, mset.created, s.jsKeyGen(mset.acc.Name))
if err != nil {
mset.mu.Unlock()
return err
}
mset.store = fs
}
mset.mu.Unlock()
mset.store.RegisterStorageUpdates(mset.storeUpdates)
return nil
}
// Called for any updates to the underlying stream. We pass through the bytes to the
// jetstream account. We do local processing for stream pending for consumers, but only
// for removals.
// Lock should not be held.
func (mset *stream) storeUpdates(md, bd int64, seq uint64, subj string) {
// If we have a single negative update then we will process our consumers for stream pending.
// Purge and Store handled separately inside individual calls.
if md == -1 && seq > 0 {
mset.mu.RLock()
for _, o := range mset.consumers {
o.decStreamPending(seq, subj)
}
mset.mu.RUnlock()
}
if mset.jsa != nil {
mset.jsa.updateUsage(mset.stype, bd)
}
}
// NumMsgIds returns the number of message ids being tracked for duplicate suppression.
func (mset *stream) numMsgIds() int {
mset.mu.RLock()
defer mset.mu.RUnlock()
return len(mset.ddmap)
}
// checkMsgId will process and check for duplicates.
// Lock should be held.
func (mset *stream) checkMsgId(id string) *ddentry {
if id == "" || mset.ddmap == nil {
return nil
}
return mset.ddmap[id]
}
// Will purge the entries that are past the window.
// Should be called from a timer.
func (mset *stream) purgeMsgIds() {
mset.mu.Lock()
defer mset.mu.Unlock()
now := time.Now().UnixNano()
tmrNext := mset.cfg.Duplicates
window := int64(tmrNext)
for i, dde := range mset.ddarr[mset.ddindex:] {
if now-dde.ts >= window {
delete(mset.ddmap, dde.id)
} else {
mset.ddindex += i
// Check if we should garbage collect here if we are 1/3 total size.
if cap(mset.ddarr) > 3*(len(mset.ddarr)-mset.ddindex) {
mset.ddarr = append([]*ddentry(nil), mset.ddarr[mset.ddindex:]...)
mset.ddindex = 0
}
tmrNext = time.Duration(window - (now - dde.ts))
break
}
}
if len(mset.ddmap) > 0 {
// Make sure to not fire too quick
const minFire = 50 * time.Millisecond
if tmrNext < minFire {
tmrNext = minFire
}
mset.ddtmr.Reset(tmrNext)
} else {
mset.ddtmr.Stop()
mset.ddtmr = nil
}
}
// storeMsgId will store the message id for duplicate detection.
func (mset *stream) storeMsgId(dde *ddentry) {
mset.mu.Lock()
if mset.ddmap == nil {
mset.ddmap = make(map[string]*ddentry)
}
if mset.ddtmr == nil {
mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds)
}
mset.ddmap[dde.id] = dde
mset.ddarr = append(mset.ddarr, dde)
mset.mu.Unlock()
}
// Fast lookup of msgId.
func getMsgId(hdr []byte) string {
return string(getHeader(JSMsgId, hdr))
}
// Fast lookup of expected last msgId.
func getExpectedLastMsgId(hdr []byte) string {
return string(getHeader(JSExpectedLastMsgId, hdr))
}
// Fast lookup of expected stream.
func getExpectedStream(hdr []byte) string {
return string(getHeader(JSExpectedStream, hdr))
}
// Fast lookup of expected stream.
func getExpectedLastSeq(hdr []byte) uint64 {
bseq := getHeader(JSExpectedLastSeq, hdr)
if len(bseq) == 0 {
return 0
}
return uint64(parseInt64(bseq))
}
// Lock should be held.
func (mset *stream) isClustered() bool {
return mset.node != nil
}
// Used if we have to queue things internally to avoid the route/gw path.
type inMsg struct {
subj string
rply string
hdr []byte
msg []byte
next *inMsg
}
// Linked list for inbound messages.
type inbound struct {
head *inMsg
tail *inMsg
mch chan struct{}
}
func (mset *stream) pending(msgs *inbound) *inMsg {
mset.mu.Lock()
head := msgs.head
msgs.head, msgs.tail = nil, nil
mset.mu.Unlock()
return head
}
func (mset *stream) queueInbound(ib *inbound, subj, rply string, hdr, msg []byte) {
m := &inMsg{subj, rply, hdr, msg, nil}
mset.mu.Lock()
var notify bool
if ib.head == nil {
ib.head = m
notify = true
} else {
ib.tail.next = m
}
ib.tail = m
mch := ib.mch
mset.mu.Unlock()
if notify {
select {
case mch <- struct{}{}:
default:
}
}
}
func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) {
// Copy these.
if len(hdr) > 0 {
hdr = append(hdr[:0:0], hdr...)
}
if len(msg) > 0 {
msg = append(msg[:0:0], msg...)
}
mset.queueInbound(mset.msgs, subj, rply, hdr, msg)
}
// processInboundJetStreamMsg handles processing messages bound for a stream.
func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, subject, reply string, rmsg []byte) {
mset.mu.RLock()
isLeader, isClustered := mset.isLeader(), mset.node != nil
mset.mu.RUnlock()
// If we are not the leader just ignore.
if !isLeader {
return
}
hdr, msg := c.msgParts(rmsg)
// If we are not receiving directly from a client we should move this this Go routine.
if c.kind != CLIENT {
mset.queueInboundMsg(subject, reply, hdr, msg)
return
}
// If we are clustered we need to propose this message to the underlying raft group.
if isClustered {
mset.processClusteredInboundMsg(subject, reply, hdr, msg)
} else {
mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0)
}
}
var (
errLastSeqMismatch = errors.New("last sequence mismatch")
errMsgIdDuplicate = errors.New("msgid is duplicate")
)
// processJetStreamMsg is where we try to actually process the stream msg.
func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error {
mset.mu.Lock()
store := mset.store
c, s := mset.client, mset.srv
if c == nil {
mset.mu.Unlock()
return nil
}
var accName string
if mset.acc != nil {
accName = mset.acc.Name
}
doAck, pubAck := !mset.cfg.NoAck, mset.pubAck
js, jsa := mset.js, mset.jsa
name, stype := mset.cfg.Name, mset.cfg.Storage
maxMsgSize := int(mset.cfg.MaxMsgSize)
numConsumers := len(mset.consumers)
interestRetention := mset.cfg.Retention == InterestPolicy
// Snapshot if we are the leader and if we can respond.
isLeader := mset.isLeader()
canRespond := doAck && len(reply) > 0 && isLeader
var resp = &JSPubAckResponse{}
// For clustering the lower layers will pass our expected lseq. If it is present check for that here.
if lseq > 0 && lseq != (mset.lseq+mset.clfs) {
isMisMatch := true
// If our first message for this mirror, see if we have to adjust our starting sequence.
if mset.cfg.Mirror != nil {
var state StreamState
mset.store.FastState(&state)
if state.FirstSeq == 0 {
mset.store.Compact(lseq + 1)
mset.lseq = lseq
isMisMatch = false
}
}
// Really is a mismatch.
if isMisMatch {
outq := mset.outq
mset.mu.Unlock()
if canRespond && outq != nil {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSStreamSequenceNotMatchErr]
b, _ := json.Marshal(resp)
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return errLastSeqMismatch
}
}
// If we have received this message across an account we may have request information attached.
// For now remove. TODO(dlc) - Should this be opt-in or opt-out?
if len(hdr) > 0 {
hdr = removeHeaderIfPresent(hdr, ClientInfoHdr)
}
// Process additional msg headers if still present.
var msgId string
if len(hdr) > 0 {
msgId = getMsgId(hdr)
outq := mset.outq
if dde := mset.checkMsgId(msgId); dde != nil {
mset.clfs++
mset.mu.Unlock()
if canRespond {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ",\"duplicate\": true}"...)
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
return errMsgIdDuplicate
}
// Expected stream.
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
mset.clfs++
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSStreamNotMatchErr]
b, _ := json.Marshal(resp)
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return errors.New("expected stream does not match")
}
// Expected last sequence.
if seq := getExpectedLastSeq(hdr); seq > 0 && seq != mset.lseq {
mlseq := mset.lseq
mset.clfs++
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSStreamWrongLastSequenceErrF].NewT("{seq}", mlseq)
b, _ := json.Marshal(resp)
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq)
}
// Expected last msgId.
if lmsgId := getExpectedLastMsgId(hdr); lmsgId != _EMPTY_ && lmsgId != mset.lmsgId {
last := mset.lmsgId
mset.clfs++
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSStreamWrongLastMsgIDErrF].NewT("{id}", last)
b, _ := json.Marshal(resp)
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return fmt.Errorf("last msgid mismatch: %q vs %q", lmsgId, last)
}
}
// Response Ack.
var (
response []byte
seq uint64
err error
)
// Check to see if we are over the max msg size.
if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize {
mset.clfs++
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSStreamMessageExceedsMaximumErr]
b, _ := json.Marshal(resp)
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
return ErrMaxPayload
}
// Check to see if we have exceeded our limits.
if js.limitsExceeded(stype) {
s.resourcesExeededError()
mset.clfs++
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSInsufficientResourcesErr]
b, _ := json.Marshal(resp)
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
}
// Stepdown regardless.
if node := mset.raftNode(); node != nil {
node.StepDown()
}
return ApiErrors[JSInsufficientResourcesErr]
}
var noInterest bool
// If we are interest based retention and have no consumers then we can skip.
if interestRetention {
if numConsumers == 0 {
noInterest = true
} else if mset.numFilter > 0 {
// Assume none.
noInterest = true
for _, o := range mset.consumers {
if o.cfg.FilterSubject != _EMPTY_ && subjectIsSubsetMatch(subject, o.cfg.FilterSubject) {
noInterest = false
break
}
}
}
}
// Grab timestamp if not already set.
if ts == 0 && lseq > 0 {
ts = time.Now().UnixNano()
}
// Skip msg here.
if noInterest {
mset.lseq = store.SkipMsg()
mset.lmsgId = msgId
mset.mu.Unlock()
if canRespond {
response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...)
response = append(response, '}')
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
mset.storeMsgId(&ddentry{msgId, seq, ts})
}
return nil
}
// If here we will attempt to store the message.
// Assume this will succeed.
olmsgId := mset.lmsgId
mset.lmsgId = msgId
clfs := mset.clfs
mset.lseq++
// We hold the lock to this point to make sure nothing gets between us since we check for pre-conditions.
// Currently can not hold while calling store b/c we have inline storage update calls that may need the lock.
// Note that upstream that sets seq/ts should be serialized as much as possible.
mset.mu.Unlock()
// Store actual msg.
if lseq == 0 && ts == 0 {
seq, ts, err = store.StoreMsg(subject, hdr, msg)
} else {
// Make sure to take into account any message assignments that we had to skip (clfs).
seq = lseq + 1 - clfs
err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
}
if err != nil {
// If we did not succeed put those values back and increment clfs in case we are clustered.
mset.mu.Lock()
var state StreamState
mset.store.FastState(&state)
mset.lseq = state.LastSeq
mset.lmsgId = olmsgId
mset.clfs++
mset.mu.Unlock()
switch err {
case ErrMaxMsgs, ErrMaxBytes, ErrMaxMsgsPerSubject, ErrMsgTooLarge:
s.Debugf("JetStream failed to store a msg on stream '%s > %s': %v", accName, name, err)
case ErrStoreClosed:
default:
s.Errorf("JetStream failed to store a msg on stream '%s > %s': %v", accName, name, err)
}
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSStreamStoreFailedF].ErrOrNewT(err, "{err}", err)
response, _ = json.Marshal(resp)
}
} else if jsa.limitsExceeded(stype) {
s.Warnf("JetStream resource limits exceeded for account: %q", accName)
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSAccountResourcesExceededErr]
response, _ = json.Marshal(resp)
}
// If we did not succeed put those values back.
mset.mu.Lock()
var state StreamState
mset.store.FastState(&state)
mset.lseq = state.LastSeq
mset.lmsgId = olmsgId
mset.mu.Unlock()
store.RemoveMsg(seq)
seq = 0
} else {
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
mset.storeMsgId(&ddentry{msgId, seq, ts})
}
if canRespond {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')
}
}
// Send response here.
if canRespond {
mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
if err == nil && seq > 0 && numConsumers > 0 {
mset.mu.Lock()
for _, o := range mset.consumers {
o.mu.Lock()
if o.isLeader() {
if o.isFilteredMatch(subject) {
o.sgap++
}
o.signalNewMessages()
}
o.mu.Unlock()
}
mset.mu.Unlock()
}
return err
}
// Internal message for use by jetstream subsystem.
type jsPubMsg struct {
subj string
dsubj string
reply string
hdr []byte
msg []byte
o *consumer
seq uint64
next *jsPubMsg
}
func (pm *jsPubMsg) size() int {
if pm == nil {
return 0
}
return len(pm.subj) + len(pm.reply) + len(pm.hdr) + len(pm.msg)
}
// Forms a linked list for sending internal system messages.
type jsOutQ struct {
mu sync.Mutex
mch chan struct{}
head *jsPubMsg
tail *jsPubMsg
}
func (q *jsOutQ) pending() *jsPubMsg {
if q == nil {
return nil
}
q.mu.Lock()
head := q.head
q.head, q.tail = nil, nil
q.mu.Unlock()
return head
}
func (q *jsOutQ) send(msg *jsPubMsg) {
if q == nil || msg == nil {
return
}
q.mu.Lock()
var notify bool
if q.head == nil {
q.head = msg
notify = true
} else {
q.tail.next = msg
}
q.tail = msg
q.mu.Unlock()
if notify {
select {
case q.mch <- struct{}{}:
default:
}
}
}
// StoredMsg is for raw access to messages in a stream.
type StoredMsg struct {
Subject string `json:"subject"`
Sequence uint64 `json:"seq"`
Header []byte `json:"hdrs,omitempty"`
Data []byte `json:"data,omitempty"`
Time time.Time `json:"time"`
}
// This is similar to system semantics but did not want to overload the single system sendq,
// or require system account when doing simple setup with jetstream.
func (mset *stream) setupSendCapabilities() {
mset.mu.Lock()
defer mset.mu.Unlock()
if mset.outq != nil {
return
}
mset.outq = &jsOutQ{mch: make(chan struct{}, 1)}
go mset.internalLoop()
}
// Name returns the stream name.
func (mset *stream) name() string {
if mset == nil {
return _EMPTY_
}
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.cfg.Name
}
// Returns a copy of the interest subjects for this stream.
func (mset *stream) subjects() []string {
mset.mu.RLock()
defer mset.mu.RUnlock()
if len(mset.cfg.Subjects) == 0 {
return nil
}
return append(mset.cfg.Subjects[:0:0], mset.cfg.Subjects...)
}
func (mset *stream) internalLoop() {
mset.mu.RLock()
s := mset.srv
c := s.createInternalJetStreamClient()
c.registerWithAccount(mset.acc)
defer c.closeConnection(ClientClosed)
outq, qch, mch, amch := mset.outq, mset.qch, mset.msgs.mch, mset.amch
isClustered := mset.cfg.Replicas > 1
mset.mu.RUnlock()
for {
select {
case <-outq.mch:
for pm := outq.pending(); pm != nil; pm = pm.next {
c.pa.subject = []byte(pm.subj)
c.pa.deliver = []byte(pm.dsubj)
c.pa.size = len(pm.msg) + len(pm.hdr)
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
c.pa.reply = []byte(pm.reply)
var msg []byte
if len(pm.hdr) > 0 {
c.pa.hdr = len(pm.hdr)
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
msg = append(pm.hdr, pm.msg...)
msg = append(msg, _CRLF_...)
} else {
c.pa.hdr = -1
c.pa.hdb = nil
msg = append(pm.msg, _CRLF_...)
}
didDeliver, _ := c.processInboundClientMsg(msg)
c.pa.szb = nil
// Check to see if this is a delivery for a consumer and
// we failed to deliver the message. If so alert the consumer.
if pm.o != nil && pm.seq > 0 && !didDeliver {
pm.o.didNotDeliver(pm.seq)
}
}
c.flushClients(10 * time.Millisecond)
case <-mch:
for im := mset.pending(mset.msgs); im != nil; im = im.next {
// If we are clustered we need to propose this message to the underlying raft group.
if isClustered {
mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg)
} else {
mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0)
}
}
case seq := <-amch:
mset.ackMsg(nil, seq)
case <-qch:
return
case <-s.quitCh:
return
}
}
}
// Internal function to delete a stream.
func (mset *stream) delete() error {
return mset.stop(true, true)
}
// Internal function to stop or delete the stream.
func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.mu.RLock()
jsa := mset.jsa
mset.mu.RUnlock()
if jsa == nil {
return ApiErrors[JSNotEnabledForAccountErr]
}
// Remove from our account map.
jsa.mu.Lock()
delete(jsa.streams, mset.cfg.Name)
jsa.mu.Unlock()
// Clean up consumers.
mset.mu.Lock()
var obs []*consumer
for _, o := range mset.consumers {
obs = append(obs, o)
}
// Check if we are a mirror.
if mset.mirror != nil && mset.mirror.sub != nil {
mset.unsubscribe(mset.mirror.sub)
mset.mirror.sub = nil
mset.removeInternalConsumer(mset.mirror)
}
// Now check for sources.
if len(mset.sources) > 0 {
for _, si := range mset.sources {
mset.cancelSourceConsumer(si.iname)
}
}
mset.mu.Unlock()
for _, o := range obs {
// Second flag says do not broadcast to signal.
// TODO(dlc) - If we have an err here we don't want to stop
// but should we log?
o.stopWithFlags(deleteFlag, false, advisory)
}
mset.mu.Lock()
// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
mset.unsubscribeToStream()
// Our info sub if we spun it up.
if mset.infoSub != nil {
mset.srv.sysUnsubscribe(mset.infoSub)
mset.infoSub = nil
}
// Quit channel.
if mset.qch != nil {
close(mset.qch)
mset.qch = nil
}
// Cluster cleanup
if n := mset.node; n != nil {
if deleteFlag {
n.Delete()
} else {
n.Stop()
}
}
// Send stream delete advisory after the consumers.
if deleteFlag && advisory {
mset.sendDeleteAdvisoryLocked()
}
c := mset.client
mset.client = nil
if c == nil {
mset.mu.Unlock()
return nil
}
// Cleanup duplicate timer if running.
if mset.ddtmr != nil {
mset.ddtmr.Stop()
mset.ddtmr = nil
mset.ddarr = nil
mset.ddmap = nil
}
sysc := mset.sysc
mset.sysc = nil
// Clustered cleanup.
mset.mu.Unlock()
c.closeConnection(ClientClosed)
if sysc != nil {
sysc.closeConnection(ClientClosed)
}
if mset.store == nil {
return nil
}
if deleteFlag {
if err := mset.store.Delete(); err != nil {
return err
}
} else if err := mset.store.Stop(); err != nil {
return err
}
return nil
}
func (mset *stream) getMsg(seq uint64) (*StoredMsg, error) {
subj, hdr, msg, ts, err := mset.store.LoadMsg(seq)
if err != nil {
return nil, err
}
sm := &StoredMsg{
Subject: subj,
Sequence: seq,
Header: hdr,
Data: msg,
Time: time.Unix(0, ts).UTC(),
}
return sm, nil
}
// getConsumers will return all the current consumers for this stream.
func (mset *stream) getConsumers() []*consumer {
mset.mu.Lock()
defer mset.mu.Unlock()
var obs []*consumer
for _, o := range mset.consumers {
obs = append(obs, o)
}
return obs
}
// This returns all consumers that are not DIRECT.
func (mset *stream) getPublicConsumers() []*consumer {
mset.mu.Lock()
defer mset.mu.Unlock()
var obs []*consumer
for _, o := range mset.consumers {
if !o.cfg.Direct {
obs = append(obs, o)
}
}
return obs
}
// NumConsumers reports on number of active consumers for this stream.
func (mset *stream) numConsumers() int {
mset.mu.Lock()
defer mset.mu.Unlock()
return len(mset.consumers)
}
func (mset *stream) setConsumer(o *consumer) {
mset.consumers[o.name] = o
if o.cfg.FilterSubject != _EMPTY_ {
mset.numFilter++
}
if o.cfg.Direct {
mset.directs++
}
}
func (mset *stream) removeConsumer(o *consumer) {
if o.cfg.FilterSubject != _EMPTY_ {
mset.numFilter--
}
if o.cfg.Direct {
mset.directs--
}
delete(mset.consumers, o.name)
}
// lookupConsumer will retrieve a consumer by name.
func (mset *stream) lookupConsumer(name string) *consumer {
mset.mu.Lock()
defer mset.mu.Unlock()
return mset.consumers[name]
}
// State will return the current state for this stream.
func (mset *stream) state() StreamState {
return mset.stateWithDetail(false)
}
func (mset *stream) numDirectConsumers() (num int) {
mset.mu.RLock()
defer mset.mu.RUnlock()
// Consumers that are direct are not recorded at the store level.
for _, o := range mset.consumers {
o.mu.RLock()
if o.cfg.Direct {
num++
}
o.mu.RUnlock()
}
return num
}
func (mset *stream) stateWithDetail(details bool) StreamState {
mset.mu.RLock()
c, store := mset.client, mset.store
mset.mu.RUnlock()
if c == nil || store == nil {
return StreamState{}
}
// Currently rely on store.
state := store.State()
if !details {
state.Deleted = nil
}
return state
}
// Determines if the new proposed partition is unique amongst all consumers.
// Lock should be held.
func (mset *stream) partitionUnique(partition string) bool {
for _, o := range mset.consumers {
if o.cfg.FilterSubject == _EMPTY_ {
return false
}
if subjectIsSubsetMatch(partition, o.cfg.FilterSubject) {
return false
}
}
return true
}
// Lock should be held.
func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
for _, o := range mset.consumers {
if o != obs && o.needAck(seq) {
return true
}
}
return false
}
// ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy.
func (mset *stream) ackMsg(o *consumer, seq uint64) {
switch mset.cfg.Retention {
case LimitsPolicy:
return
case WorkQueuePolicy:
// Normally we just remove a message when its ack'd here but if we have direct consumers
// from sources and/or mirrors we need to make sure they have delivered the msg.
mset.mu.RLock()
shouldRemove := mset.directs <= 0 || !mset.checkInterest(seq, o)
mset.mu.RUnlock()
if shouldRemove {
mset.store.RemoveMsg(seq)
}
case InterestPolicy:
mset.mu.RLock()
hasInterest := mset.checkInterest(seq, o)
mset.mu.RUnlock()
if !hasInterest {
mset.store.RemoveMsg(seq)
}
}
}
// Snapshot creates a snapshot for the stream and possibly consumers.
func (mset *stream) snapshot(deadline time.Duration, checkMsgs, includeConsumers bool) (*SnapshotResult, error) {
mset.mu.RLock()
if mset.client == nil || mset.store == nil {
mset.mu.RUnlock()
return nil, fmt.Errorf("invalid stream")
}
store := mset.store
mset.mu.RUnlock()
return store.Snapshot(deadline, checkMsgs, includeConsumers)
}
const snapsDir = "__snapshots__"
// RestoreStream will restore a stream from a snapshot.
func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error) {
if ncfg == nil {
return nil, errors.New("nil config on stream restore")
}
cfg, err := checkStreamCfg(ncfg)
if err != nil {
return nil, ApiErrors[JSStreamNotFoundErr].ErrOr(err)
}
_, jsa, err := a.checkForJetStream()
if err != nil {
return nil, err
}
sd := path.Join(jsa.storeDir, snapsDir)
if _, err := os.Stat(sd); os.IsNotExist(err) {
if err := os.MkdirAll(sd, defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
}
}
sdir, err := ioutil.TempDir(sd, "snap-")
if err != nil {
return nil, err
}
if _, err := os.Stat(sdir); os.IsNotExist(err) {
if err := os.MkdirAll(sdir, defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
}
}
defer os.RemoveAll(sdir)
tr := tar.NewReader(s2.NewReader(r))
for {
hdr, err := tr.Next()
if err == io.EOF {
break // End of snapshot
}
if err != nil {
return nil, err
}
fpath := path.Join(sdir, filepath.Clean(hdr.Name))
os.MkdirAll(filepath.Dir(fpath), defaultDirPerms)
fd, err := os.OpenFile(fpath, os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, err
}
_, err = io.Copy(fd, tr)
fd.Close()
if err != nil {
return nil, err
}
}
// Check metadata.
// The cfg passed in will be the new identity for the stream.
var fcfg FileStreamInfo
b, err := ioutil.ReadFile(path.Join(sdir, JetStreamMetaFile))
if err != nil {
return nil, err
}
if err := json.Unmarshal(b, &fcfg); err != nil {
return nil, err
}
// See if this stream already exists.
if _, err := a.lookupStream(cfg.Name); err == nil {
return nil, ApiErrors[JSStreamNameExistErr]
}
// Move into the correct place here.
ndir := path.Join(jsa.storeDir, streamsDir, cfg.Name)
// Remove old one if for some reason it is still here.
if _, err := os.Stat(ndir); err == nil {
os.RemoveAll(ndir)
}
// Make sure our destination streams directory exists.
if err := os.MkdirAll(path.Join(jsa.storeDir, streamsDir), defaultDirPerms); err != nil {
return nil, err
}
// Move into new location.
if err := os.Rename(sdir, ndir); err != nil {
return nil, err
}
if cfg.Template != _EMPTY_ {
if err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name); err != nil {
return nil, err
}
}
mset, err := a.addStream(&cfg)
if err != nil {
return nil, err
}
if !fcfg.Created.IsZero() {
mset.setCreatedTime(fcfg.Created)
}
// Now do consumers.
odir := path.Join(ndir, consumerDir)
ofis, _ := ioutil.ReadDir(odir)
for _, ofi := range ofis {
metafile := path.Join(odir, ofi.Name(), JetStreamMetaFile)
metasum := path.Join(odir, ofi.Name(), JetStreamMetaFileSum)
if _, err := os.Stat(metafile); os.IsNotExist(err) {
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
buf, err := ioutil.ReadFile(metafile)
if err != nil {
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
if _, err := os.Stat(metasum); os.IsNotExist(err) {
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
var cfg FileConsumerInfo
if err := json.Unmarshal(buf, &cfg); err != nil {
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig)
if isEphemeral {
// This is an ephermal consumer and this could fail on restart until
// the consumer can reconnect. We will create it as a durable and switch it.
cfg.ConsumerConfig.Durable = ofi.Name()
}
obs, err := mset.addConsumer(&cfg.ConsumerConfig)
if err != nil {
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
if isEphemeral {
obs.switchToEphemeral()
}
if !cfg.Created.IsZero() {
obs.setCreatedTime(cfg.Created)
}
obs.mu.Lock()
err = obs.readStoredState()
obs.mu.Unlock()
if err != nil {
mset.stop(true, false)
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
}
}
return mset, nil
}