mirror of https://github.com/gogrlx/nats-server.git (synced 2026-04-02 03:38:42 -07:00)
1. We were holding open FDs longer than we should for consumers, causing issues with open FD limits. We now do not hold them open and cap updates a bit better.
2. When doing a stream delete, consumer delete was repeating a lot of work that was not necessary, causing longer delays. This has been optimized a bit; more improvements still to be made.
3. We cover all JS under a single export, but that was also trapping GetNext for pull-based consumers, and since this was a no-op (it is handled at the user account level) we were creating a lot of garbage service import responses and reverse map entries that had to be garbage collected. We have a fix in to avoid this but are still looking for a better one.
4. Still had some lingering references to all exports vs the single JS export.
Signed-off-by: Derek Collison <derek@nats.io>
3460 lines
88 KiB
Go
// Copyright 2019-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
	"archive/tar"
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"os"
	"path"
	"path/filepath"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/klauspost/compress/s2"
	"github.com/nats-io/nuid"
)

// StreamConfig will determine the name, subjects and retention policy
// for a given stream. If subjects is empty the name will be used.
type StreamConfig struct {
	Name         string          `json:"name"`
	Subjects     []string        `json:"subjects,omitempty"`
	Retention    RetentionPolicy `json:"retention"`
	MaxConsumers int             `json:"max_consumers"`
	MaxMsgs      int64           `json:"max_msgs"`
	MaxBytes     int64           `json:"max_bytes"`
	MaxAge       time.Duration   `json:"max_age"`
	MaxMsgsPer   int64           `json:"max_msgs_per_subject"`
	MaxMsgSize   int32           `json:"max_msg_size,omitempty"`
	Discard      DiscardPolicy   `json:"discard"`
	Storage      StorageType     `json:"storage"`
	Replicas     int             `json:"num_replicas"`
	NoAck        bool            `json:"no_ack,omitempty"`
	Template     string          `json:"template_owner,omitempty"`
	Duplicates   time.Duration   `json:"duplicate_window,omitempty"`
	Placement    *Placement      `json:"placement,omitempty"`
	Mirror       *StreamSource   `json:"mirror,omitempty"`
	Sources      []*StreamSource `json:"sources,omitempty"`
}

// JSPubAckResponse is a formal response to a publish operation.
type JSPubAckResponse struct {
	Error *ApiError `json:"error,omitempty"`
	*PubAck
}

// ToError checks if the response has an error and if it does converts it to an error,
// avoiding the pitfalls described by https://yourbasic.org/golang/gotcha-why-nil-error-not-equal-nil/
func (r *JSPubAckResponse) ToError() error {
	if r.Error == nil {
		return nil
	}

	return r.Error
}
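// Illustrative sketch (not part of the original file): the typed-nil gotcha
// ToError guards against. Assigning a nil *ApiError into a plain error return
// yields a non-nil interface value, so callers should go through ToError:
//
//	var resp JSPubAckResponse
//	if err := json.Unmarshal(msg, &resp); err != nil {
//		// handle bad JSON
//	}
//	if err := resp.ToError(); err != nil {
//		// publish was rejected by the server
//	}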
// PubAck is the detail you get back from a publish to a stream that was successful.
// e.g. +OK {"stream": "Orders", "seq": 22}
type PubAck struct {
	Stream    string `json:"stream"`
	Sequence  uint64 `json:"seq"`
	Duplicate bool   `json:"duplicate,omitempty"`
}

// StreamInfo shows config and current state for this stream.
type StreamInfo struct {
	Config  StreamConfig        `json:"config"`
	Created time.Time           `json:"created"`
	State   StreamState         `json:"state"`
	Cluster *ClusterInfo        `json:"cluster,omitempty"`
	Mirror  *StreamSourceInfo   `json:"mirror,omitempty"`
	Sources []*StreamSourceInfo `json:"sources,omitempty"`
}

// ClusterInfo shows information about the underlying set of servers
// that make up the stream or consumer.
type ClusterInfo struct {
	Name     string      `json:"name,omitempty"`
	Leader   string      `json:"leader,omitempty"`
	Replicas []*PeerInfo `json:"replicas,omitempty"`
}

// PeerInfo shows information about all the peers in the cluster that
// are supporting the stream or consumer.
type PeerInfo struct {
	Name    string        `json:"name"`
	Current bool          `json:"current"`
	Offline bool          `json:"offline,omitempty"`
	Active  time.Duration `json:"active"`
	Lag     uint64        `json:"lag,omitempty"`
}

// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
	Name     string          `json:"name"`
	External *ExternalStream `json:"external,omitempty"`
	Lag      uint64          `json:"lag"`
	Active   time.Duration   `json:"active"`
	Error    *ApiError       `json:"error,omitempty"`
}

// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
	Name          string          `json:"name"`
	OptStartSeq   uint64          `json:"opt_start_seq,omitempty"`
	OptStartTime  *time.Time      `json:"opt_start_time,omitempty"`
	FilterSubject string          `json:"filter_subject,omitempty"`
	External      *ExternalStream `json:"external,omitempty"`

	// Internal
	iname string // For indexing when stream names are the same for multiple sources.
}

// ExternalStream allows you to qualify access to a stream source in another account.
type ExternalStream struct {
	ApiPrefix     string `json:"api"`
	DeliverPrefix string `json:"deliver"`
}
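// Illustrative sketch (not in the original file; stream and prefix names are
// hypothetical): per the JSON tags above, a stream that sources from a stream
// in another account would qualify it with an external block like:
//
//	{
//	  "name": "AGG",
//	  "sources": [
//	    {"name": "ORDERS", "external": {"api": "$JS.hub.API", "deliver": "deliver.hub"}}
//	  ]
//	}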
// Stream is a jetstream stream of messages. When we receive a message internally destined
// for a Stream we will direct link from the client to this structure.
type stream struct {
	mu        sync.RWMutex
	js        *jetStream
	jsa       *jsAccount
	acc       *Account
	srv       *Server
	client    *client
	sysc      *client
	sid       int
	pubAck    []byte
	outq      *jsOutQ
	msgs      *inbound
	store     StreamStore
	amch      chan uint64
	lseq      uint64
	lmsgId    string
	consumers map[string]*consumer
	numFilter int
	cfg       StreamConfig
	created   time.Time
	stype     StorageType
	ddmap     map[string]*ddentry
	ddarr     []*ddentry
	ddindex   int
	ddtmr     *time.Timer
	qch       chan struct{}
	active    bool

	// Mirror
	mirror *sourceInfo

	// Sources
	sources map[string]*sourceInfo

	// Indicates we have direct consumers.
	directs int

	// For flowcontrol processing for source and mirror internal consumers.
	fcr map[uint64]string

	// TODO(dlc) - Hide everything below behind two pointers.
	// Clustered mode.
	sa       *streamAssignment
	node     RaftNode
	catchup  bool
	syncSub  *subscription
	infoSub  *subscription
	clMu     sync.Mutex
	clseq    uint64
	clfs     uint64
	lqsent   time.Time
	catchups map[string]uint64
}

type sourceInfo struct {
	name  string
	iname string
	cname string
	sub   *subscription
	msgs  *inbound
	sseq  uint64
	dseq  uint64
	clseq uint64
	lag   uint64
	err   *ApiError
	last  time.Time
	lreq  time.Time
	qch   chan struct{}
	grr   bool
}

// Headers for published messages.
const (
	JSMsgId               = "Nats-Msg-Id"
	JSExpectedStream      = "Nats-Expected-Stream"
	JSExpectedLastSeq     = "Nats-Expected-Last-Sequence"
	JSExpectedLastSubjSeq = "Nats-Expected-Last-Subject-Sequence"
	JSExpectedLastMsgId   = "Nats-Expected-Last-Msg-Id"
	JSStreamSource        = "Nats-Stream-Source"
	JSLastConsumerSeq     = "Nats-Last-Consumer"
	JSLastStreamSeq       = "Nats-Last-Stream"
)
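// Illustrative sketch (not in the original file; the nats.go client calls
// shown are assumptions): publishers drive the de-duplication window by
// setting the JSMsgId header, and can assert optimistic-concurrency
// expectations with the JSExpected* headers:
//
//	m := nats.NewMsg("ORDERS.new")
//	m.Header.Set("Nats-Msg-Id", "order-22")           // dedupe key
//	m.Header.Set("Nats-Expected-Last-Sequence", "21") // CAS-style guard
//	m.Data = payload
//	_, err := js.PublishMsg(m)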
// Dedupe entry
type ddentry struct {
	id  string
	seq uint64
	ts  int64
}

// Replicas Range
const (
	StreamMaxReplicas = 5
)

// AddStream adds a stream for the given account.
func (a *Account) addStream(config *StreamConfig) (*stream, error) {
	return a.addStreamWithAssignment(config, nil, nil)
}

// AddStreamWithStore adds a stream for the given account with custom store config options.
func (a *Account) addStreamWithStore(config *StreamConfig, fsConfig *FileStoreConfig) (*stream, error) {
	return a.addStreamWithAssignment(config, fsConfig, nil)
}

func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileStoreConfig, sa *streamAssignment) (*stream, error) {
	s, jsa, err := a.checkForJetStream()
	if err != nil {
		return nil, err
	}

	// If we do not have the stream currently assigned to us in cluster mode we will proceed but warn.
	// This can happen on startup with restored state where on meta replay we still do not have
	// the assignment. Running in single server mode this always returns true.
	if !jsa.streamAssigned(config.Name) {
		s.Debugf("Stream '%s > %s' does not seem to be assigned to this server", a.Name, config.Name)
	}

	// Sensible defaults.
	cfg, err := checkStreamCfg(config)
	if err != nil {
		return nil, ApiErrors[JSStreamInvalidConfigF].ErrOrNewT(err, "{err}", err)
	}

	singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode()
	if singleServerMode && cfg.Replicas > 1 {
		return nil, ApiErrors[JSStreamReplicasNotSupportedErr]
	}

	jsa.mu.Lock()
	js := jsa.js
	if mset, ok := jsa.streams[cfg.Name]; ok {
		jsa.mu.Unlock()
		// Check to see if configs are same.
		ocfg := mset.config()
		if reflect.DeepEqual(ocfg, cfg) {
			if sa != nil {
				mset.setStreamAssignment(sa)
			}
			return mset, nil
		} else {
			return nil, ApiErrors[JSStreamNameExistErr]
		}
	}
	// Check for limits.
	if err := jsa.checkLimits(&cfg); err != nil {
		jsa.mu.Unlock()
		return nil, err
	}
	// Check for template ownership if present.
	if cfg.Template != _EMPTY_ && jsa.account != nil {
		if !jsa.checkTemplateOwnership(cfg.Template, cfg.Name) {
			jsa.mu.Unlock()
			return nil, fmt.Errorf("stream not owned by template")
		}
	}

	// Check for mirror designation.
	if cfg.Mirror != nil {
		// Can't have subjects.
		if len(cfg.Subjects) > 0 {
			jsa.mu.Unlock()
			return nil, fmt.Errorf("stream mirrors can not also contain subjects")
		}
		if len(cfg.Sources) > 0 {
			jsa.mu.Unlock()
			return nil, fmt.Errorf("stream mirrors can not also contain other sources")
		}
		if cfg.Mirror.FilterSubject != _EMPTY_ {
			jsa.mu.Unlock()
			return nil, fmt.Errorf("stream mirrors can not contain filtered subjects")
		}
		if cfg.Mirror.OptStartSeq > 0 && cfg.Mirror.OptStartTime != nil {
			jsa.mu.Unlock()
			return nil, fmt.Errorf("stream mirrors can not have both start seq and start time configured")
		}
	} else if len(cfg.Subjects) == 0 && len(cfg.Sources) == 0 {
		jsa.mu.Unlock()
		return nil, fmt.Errorf("stream needs at least one configured subject or mirror")
	}

	// Setup our internal indexed names here for sources.
	if len(cfg.Sources) > 0 {
		for _, ssi := range cfg.Sources {
			ssi.setIndexName()
		}
	}

	// Check for overlapping subjects. These are not allowed for now.
	if jsa.subjectsOverlap(cfg.Subjects) {
		jsa.mu.Unlock()
		return nil, fmt.Errorf("subjects overlap with an existing stream")
	}

	// Setup the internal clients.
	c := s.createInternalJetStreamClient()
	ic := s.createInternalJetStreamClient()

	mset := &stream{
		acc:       a,
		jsa:       jsa,
		cfg:       cfg,
		js:        js,
		srv:       s,
		client:    c,
		sysc:      ic,
		stype:     cfg.Storage,
		consumers: make(map[string]*consumer),
		msgs:      &inbound{mch: make(chan struct{}, 1)},
		qch:       make(chan struct{}),
	}

	// For no-ack consumers when we are using interest retention.
	if cfg.Retention != LimitsPolicy {
		mset.amch = make(chan uint64, 8192)
	}

	jsa.streams[cfg.Name] = mset
	storeDir := path.Join(jsa.storeDir, streamsDir, cfg.Name)
	jsa.mu.Unlock()

	// Bind to the user account.
	c.registerWithAccount(a)
	// Bind to the system account.
	ic.registerWithAccount(s.SystemAccount())

	// Create the appropriate storage
	fsCfg := fsConfig
	if fsCfg == nil {
		fsCfg = &FileStoreConfig{}
		// If we are file based and not explicitly configured
		// we may be able to auto-tune based on max msgs or bytes.
		if cfg.Storage == FileStorage {
			mset.autoTuneFileStorageBlockSize(fsCfg)
		}
	}
	fsCfg.StoreDir = storeDir
	fsCfg.AsyncFlush = false
	fsCfg.SyncInterval = 2 * time.Minute

	if err := mset.setupStore(fsCfg); err != nil {
		mset.stop(true, false)
		return nil, err
	}

	// Create our pubAck template here. Better than json marshal each time on success.
	b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: cfg.Name, Sequence: math.MaxUint64}})
	end := bytes.Index(b, []byte(strconv.FormatUint(math.MaxUint64, 10)))
	// We need to force cap here to make sure this is a copy when sending a response.
	mset.pubAck = b[:end:end]

	// Rebuild dedupe as needed.
	mset.rebuildDedupe()

	// Set our stream assignment if in clustered mode.
	if sa != nil {
		mset.setStreamAssignment(sa)
	}

	// Setup our internal send go routine.
	mset.setupSendCapabilities()

	// Call directly to set leader if not in clustered mode.
	// This can be called before we actually setup clustering though, so check both.
	if singleServerMode {
		if err := mset.setLeader(true); err != nil {
			mset.stop(true, false)
			return nil, err
		}
	}

	// This is always true in single server mode.
	mset.mu.RLock()
	isLeader := mset.isLeader()
	mset.mu.RUnlock()

	if isLeader {
		// Send advisory.
		var suppress bool
		if !s.standAloneMode() && sa == nil {
			suppress = true
		} else if sa != nil {
			suppress = sa.responded
		}
		if !suppress {
			mset.sendCreateAdvisory()
		}
	}

	return mset, nil
}
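// Illustrative sketch (not in the original file): why the pubAck template
// above uses a full slice expression. b[:end:end] pins capacity to length, so
// a later append must allocate a fresh backing array instead of writing over
// the shared template:
//
//	tmpl := []byte(`{"stream":"Orders","seq":`)
//	resp := append(tmpl[:len(tmpl):len(tmpl)], strconv.FormatUint(seq, 10)...)
//	// tmpl is untouched; resp owns its own memory.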
// Sets the index name. Usually just the stream name but when the stream is external we will
// use additional information in case the stream names are the same.
func (ssi *StreamSource) setIndexName() {
	if ssi.External != nil {
		ssi.iname = ssi.Name + ":" + string(getHash(ssi.External.ApiPrefix))
	} else {
		ssi.iname = ssi.Name
	}
}
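// Illustrative sketch (not in the original file; prefix and hash values are
// hypothetical): two sources that share the stream name "ORDERS" but point at
// different external API prefixes get distinct index names, e.g.
//
//	{Name: "ORDERS", External: &ExternalStream{ApiPrefix: "$JS.hub.API"}} // iname "ORDERS:<hash of $JS.hub.API>"
//	{Name: "ORDERS"}                                                      // iname "ORDERS"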
func (mset *stream) streamAssignment() *streamAssignment {
	mset.mu.RLock()
	defer mset.mu.RUnlock()
	return mset.sa
}

func (mset *stream) setStreamAssignment(sa *streamAssignment) {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	mset.sa = sa
	if sa == nil {
		return
	}

	// Set our node.
	mset.node = sa.Group.node

	// Setup our info sub here as well for all stream members. This is now by design.
	if mset.infoSub == nil {
		isubj := fmt.Sprintf(clusterStreamInfoT, mset.jsa.acc(), mset.cfg.Name)
		// Note below the way we subscribe here is so that we can send requests to ourselves.
		mset.infoSub, _ = mset.srv.systemSubscribe(isubj, _EMPTY_, false, mset.sysc, mset.handleClusterStreamInfoRequest)
	}
}

// Lock should be held.
func (mset *stream) isLeader() bool {
	if mset.node != nil {
		return mset.node.Leader()
	}
	return true
}

// TODO(dlc) - Check to see if we can accept being the leader or we should step down.
func (mset *stream) setLeader(isLeader bool) error {
	mset.mu.Lock()
	// If we are here we have a change in leader status.
	if isLeader {
		// Make sure we are listening for sync requests.
		// TODO(dlc) - Original design was that all in sync members of the group would do DQ.
		mset.startClusterSubs()
		// Setup subscriptions
		if err := mset.subscribeToStream(); err != nil {
			mset.mu.Unlock()
			return err
		}
	} else {
		// Stop responding to sync requests.
		mset.stopClusterSubs()
		// Unsubscribe from direct stream.
		mset.unsubscribeToStream()
		// Clear catchup state
		mset.clearAllCatchupPeers()
	}
	mset.mu.Unlock()
	return nil
}

// Lock should be held.
func (mset *stream) startClusterSubs() {
	if mset.isClustered() && mset.syncSub == nil {
		mset.syncSub, _ = mset.srv.systemSubscribe(mset.sa.Sync, _EMPTY_, false, mset.sysc, mset.handleClusterSyncRequest)
	}
}

// Lock should be held.
func (mset *stream) stopClusterSubs() {
	if mset.syncSub != nil {
		mset.srv.sysUnsubscribe(mset.syncSub)
		mset.syncSub = nil
	}
}

// account gets the account for this stream.
func (mset *stream) account() *Account {
	mset.mu.RLock()
	jsa := mset.jsa
	mset.mu.RUnlock()
	if jsa == nil {
		return nil
	}
	return jsa.acc()
}

// Helper to determine the max msg size for this stream if file based.
func (mset *stream) maxMsgSize() uint64 {
	maxMsgSize := mset.cfg.MaxMsgSize
	if maxMsgSize <= 0 {
		// Pull from the account.
		if mset.jsa != nil {
			if acc := mset.jsa.acc(); acc != nil {
				acc.mu.RLock()
				maxMsgSize = acc.mpay
				acc.mu.RUnlock()
			}
		}
		// If all else fails use default.
		if maxMsgSize <= 0 {
			maxMsgSize = MAX_PAYLOAD_SIZE
		}
	}
	// Now determine an estimation for the subjects etc.
	maxSubject := -1
	for _, subj := range mset.cfg.Subjects {
		if subjectIsLiteral(subj) {
			if len(subj) > maxSubject {
				maxSubject = len(subj)
			}
		}
	}
	if maxSubject < 0 {
		const defaultMaxSubject = 256
		maxSubject = defaultMaxSubject
	}
	// filestore will add in estimates for record headers, etc.
	return fileStoreMsgSizeEstimate(maxSubject, int(maxMsgSize))
}

// If we are file based and the file storage config was not explicitly set
// we can autotune block sizes to better match. Our target will be to store 125%
// of the theoretical limit. We will round up to nearest 100 bytes as well.
func (mset *stream) autoTuneFileStorageBlockSize(fsCfg *FileStoreConfig) {
	var totalEstSize uint64

	// MaxBytes will take precedence for now.
	if mset.cfg.MaxBytes > 0 {
		totalEstSize = uint64(mset.cfg.MaxBytes)
	} else if mset.cfg.MaxMsgs > 0 {
		// Determine max message size to estimate.
		totalEstSize = mset.maxMsgSize() * uint64(mset.cfg.MaxMsgs)
	} else {
		// If nothing set will let underlying filestore determine blkSize.
		return
	}

	blkSize := (totalEstSize / 4) + 1 // (25% overhead)
	// Round up to nearest 100
	if m := blkSize % 100; m != 0 {
		blkSize += 100 - m
	}
	if blkSize < FileStoreMinBlkSize {
		blkSize = FileStoreMinBlkSize
	}
	if blkSize > FileStoreMaxBlkSize {
		blkSize = FileStoreMaxBlkSize
	}
	fsCfg.BlockSize = uint64(blkSize)
}
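// Illustrative sketch (not in the original file; the numbers are hypothetical):
// with MaxBytes = 1 GiB the tuner picks one quarter of the limit as the block
// size, rounds up to the nearest 100 bytes, then clamps to filestore bounds:
//
//	totalEstSize := uint64(1 << 30)  // 1073741824
//	blkSize := totalEstSize/4 + 1    // 268435457
//	blkSize += 100 - blkSize%100     // 268435500
//	// then clamped into [FileStoreMinBlkSize, FileStoreMaxBlkSize]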
// rebuildDedupe will rebuild any dedupe structures needed after recovery of a stream.
// TODO(dlc) - Might be good to know if this should be checked at all for streams with no
// headers and msgId in them. Would need signaling from the storage layer.
func (mset *stream) rebuildDedupe() {
	state := mset.store.State()
	mset.lseq = state.LastSeq

	// We have some messages. Lookup starting sequence by duplicate time window.
	sseq := mset.store.GetSeqFromTime(time.Now().Add(-mset.cfg.Duplicates))
	if sseq == 0 {
		return
	}

	for seq := sseq; seq <= state.LastSeq; seq++ {
		_, hdr, _, ts, err := mset.store.LoadMsg(seq)
		var msgId string
		if err == nil && len(hdr) > 0 {
			if msgId = getMsgId(hdr); msgId != _EMPTY_ {
				mset.storeMsgId(&ddentry{msgId, seq, ts})
			}
		}
		if seq == state.LastSeq {
			mset.lmsgId = msgId
		}
	}
}

func (mset *stream) lastSeq() uint64 {
	mset.mu.RLock()
	lseq := mset.lseq
	mset.mu.RUnlock()
	return lseq
}

func (mset *stream) setLastSeq(lseq uint64) {
	mset.mu.Lock()
	mset.lseq = lseq
	mset.mu.Unlock()
}

func (mset *stream) sendCreateAdvisory() {
	mset.mu.Lock()
	name := mset.cfg.Name
	template := mset.cfg.Template
	outq := mset.outq
	srv := mset.srv
	mset.mu.Unlock()

	if outq == nil {
		return
	}

	// finally send an event that this stream was created
	m := JSStreamActionAdvisory{
		TypedEvent: TypedEvent{
			Type: JSStreamActionAdvisoryType,
			ID:   nuid.Next(),
			Time: time.Now().UTC(),
		},
		Stream:   name,
		Action:   CreateEvent,
		Template: template,
		Domain:   srv.getOpts().JetStreamDomain,
	}

	j, err := json.Marshal(m)
	if err != nil {
		return
	}

	subj := JSAdvisoryStreamCreatedPre + "." + name
	outq.send(&jsPubMsg{subj, _EMPTY_, _EMPTY_, nil, j, nil, 0, nil})
}

func (mset *stream) sendDeleteAdvisoryLocked() {
	if mset.outq == nil {
		return
	}

	m := JSStreamActionAdvisory{
		TypedEvent: TypedEvent{
			Type: JSStreamActionAdvisoryType,
			ID:   nuid.Next(),
			Time: time.Now().UTC(),
		},
		Stream:   mset.cfg.Name,
		Action:   DeleteEvent,
		Template: mset.cfg.Template,
		Domain:   mset.srv.getOpts().JetStreamDomain,
	}

	j, err := json.Marshal(m)
	if err == nil {
		subj := JSAdvisoryStreamDeletedPre + "." + mset.cfg.Name
		mset.outq.send(&jsPubMsg{subj, _EMPTY_, _EMPTY_, nil, j, nil, 0, nil})
	}
}

func (mset *stream) sendUpdateAdvisoryLocked() {
	if mset.outq == nil {
		return
	}

	m := JSStreamActionAdvisory{
		TypedEvent: TypedEvent{
			Type: JSStreamActionAdvisoryType,
			ID:   nuid.Next(),
			Time: time.Now().UTC(),
		},
		Stream: mset.cfg.Name,
		Action: ModifyEvent,
		Domain: mset.srv.getOpts().JetStreamDomain,
	}

	j, err := json.Marshal(m)
	if err == nil {
		subj := JSAdvisoryStreamUpdatedPre + "." + mset.cfg.Name
		mset.outq.send(&jsPubMsg{subj, _EMPTY_, _EMPTY_, nil, j, nil, 0, nil})
	}
}

// Created returns created time.
func (mset *stream) createdTime() time.Time {
	mset.mu.RLock()
	created := mset.created
	mset.mu.RUnlock()
	return created
}

// Internal to allow creation time to be restored.
func (mset *stream) setCreatedTime(created time.Time) {
	mset.mu.Lock()
	mset.created = created
	mset.mu.Unlock()
}

// Check to see if these subjects overlap with existing subjects.
// Lock should be held.
func (jsa *jsAccount) subjectsOverlap(subjects []string) bool {
	for _, mset := range jsa.streams {
		for _, subj := range mset.cfg.Subjects {
			for _, tsubj := range subjects {
				if SubjectsCollide(tsubj, subj) {
					return true
				}
			}
		}
	}
	return false
}

// StreamDefaultDuplicatesWindow default duplicates window.
const StreamDefaultDuplicatesWindow = 2 * time.Minute

func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
	if config == nil {
		return StreamConfig{}, fmt.Errorf("stream configuration invalid")
	}
	if !isValidName(config.Name) {
		return StreamConfig{}, fmt.Errorf("stream name is required and can not contain '.', '*', '>'")
	}
	if len(config.Name) > JSMaxNameLen {
		return StreamConfig{}, fmt.Errorf("stream name is too long, maximum allowed is %d", JSMaxNameLen)
	}
	cfg := *config

	// Make file the default.
	if cfg.Storage == 0 {
		cfg.Storage = FileStorage
	}
	if cfg.Replicas == 0 {
		cfg.Replicas = 1
	}
	if cfg.Replicas > StreamMaxReplicas {
		return cfg, fmt.Errorf("maximum replicas is %d", StreamMaxReplicas)
	}
	if cfg.MaxMsgs == 0 {
		cfg.MaxMsgs = -1
	}
	if cfg.MaxBytes == 0 {
		cfg.MaxBytes = -1
	}
	if cfg.MaxMsgSize == 0 {
		cfg.MaxMsgSize = -1
	}
	if cfg.MaxConsumers == 0 {
		cfg.MaxConsumers = -1
	}
	if cfg.Duplicates == 0 {
		if cfg.MaxAge != 0 && cfg.MaxAge < StreamDefaultDuplicatesWindow {
			cfg.Duplicates = cfg.MaxAge
		} else {
			cfg.Duplicates = StreamDefaultDuplicatesWindow
		}
	} else if cfg.Duplicates < 0 {
		return StreamConfig{}, fmt.Errorf("duplicates window can not be negative")
	}
	// Check that duplicates is not larger than age if set.
	if cfg.MaxAge != 0 && cfg.Duplicates > cfg.MaxAge {
		return StreamConfig{}, fmt.Errorf("duplicates window can not be larger than max age")
	}

	if len(cfg.Subjects) == 0 {
		if cfg.Mirror == nil && len(cfg.Sources) == 0 {
			cfg.Subjects = append(cfg.Subjects, cfg.Name)
		}
	} else {
		// We can allow overlaps, but don't allow direct duplicates.
		dset := make(map[string]struct{}, len(cfg.Subjects))
		for _, subj := range cfg.Subjects {
			if _, ok := dset[subj]; ok {
				return StreamConfig{}, fmt.Errorf("duplicate subjects detected")
			}
			// Also check to make sure we do not overlap with our $JS API subjects.
			if subjectIsSubsetMatch(subj, "$JS.API.>") {
				return StreamConfig{}, fmt.Errorf("subjects overlap with jetstream api")
			}

			dset[subj] = struct{}{}
		}
	}
	return cfg, nil
}
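// Illustrative sketch (not in the original file): the defaults checkStreamCfg
// applies to a minimal config. Unset limits become unlimited (-1) and the
// dedupe window falls back to StreamDefaultDuplicatesWindow:
//
//	cfg, _ := checkStreamCfg(&StreamConfig{Name: "ORDERS"})
//	// cfg.Storage == FileStorage, cfg.Replicas == 1
//	// cfg.MaxMsgs == -1, cfg.MaxBytes == -1, cfg.MaxMsgSize == -1, cfg.MaxConsumers == -1
//	// cfg.Duplicates == 2*time.Minute, cfg.Subjects == []string{"ORDERS"}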
// Config returns the stream's configuration.
func (mset *stream) config() StreamConfig {
	mset.mu.RLock()
	defer mset.mu.RUnlock()
	return mset.cfg
}

func (mset *stream) fileStoreConfig() (FileStoreConfig, error) {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	fs, ok := mset.store.(*fileStore)
	if !ok {
		return FileStoreConfig{}, ErrStoreWrongType
	}
	return fs.fileStoreConfig(), nil
}

func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig) (*StreamConfig, error) {
	cfg, err := checkStreamCfg(new)
	if err != nil {
		return nil, ApiErrors[JSStreamInvalidConfigF].ErrOrNewT(err, "{err}", err)
	}

	// Name must match.
	if cfg.Name != old.Name {
		return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration name must match original")
	}
	// Can't change MaxConsumers for now.
	if cfg.MaxConsumers != old.MaxConsumers {
		return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration update can not change MaxConsumers")
	}
	// Can't change storage types.
	if cfg.Storage != old.Storage {
		return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration update can not change storage type")
	}
	// Can't change retention.
	if cfg.Retention != old.Retention {
		return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration update can not change retention policy")
	}
	// Can not have a template owner for now.
	if old.Template != _EMPTY_ {
		return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration update not allowed on template owned stream")
	}
	if cfg.Template != _EMPTY_ {
		return nil, ApiErrors[JSStreamInvalidConfigF].NewT("{err}", "stream configuration update can not be owned by a template")
	}

	// Check limits.
	if err := jsa.checkLimits(&cfg); err != nil {
		return nil, err
	}
	return &cfg, nil
}

// Update will allow certain configuration properties of an existing stream to be updated.
func (mset *stream) update(config *StreamConfig) error {
	ocfg := mset.config()
	cfg, err := mset.jsa.configUpdateCheck(&ocfg, config)
	if err != nil {
		return ApiErrors[JSStreamInvalidConfigF].ErrOrNewT(err, "{err}", err)
	}

	mset.mu.Lock()
	if mset.isLeader() {
		// Now check for subject interest differences.
		current := make(map[string]struct{}, len(ocfg.Subjects))
		for _, s := range ocfg.Subjects {
			current[s] = struct{}{}
		}
		// Update config with new values. The store update will enforce any stricter limits.

		// Now walk new subjects. All of these need to be added, but we will check
		// the originals first, since if it is in there we can skip, already added.
		for _, s := range cfg.Subjects {
			if _, ok := current[s]; !ok {
				if _, err := mset.subscribeInternal(s, mset.processInboundJetStreamMsg); err != nil {
					mset.mu.Unlock()
					return err
				}
			}
			delete(current, s)
		}
		// What is left in current needs to be deleted.
		for s := range current {
			if err := mset.unsubscribeInternal(s); err != nil {
				mset.mu.Unlock()
				return err
			}
		}

		// Check for the Duplicates
		if cfg.Duplicates != ocfg.Duplicates && mset.ddtmr != nil {
			// Let it fire right away, it will adjust properly on purge.
			mset.ddtmr.Reset(time.Microsecond)
		}

		// Check for Sources.
		if len(cfg.Sources) > 0 || len(ocfg.Sources) > 0 {
			current := make(map[string]struct{})
			for _, s := range ocfg.Sources {
				current[s.Name] = struct{}{}
			}
			for _, s := range cfg.Sources {
				s.setIndexName()
				if _, ok := current[s.iname]; !ok {
					if mset.sources == nil {
						mset.sources = make(map[string]*sourceInfo)
					}
					mset.cfg.Sources = append(mset.cfg.Sources, s)
					si := &sourceInfo{name: s.Name, iname: s.iname, msgs: &inbound{mch: make(chan struct{}, 1)}}
					mset.sources[s.iname] = si
					mset.setStartingSequenceForSource(s.iname)
					mset.setSourceConsumer(s.iname, si.sseq+1)
				}
				delete(current, s.Name)
			}
			// What is left in current needs to be deleted.
			for iname := range current {
				mset.cancelSourceConsumer(iname)
				delete(mset.sources, iname)
			}
		}
	}

	// Now update config and store's version of our config.
	mset.cfg = *cfg

	var suppress bool
	if mset.isClustered() && mset.sa != nil {
		suppress = mset.sa.responded
	}
	if mset.isLeader() && !suppress {
		mset.sendUpdateAdvisoryLocked()
	}
	mset.mu.Unlock()

	mset.store.UpdateConfig(cfg)

	return nil
}

// Purge will remove all messages from the stream and underlying store.
func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) {
	mset.mu.Lock()
	if mset.client == nil {
		mset.mu.Unlock()
		return 0, errors.New("stream closed")
	}
	// Purge dedupe.
	mset.ddmap = nil
	var _obs [4]*consumer
	obs := _obs[:0]
	for _, o := range mset.consumers {
		obs = append(obs, o)
	}
	mset.mu.Unlock()

	if preq != nil {
		purged, err = mset.store.PurgeEx(preq.Subject, preq.Sequence, preq.Keep)
	} else {
		purged, err = mset.store.Purge()
	}
	if err != nil {
		return purged, err
	}

	var state StreamState
	mset.store.FastState(&state)
	for _, o := range obs {
		o.purge(state.FirstSeq)
	}
	return purged, nil
}

// RemoveMsg will remove a message from a stream.
// FIXME(dlc) - Should pick one and be consistent.
func (mset *stream) removeMsg(seq uint64) (bool, error) {
	return mset.deleteMsg(seq)
}

// DeleteMsg will remove a message from a stream.
func (mset *stream) deleteMsg(seq uint64) (bool, error) {
	mset.mu.RLock()
	if mset.client == nil {
		mset.mu.RUnlock()
		return false, fmt.Errorf("invalid stream")
	}
	mset.mu.RUnlock()
	return mset.store.RemoveMsg(seq)
}

// EraseMsg will securely remove a message and rewrite the data with random data.
func (mset *stream) eraseMsg(seq uint64) (bool, error) {
	mset.mu.RLock()
	if mset.client == nil {
		mset.mu.RUnlock()
		return false, fmt.Errorf("invalid stream")
	}
	mset.mu.RUnlock()
	return mset.store.EraseMsg(seq)
}

// Are we a mirror?
func (mset *stream) isMirror() bool {
	mset.mu.RLock()
	defer mset.mu.RUnlock()
	return mset.cfg.Mirror != nil
}

func (mset *stream) hasSources() bool {
	mset.mu.RLock()
	defer mset.mu.RUnlock()
	return len(mset.sources) > 0
}

func (mset *stream) sourcesInfo() (sis []*StreamSourceInfo) {
	mset.mu.RLock()
	defer mset.mu.RUnlock()
	for _, si := range mset.sources {
		sis = append(sis, mset.sourceInfo(si))
	}
	return sis
}

func (mset *stream) allSubjects() ([]string, bool) {
	subjects, cfg, acc := mset.subjects(), mset.config(), mset.account()

	var hasExt bool
	var seen map[string]bool

	if cfg.Mirror != nil {
		var subjs []string
		seen = make(map[string]bool)
		subjs, hasExt = acc.streamSourceSubjects(cfg.Mirror, seen)
		if len(subjs) > 0 {
			subjects = append(subjects, subjs...)
		}
	} else if len(cfg.Sources) > 0 {
		var subjs []string
		seen = make(map[string]bool)
		for _, si := range cfg.Sources {
			subjs, hasExt = acc.streamSourceSubjects(si, seen)
			if len(subjs) > 0 {
				subjects = append(subjects, subjs...)
			}
		}
	}

	return subjects, hasExt
}

// Return the subjects for a stream source.
func (a *Account) streamSourceSubjects(ss *StreamSource, seen map[string]bool) (subjects []string, hasExt bool) {
	if ss != nil && ss.External != nil {
		return nil, true
	}

	s, js, _ := a.getJetStreamFromAccount()

	if !s.JetStreamIsClustered() {
		return a.streamSourceSubjectsNotClustered(ss.Name, seen)
	} else {
		return js.streamSourceSubjectsClustered(a.Name, ss.Name, seen)
	}
}

func (js *jetStream) streamSourceSubjectsClustered(accountName, streamName string, seen map[string]bool) (subjects []string, hasExt bool) {
	if seen[streamName] {
		return nil, false
	}

	// We are clustered here so need to work through stream assignments.
	sa := js.streamAssignment(accountName, streamName)
	if sa == nil {
		return nil, false
	}
	seen[streamName] = true

	js.mu.RLock()
	cfg := sa.Config
	if len(cfg.Subjects) > 0 {
		subjects = append(subjects, cfg.Subjects...)
	}

	// Check if we need to keep going.
	var sources []*StreamSource
	if cfg.Mirror != nil {
		sources = append(sources, cfg.Mirror)
	} else if len(cfg.Sources) > 0 {
		sources = append(sources, cfg.Sources...)
	}
	js.mu.RUnlock()

	if len(sources) > 0 {
		var subjs []string
		if acc, err := js.srv.lookupAccount(accountName); err == nil {
			for _, ss := range sources {
				subjs, hasExt = acc.streamSourceSubjects(ss, seen)
				if len(subjs) > 0 {
					subjects = append(subjects, subjs...)
				}
				if hasExt {
					break
				}
			}
		}
	}

	return subjects, hasExt
}

func (a *Account) streamSourceSubjectsNotClustered(streamName string, seen map[string]bool) (subjects []string, hasExt bool) {
	if seen[streamName] {
		return nil, false
	}

	mset, err := a.lookupStream(streamName)
	if err != nil {
		return nil, false
	}
	seen[streamName] = true

	cfg := mset.config()
	if len(cfg.Subjects) > 0 {
		subjects = append(subjects, cfg.Subjects...)
	}

	var subjs []string
	if cfg.Mirror != nil {
		subjs, hasExt = a.streamSourceSubjects(cfg.Mirror, seen)
		if len(subjs) > 0 {
			subjects = append(subjects, subjs...)
		}
	} else if len(cfg.Sources) > 0 {
		for _, si := range cfg.Sources {
			subjs, hasExt = a.streamSourceSubjects(si, seen)
			if len(subjs) > 0 {
				subjects = append(subjects, subjs...)
			}
			if hasExt {
				break
			}
		}
	}
	return subjects, hasExt
}

// Lock should be held
func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
	if si == nil {
		return nil
	}
	ssi := &StreamSourceInfo{Name: si.name, Lag: si.lag, Active: time.Since(si.last), Error: si.err}
	var ext *ExternalStream
	if mset.cfg.Mirror != nil {
		ext = mset.cfg.Mirror.External
	} else if ss := mset.streamSource(si.iname); ss != nil && ss.External != nil {
		ext = ss.External
	}
	if ext != nil {
		ssi.External = &ExternalStream{
			ApiPrefix:     ext.ApiPrefix,
			DeliverPrefix: ext.DeliverPrefix,
		}
	}
	return ssi
}

// Return our source info for our mirror.
func (mset *stream) mirrorInfo() *StreamSourceInfo {
	mset.mu.RLock()
	defer mset.mu.RUnlock()
	return mset.sourceInfo(mset.mirror)
}

const sourceHealthCheckInterval = 2 * time.Second

// Will run as a Go routine to process mirror consumer messages.
func (mset *stream) processMirrorMsgs() {
	s := mset.srv
	defer s.grWG.Done()
	defer func() {
		mset.mu.Lock()
		if mset.mirror != nil {
			mset.mirror.grr = false
			if mset.mirror.qch != nil {
				close(mset.mirror.qch)
				mset.mirror.qch = nil
			}
		}
		mset.mu.Unlock()
	}()

	// Grab stream quit channel.
	mset.mu.Lock()
	if mset.mirror == nil {
		mset.mu.Unlock()
		return
	}
	msgs, mch, qch, siqch := mset.mirror.msgs, mset.mirror.msgs.mch, mset.qch, mset.mirror.qch
	// Set the last seen as now so that we don't fail at the first check.
	mset.mirror.last = time.Now()
	mset.mu.Unlock()

	t := time.NewTicker(sourceHealthCheckInterval)
	defer t.Stop()

	for {
		select {
		case <-s.quitCh:
			return
		case <-qch:
			return
		case <-siqch:
			return
		case <-mch:
			for im := mset.pending(msgs); im != nil; im = im.next {
				if !mset.processInboundMirrorMsg(im) {
					break
				}
			}
		case <-t.C:
			mset.mu.RLock()
			isLeader := mset.isLeader()
			stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*sourceHealthCheckInterval
			mset.mu.RUnlock()
			// No longer leader.
			if !isLeader {
				mset.cancelMirrorConsumer()
				return
			}
			// We are stalled.
			if stalled {
				mset.retryMirrorConsumer()
			}
		}
	}
}

// Checks that the message is from our current direct consumer. We can not depend on sub comparison
// since cross account imports break.
func (si *sourceInfo) isCurrentSub(reply string) bool {
	return si.cname != _EMPTY_ && strings.HasPrefix(reply, jsAckPre) && si.cname == tokenAt(reply, 4)
}
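// Illustrative sketch (not in the original file; stream and consumer names are
// hypothetical): isCurrentSub works because JetStream ack replies embed the
// consumer name as the fourth dot-separated token, e.g.
//
//	$JS.ACK.ORDERS.mirror-c1.1.42.42.1613470200000000000.0
//	//  1    2      3       4
//
// so tokenAt(reply, 4) recovers "mirror-c1" regardless of which subscription
// the message arrived on.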
// processInboundMirrorMsg handles processing messages bound for a stream.
func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
	mset.mu.Lock()
	if mset.mirror == nil {
		mset.mu.Unlock()
		return false
	}
	if !mset.isLeader() {
		mset.mu.Unlock()
		mset.cancelMirrorConsumer()
		return false
	}

	isControl := m.isControlMsg()

	// Ignore from old subscriptions.
	// The reason we can not just compare subs is that on cross account imports they will not match.
	if !mset.mirror.isCurrentSub(m.rply) && !isControl {
		mset.mu.Unlock()
		return false
	}

	mset.mirror.last = time.Now()
	node := mset.node

	// Check for heartbeats and flow control messages.
	if isControl {
		var needsRetry bool
		// Flow controls have reply subjects.
		if m.rply != _EMPTY_ {
			mset.handleFlowControl(mset.mirror, m)
		} else {
			// For idle heartbeats make sure we did not miss anything.
			if ldseq := parseInt64(getHeader(JSLastConsumerSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != mset.mirror.dseq {
				needsRetry = true
			}
		}
		mset.mu.Unlock()
		if needsRetry {
			mset.retryMirrorConsumer()
		}
		return !needsRetry
	}

	sseq, dseq, dc, ts, pending := replyInfo(m.rply)

	if dc > 1 {
		mset.mu.Unlock()
		return false
	}

	// Mirror info tracking.
	olag, osseq, odseq, oclseq := mset.mirror.lag, mset.mirror.sseq, mset.mirror.dseq, mset.mirror.clseq
	if sseq == mset.mirror.sseq+1 {
		mset.mirror.dseq = dseq
		mset.mirror.sseq++
	} else if sseq <= mset.mirror.sseq {
		// Ignore older messages.
		mset.mu.Unlock()
		return true
	} else if mset.mirror.cname == _EMPTY_ {
		mset.mirror.cname = tokenAt(m.rply, 4)
		mset.mirror.dseq, mset.mirror.sseq = dseq, sseq
	} else {
		// If the deliver sequence matches then the upstream stream has expired or deleted messages.
		if dseq == mset.mirror.dseq+1 {
			mset.skipMsgs(mset.mirror.sseq+1, sseq-1)
			mset.mirror.dseq++
			mset.mirror.sseq = sseq
		} else {
			mset.mu.Unlock()
			mset.retryMirrorConsumer()
			return false
		}
	}

	if pending == 0 {
		mset.mirror.lag = 0
	} else {
		mset.mirror.lag = pending - 1
	}

	mset.mirror.clseq = sseq - 1
	js, stype := mset.js, mset.cfg.Storage
	mset.mu.Unlock()

	s := mset.srv
	var err error
	if node != nil {
		if js.limitsExceeded(stype) {
			s.resourcesExeededError()
			err = ApiErrors[JSInsufficientResourcesErr]
		} else {
			err = node.Propose(encodeStreamMsg(m.subj, _EMPTY_, m.hdr, m.msg, sseq-1, ts))
		}
	} else {
		err = mset.processJetStreamMsg(m.subj, _EMPTY_, m.hdr, m.msg, sseq-1, ts)
	}
	if err != nil {
		if err == errLastSeqMismatch {
			// We may have missed messages, restart.
			if sseq <= mset.lastSeq() {
				mset.mu.Lock()
				mset.mirror.lag = olag
				mset.mirror.sseq = osseq
				mset.mirror.dseq = odseq
				mset.mirror.clseq = oclseq
				mset.mu.Unlock()
				return false
			} else {
				mset.mu.Lock()
				mset.mirror.dseq = odseq
				mset.mirror.sseq = osseq
				mset.mu.Unlock()
				mset.retryMirrorConsumer()
			}
		} else {
			s.Warnf("Got error processing JetStream mirror msg: %v", err)
		}
		if strings.Contains(err.Error(), "no space left") {
			s.Errorf("JetStream out of space, will be DISABLED")
			s.DisableJetStream()
		}
	}
	return err == nil
}

func (mset *stream) setMirrorErr(err *ApiError) {
	mset.mu.Lock()
	if mset.mirror != nil {
		mset.mirror.err = err
	}
	mset.mu.Unlock()
}

func (mset *stream) cancelMirrorConsumer() {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	if mset.mirror == nil {
		return
	}
	if mset.mirror.sub != nil {
		mset.unsubscribe(mset.mirror.sub)
		mset.mirror.sub = nil
	}
	mset.removeInternalConsumer(mset.mirror)
	// If the go routine is still running close the quit chan.
	if mset.mirror.qch != nil {
		close(mset.mirror.qch)
		mset.mirror.qch = nil
	}
}

func (mset *stream) retryMirrorConsumer() error {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	mset.srv.Debugf("Retrying mirror consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name)
	return mset.setupMirrorConsumer()
}

// Lock should be held.
func (mset *stream) skipMsgs(start, end uint64) {
	node, store := mset.node, mset.store
	var entries []*Entry
	for seq := start; seq <= end; seq++ {
		if node != nil {
			entries = append(entries, &Entry{EntryNormal, encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq-1, 0)})
			// So a single message does not get too big.
			if len(entries) > 10_000 {
				node.ProposeDirect(entries)
				entries = entries[:0]
			}
		} else {
			mset.lseq = store.SkipMsg()
		}
	}
	// Send all at once.
	if node != nil && len(entries) > 0 {
		node.ProposeDirect(entries)
	}
}

// Setup our mirror consumer.
// Lock should be held.
func (mset *stream) setupMirrorConsumer() error {
	if mset.outq == nil {
		return errors.New("outq required")
	}

	isReset := mset.mirror != nil

	// Reset
	if isReset {
		if mset.mirror.sub != nil {
			mset.unsubscribe(mset.mirror.sub)
			mset.mirror.sub = nil
			mset.mirror.dseq = 0
			mset.mirror.sseq = mset.lseq
		}
		// Make sure to delete any prior consumers if we know about them.
		mset.removeInternalConsumer(mset.mirror)

		// If we are no longer the leader stop trying.
		if !mset.isLeader() {
			return nil
		}
	}

	// Determine subjects etc.
	var deliverSubject string
	ext := mset.cfg.Mirror.External

	if ext != nil && ext.DeliverPrefix != _EMPTY_ {
		deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".M"), "..", ".")
	} else {
		deliverSubject = syncSubject("$JS.M")
	}

	if !isReset {
		mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: &inbound{mch: make(chan struct{}, 1)}}
	}

	if !mset.mirror.grr {
		mset.mirror.grr = true
		mset.mirror.qch = make(chan struct{})
		mset.srv.startGoRoutine(func() { mset.processMirrorMsgs() })
	}

	// We want to throttle here in terms of how fast we request new consumers.
	if time.Since(mset.mirror.lreq) < 2*time.Second {
		return nil
	}
	mset.mirror.lreq = time.Now()

	// Now send off request to create/update our consumer. This will be all API based even in single server mode.
	// We calculate durable names apriori so we do not need to save them off.

	var state StreamState
	mset.store.FastState(&state)

	req := &CreateConsumerRequest{
		Stream: mset.cfg.Mirror.Name,
		Config: ConsumerConfig{
			DeliverSubject: deliverSubject,
			DeliverPolicy:  DeliverByStartSequence,
			OptStartSeq:    state.LastSeq + 1,
			AckPolicy:      AckNone,
			AckWait:        22 * time.Hour,
			MaxDeliver:     1,
			Heartbeat:      sourceHealthCheckInterval,
			FlowControl:    true,
			Direct:         true,
		},
	}

	// Only use start optionals on first time.
	if state.Msgs == 0 && state.FirstSeq == 0 {
		req.Config.OptStartSeq = 0
		if mset.cfg.Mirror.OptStartSeq > 0 {
			req.Config.OptStartSeq = mset.cfg.Mirror.OptStartSeq
		} else if mset.cfg.Mirror.OptStartTime != nil {
			req.Config.OptStartTime = mset.cfg.Mirror.OptStartTime
			req.Config.DeliverPolicy = DeliverByStartTime
		}
	}
	if req.Config.OptStartSeq == 0 && req.Config.OptStartTime == nil {
		// If starting out and lastSeq is 0.
		req.Config.DeliverPolicy = DeliverAll
	}

	respCh := make(chan *JSApiConsumerCreateResponse, 1)
	reply := infoReplySubject()
	crSub, _ := mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
		mset.unsubscribeUnlocked(sub)
		_, msg := c.msgParts(rmsg)

		var ccr JSApiConsumerCreateResponse
		if err := json.Unmarshal(msg, &ccr); err != nil {
			c.Warnf("JetStream bad mirror consumer create response: %q", msg)
			mset.cancelMirrorConsumer()
			mset.setMirrorErr(ApiErrors[JSInvalidJSONErr])
			return
		}
		respCh <- &ccr
	})

	b, _ := json.Marshal(req)
	subject := fmt.Sprintf(JSApiConsumerCreateT, mset.cfg.Mirror.Name)
	if ext != nil {
		subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
		subject = strings.ReplaceAll(subject, "..", ".")
	}

	mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil})

	go func() {
		select {
		case ccr := <-respCh:
			if ccr.Error != nil || ccr.ConsumerInfo == nil {
				mset.cancelMirrorConsumer()
			} else {
				mset.mu.Lock()
				// Mirror config has been removed.
				if mset.mirror == nil {
					mset.mu.Unlock()
					mset.cancelMirrorConsumer()
					return
				}

				// When an upstream stream expires messages or in general has messages that we want
				// that are no longer available we need to adjust here.
				var state StreamState
				mset.store.FastState(&state)

				// Check if we need to skip messages.
				if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream {
					mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream)
				}

				// Capture consumer name.
				mset.mirror.cname = ccr.ConsumerInfo.Name
				msgs := mset.mirror.msgs

				// Process inbound mirror messages from the wire.
				sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
					hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
					mset.queueInbound(msgs, subject, reply, hdr, msg)
				})
				if err != nil {
					mset.mirror.err = ApiErrors[JSMirrorConsumerSetupFailedErrF].ErrOrNewT(err, "{err}", err)
					mset.mirror.sub = nil
					mset.mirror.cname = _EMPTY_
				} else {
					mset.mirror.err = nil
					mset.mirror.sub = sub
					mset.mirror.last = time.Now()
					mset.mirror.dseq = 0
					mset.mirror.sseq = ccr.ConsumerInfo.Delivered.Stream
				}
				mset.mu.Unlock()
			}
			mset.setMirrorErr(ccr.Error)
		case <-time.After(10 * time.Second):
			mset.unsubscribeUnlocked(crSub)
			return
		}
	}()

	return nil
}

func (mset *stream) streamSource(iname string) *StreamSource {
	for _, ssi := range mset.cfg.Sources {
		if ssi.iname == iname {
			return ssi
		}
	}
	return nil
}

func (mset *stream) retrySourceConsumer(sname string) {
	mset.mu.Lock()
	defer mset.mu.Unlock()

	si := mset.sources[sname]
	if si == nil {
		return
	}
	mset.setStartingSequenceForSource(sname)
	mset.retrySourceConsumerAtSeq(sname, si.sseq+1)
}

// Lock should be held.
func (mset *stream) retrySourceConsumerAtSeq(sname string, seq uint64) {
	if mset.client == nil {
		return
	}
	s := mset.srv

	s.Debugf("Retrying source consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name)

	// No longer configured.
	if si := mset.sources[sname]; si == nil {
		return
	}
	mset.setSourceConsumer(sname, seq)
}

// Lock should be held.
func (mset *stream) cancelSourceConsumer(sname string) {
	if si := mset.sources[sname]; si != nil && si.sub != nil {
		mset.unsubscribe(si.sub)
		si.sub = nil
		si.sseq, si.dseq = 0, 0
		mset.removeInternalConsumer(si)
		// If the go routine is still running close the quit chan.
		if si.qch != nil {
			close(si.qch)
			si.qch = nil
		}
	}
}

// Lock should be held.
func (mset *stream) setSourceConsumer(iname string, seq uint64) {
	si := mset.sources[iname]
	if si == nil {
		return
	}
	if si.sub != nil {
		mset.unsubscribe(si.sub)
		si.sub = nil
	}
	// Need to delete the old one.
	mset.removeInternalConsumer(si)

	si.sseq, si.dseq = seq, 0
	si.last = time.Now()
	ssi := mset.streamSource(iname)

	// Determine subjects etc.
	var deliverSubject string
	ext := ssi.External

	if ext != nil && ext.DeliverPrefix != _EMPTY_ {
		deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".")
	} else {
		deliverSubject = syncSubject("$JS.S")
	}

	if !si.grr {
		si.grr = true
		si.qch = make(chan struct{})
		mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si) })
	}

	// We want to throttle here in terms of how fast we request new consumers.
	if time.Since(si.lreq) < 2*time.Second {
		return
	}
	si.lreq = time.Now()

	req := &CreateConsumerRequest{
		Stream: si.name,
		Config: ConsumerConfig{
			DeliverSubject: deliverSubject,
			AckPolicy:      AckNone,
			AckWait:        22 * time.Hour,
			MaxDeliver:     1,
			Heartbeat:      sourceHealthCheckInterval,
			FlowControl:    true,
			Direct:         true,
		},
	}
	// If starting, check any configs.
	if seq <= 1 {
		if ssi.OptStartSeq > 0 {
			req.Config.OptStartSeq = ssi.OptStartSeq
			req.Config.DeliverPolicy = DeliverByStartSequence
		} else if ssi.OptStartTime != nil {
			req.Config.OptStartTime = ssi.OptStartTime
			req.Config.DeliverPolicy = DeliverByStartTime
		}
	} else {
		req.Config.OptStartSeq = seq
		req.Config.DeliverPolicy = DeliverByStartSequence
	}
	// Filters
	if ssi.FilterSubject != _EMPTY_ {
		req.Config.FilterSubject = ssi.FilterSubject
	}

	respCh := make(chan *JSApiConsumerCreateResponse, 1)
	reply := infoReplySubject()
	crSub, _ := mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
		mset.unsubscribe(sub)
		_, msg := c.msgParts(rmsg)
		var ccr JSApiConsumerCreateResponse
		if err := json.Unmarshal(msg, &ccr); err != nil {
			c.Warnf("JetStream bad source consumer create response: %q", msg)
			return
		}
		respCh <- &ccr
	})

	b, _ := json.Marshal(req)
	subject := fmt.Sprintf(JSApiConsumerCreateT, si.name)
	if ext != nil {
		subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
		subject = strings.ReplaceAll(subject, "..", ".")
	}

	mset.outq.send(&jsPubMsg{subject, _EMPTY_, reply, nil, b, nil, 0, nil})

	go func() {
		select {
		case ccr := <-respCh:
			mset.mu.Lock()
			if si := mset.sources[iname]; si != nil {
				si.err = nil
				if ccr.Error != nil || ccr.ConsumerInfo == nil {
					mset.srv.Warnf("JetStream error response for create source consumer: %+v", ccr.Error)
					si.err = ccr.Error
					// We will retry every 10 seconds or so
					mset.cancelSourceConsumer(iname)
				} else {
					if si.sseq != ccr.ConsumerInfo.Delivered.Stream {
						si.sseq = ccr.ConsumerInfo.Delivered.Stream + 1
					}

					// Capture consumer name.
					si.cname = ccr.ConsumerInfo.Name
					// Now create sub to receive messages.
					sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
						hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
						mset.queueInbound(si.msgs, subject, reply, hdr, msg)
					})
					if err != nil {
						si.err = ApiErrors[JSSourceConsumerSetupFailedErrF].ErrOrNewT(err, "{err}", err)
						si.sub = nil
					} else {
						si.err = nil
						si.sub = sub
						si.last = time.Now()
					}
				}
			}
			mset.mu.Unlock()
		case <-time.After(10 * time.Second):
			mset.unsubscribe(crSub)
			return
		}
	}()
}

func (mset *stream) processSourceMsgs(si *sourceInfo) {
	s := mset.srv
	defer s.grWG.Done()

	if si == nil {
		return
	}

	defer func() {
		mset.mu.Lock()
		si.grr = false
		if si.qch != nil {
			close(si.qch)
			si.qch = nil
		}
		mset.mu.Unlock()
	}()

	// Grab stream quit channel.
	mset.mu.Lock()
	msgs, mch, qch, siqch := si.msgs, si.msgs.mch, mset.qch, si.qch
	// Set the last seen as now so that we don't fail at the first check.
	si.last = time.Now()
	mset.mu.Unlock()

	t := time.NewTicker(sourceHealthCheckInterval)
	defer t.Stop()

	for {
		select {
		case <-s.quitCh:
			return
		case <-qch:
			return
		case <-siqch:
			return
		case <-mch:
			for im := mset.pending(msgs); im != nil; im = im.next {
				if !mset.processInboundSourceMsg(si, im) {
					break
				}
			}
		case <-t.C:
			mset.mu.RLock()
			iname, isLeader := si.iname, mset.isLeader()
			stalled := time.Since(si.last) > 3*sourceHealthCheckInterval
			mset.mu.RUnlock()
			// No longer leader.
			if !isLeader {
				mset.mu.Lock()
				mset.cancelSourceConsumer(iname)
				mset.mu.Unlock()
				return
			}
			// We are stalled.
			if stalled {
				mset.retrySourceConsumer(iname)
			}
		}
	}
}

// isControlMsg determines if this is a control message.
func (m *inMsg) isControlMsg() bool {
	return len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 "))
}
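// Illustrative sketch (not in the original file; exact status text is an
// assumption): the two control messages this prefix check matches, both
// status 100 with an empty body. Flow control carries a reply subject to
// respond on; idle heartbeats instead carry the Nats-Last-Consumer header
// used for gap detection:
//
//	NATS/1.0 100 FlowControl Request   (reply subject set)
//	NATS/1.0 100 Idle Heartbeat        (Nats-Last-Consumer: 42)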
|
|
|
|
// handleFlowControl will properly handle flow control messages for both R1 and R>1.
|
|
// Lock should be held.
|
|
func (mset *stream) handleFlowControl(si *sourceInfo, m *inMsg) {
|
|
// If we are clustered we want to delay signaling back the the upstream consumer.
|
|
if node := mset.node; node != nil && si.clseq > 0 {
|
|
if mset.fcr == nil {
|
|
mset.fcr = make(map[uint64]string)
|
|
}
|
|
mset.fcr[si.clseq] = m.rply
|
|
} else {
|
|
mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
|
|
}
|
|
}

// processInboundSourceMsg handles processing other stream messages bound for this stream.
func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
	mset.mu.Lock()

	// If we are no longer the leader cancel this subscriber.
	if !mset.isLeader() {
		mset.mu.Unlock()
		mset.cancelSourceConsumer(si.name)
		return false
	}

	isControl := m.isControlMsg()

	// Ignore from old subscriptions.
	if !si.isCurrentSub(m.rply) && !isControl {
		mset.mu.Unlock()
		return false
	}

	si.last = time.Now()

	node := mset.node

	// Check for heartbeats and flow control messages.
	if isControl {
		var needsRetry bool
		// Flow controls have reply subjects.
		if m.rply != _EMPTY_ {
			mset.handleFlowControl(si, m)
		} else {
			// For idle heartbeats make sure we did not miss anything.
			if ldseq := parseInt64(getHeader(JSLastConsumerSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != si.dseq {
				needsRetry = true
				mset.retrySourceConsumerAtSeq(si.iname, si.sseq+1)
			}
		}
		mset.mu.Unlock()
		return !needsRetry
	}

	sseq, dseq, dc, _, pending := replyInfo(m.rply)

	if dc > 1 {
		mset.mu.Unlock()
		return false
	}

	// Tracking is done here.
	if dseq == si.dseq+1 {
		si.dseq++
		si.sseq = sseq
	} else if dseq > si.dseq {
		if si.cname == _EMPTY_ {
			si.cname = tokenAt(m.rply, 4)
			si.dseq, si.sseq = dseq, sseq
		} else {
			mset.retrySourceConsumerAtSeq(si.iname, si.sseq+1)
			mset.mu.Unlock()
			return false
		}
	} else {
		mset.mu.Unlock()
		return false
	}

	if pending == 0 {
		si.lag = 0
	} else {
		si.lag = pending - 1
	}
	mset.mu.Unlock()

	hdr, msg := m.hdr, m.msg

	// If we are daisy chained here make sure to remove the original one.
	if len(hdr) > 0 {
		hdr = removeHeaderIfPresent(hdr, JSStreamSource)
	}
	// Hold onto the origin reply which has all the metadata.
	hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.rply))

	var err error
	var clseq uint64
	// If we are clustered we need to propose this message to the underlying raft group.
	if node != nil {
		clseq, err = mset.processClusteredInboundMsg(m.subj, _EMPTY_, hdr, msg)
		if err == nil {
			mset.mu.Lock()
			si.clseq = clseq
			mset.mu.Unlock()
		}
	} else {
		err = mset.processJetStreamMsg(m.subj, _EMPTY_, hdr, msg, 0, 0)
	}

	if err != nil {
		s := mset.srv
		if err == errLastSeqMismatch {
			mset.cancelSourceConsumer(si.iname)
			mset.retrySourceConsumer(si.iname)
		} else {
			s.Warnf("JetStream got an error processing inbound source msg: %v", err)
		}
		if strings.Contains(err.Error(), "no space left") {
			s.Errorf("JetStream out of space, will be DISABLED")
			s.DisableJetStream()
		}
	}

	return true
}
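
// For reference, the ack reply parsed below is assumed to have the usual
// JetStream ack layout of expectedNumReplyTokens tokens:
//
//	$JS.ACK.<stream>.<consumer>.<delivered>.<stream seq>.<consumer seq>.<ts>.<pending>
//
// so tokens[5] is the sequence in the origin stream. For example, a reply of
// "$JS.ACK.ORDERS.dc.1.22.22.1610000000000000000.0" with an indexed source
// name of "ORDERS" yields the header value "ORDERS 22".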

// Generate a new style source header.
func (si *sourceInfo) genSourceHeader(reply string) string {
	var b strings.Builder
	b.WriteString(si.iname)
	b.WriteByte(' ')
	// Grab sequence as text here from reply subject.
	var tsa [expectedNumReplyTokens]string
	start, tokens := 0, tsa[:0]
	for i := 0; i < len(reply); i++ {
		if reply[i] == btsep {
			tokens, start = append(tokens, reply[start:i]), i+1
		}
	}
	tokens = append(tokens, reply[start:])
	seq := "1" // Default
	if len(tokens) == expectedNumReplyTokens && tokens[0] == "$JS" && tokens[1] == "ACK" {
		seq = tokens[5]
	}
	b.WriteString(seq)
	return b.String()
}

// Original version of header that stored the ack reply directly.
func streamAndSeqFromAckReply(reply string) (string, uint64) {
	tsa := [expectedNumReplyTokens]string{}
	start, tokens := 0, tsa[:0]
	for i := 0; i < len(reply); i++ {
		if reply[i] == btsep {
			tokens, start = append(tokens, reply[start:i]), i+1
		}
	}
	tokens = append(tokens, reply[start:])
	if len(tokens) != expectedNumReplyTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
		return _EMPTY_, 0
	}
	return tokens[2], uint64(parseAckReplyNum(tokens[5]))
}
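
// The source header therefore comes in two forms: the original form above,
// which is a full ack reply subject, and the new form, which is simply
// "<stream index name> <stream seq>", e.g. "ORDERS 22". streamAndSeq below
// accepts both.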

// Extract the stream (indexed name) and sequence from the source header.
func streamAndSeq(shdr string) (string, uint64) {
	if strings.HasPrefix(shdr, jsAckPre) {
		return streamAndSeqFromAckReply(shdr)
	}
	// New version which is stream index name <SPC> sequence
	fields := strings.Fields(shdr)
	if len(fields) != 2 {
		return _EMPTY_, 0
	}
	return fields[0], uint64(parseAckReplyNum(fields[1]))
}

// Lock should be held.
func (mset *stream) setStartingSequenceForSource(sname string) {
	si := mset.sources[sname]
	if si == nil {
		return
	}

	var state StreamState
	mset.store.FastState(&state)

	// Do not reset sseq here so we can remember when purge/expiration happens.
	if state.Msgs == 0 {
		si.dseq = 0
		return
	}

	for seq := state.LastSeq; seq >= state.FirstSeq; seq-- {
		_, hdr, _, _, err := mset.store.LoadMsg(seq)
		if err != nil || len(hdr) == 0 {
			continue
		}
		ss := getHeader(JSStreamSource, hdr)
		if len(ss) == 0 {
			continue
		}
		iname, sseq := streamAndSeq(string(ss))
		if iname == sname {
			si.sseq = sseq
			si.dseq = 0
			return
		}
	}
}

// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
	if len(mset.cfg.Sources) == 0 {
		return
	}
	// Always reset here.
	mset.sources = make(map[string]*sourceInfo)

	for _, ssi := range mset.cfg.Sources {
		if ssi.iname == _EMPTY_ {
			ssi.setIndexName()
		}
		si := &sourceInfo{name: ssi.Name, iname: ssi.iname, msgs: &inbound{mch: make(chan struct{}, 1)}}
		mset.sources[ssi.iname] = si
	}

	var state StreamState
	mset.store.FastState(&state)
	if state.Msgs == 0 {
		return
	}
	// For short circuiting return.
	expected := len(mset.cfg.Sources)
	seqs := make(map[string]uint64)

	// Stamp our si seq records on the way out.
	defer func() {
		for sname, seq := range seqs {
			// Ignore if not set.
			if seq == 0 {
				continue
			}
			if si := mset.sources[sname]; si != nil {
				si.sseq = seq
				si.dseq = 0
			}
		}
	}()

	for seq := state.LastSeq; seq >= state.FirstSeq; seq-- {
		_, hdr, _, _, err := mset.store.LoadMsg(seq)
		if err != nil || len(hdr) == 0 {
			continue
		}
		ss := getHeader(JSStreamSource, hdr)
		if len(ss) == 0 {
			continue
		}
		name, sseq := streamAndSeq(string(ss))
		// Only update active in case we have older ones in here that got configured out.
		if si := mset.sources[name]; si != nil {
			if _, ok := seqs[name]; !ok {
				seqs[name] = sseq
				if len(seqs) == expected {
					return
				}
			}
		}
	}
}
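
// The reverse scan above short circuits once a sequence has been recorded for
// every configured source, so the degenerative cost is only paid when some
// sources have no messages present in the stream.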

// Setup our source consumers.
// Lock should be held.
func (mset *stream) setupSourceConsumers() error {
	if mset.outq == nil {
		return errors.New("outq required")
	}
	// Reset if needed.
	for _, si := range mset.sources {
		if si.sub != nil {
			mset.cancelSourceConsumer(si.name)
		}
	}

	mset.startingSequenceForSources()

	// Setup our consumers at the proper starting position.
	for _, ssi := range mset.cfg.Sources {
		if si := mset.sources[ssi.iname]; si != nil {
			mset.setSourceConsumer(ssi.iname, si.sseq+1)
		}
	}

	return nil
}

// Will create internal subscriptions for the stream.
// Lock should be held.
func (mset *stream) subscribeToStream() error {
	if mset.active {
		return nil
	}
	for _, subject := range mset.cfg.Subjects {
		if _, err := mset.subscribeInternal(subject, mset.processInboundJetStreamMsg); err != nil {
			return err
		}
	}
	// Check if we need to setup mirroring.
	if mset.cfg.Mirror != nil {
		if err := mset.setupMirrorConsumer(); err != nil {
			return err
		}
	} else if len(mset.cfg.Sources) > 0 {
		if err := mset.setupSourceConsumers(); err != nil {
			return err
		}
	}

	mset.active = true
	return nil
}

// Stop our source consumers.
// Lock should be held.
func (mset *stream) stopSourceConsumers() {
	for _, si := range mset.sources {
		if si.sub != nil {
			mset.unsubscribe(si.sub)
		}
		// Need to delete the old one.
		mset.removeInternalConsumer(si)
		// If the go routine is still running close the quit chan.
		if si.qch != nil {
			close(si.qch)
			si.qch = nil
		}
	}
}

// Lock should be held.
func (mset *stream) removeInternalConsumer(si *sourceInfo) {
	if si == nil || si.cname == _EMPTY_ {
		return
	}
	si.cname = _EMPTY_
}

// Will unsubscribe from the stream.
// Lock should be held.
func (mset *stream) unsubscribeToStream() error {
	for _, subject := range mset.cfg.Subjects {
		mset.unsubscribeInternal(subject)
	}
	if mset.mirror != nil {
		if mset.mirror.sub != nil {
			mset.unsubscribe(mset.mirror.sub)
		}
		mset.removeInternalConsumer(mset.mirror)
		// If the go routine is still running close the quit chan.
		if mset.mirror.qch != nil {
			close(mset.mirror.qch)
		}
		mset.mirror = nil
	}

	if len(mset.cfg.Sources) > 0 {
		mset.stopSourceConsumers()
	}

	mset.active = false
	return nil
}

// Lock should be held.
func (mset *stream) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
	c := mset.client
	if c == nil {
		return nil, fmt.Errorf("invalid stream")
	}
	if !c.srv.eventsEnabled() {
		return nil, ErrNoSysAccount
	}
	if cb == nil {
		return nil, fmt.Errorf("undefined message handler")
	}

	mset.sid++

	// Now create the subscription
	return c.processSub([]byte(subject), nil, []byte(strconv.Itoa(mset.sid)), cb, false)
}

// Helper for unlocked stream.
func (mset *stream) subscribeInternalUnlocked(subject string, cb msgHandler) (*subscription, error) {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	return mset.subscribeInternal(subject, cb)
}

// This will unsubscribe us from the exact subject given.
// We do not currently track the subs so do not have the sid.
// This should be called only on an update.
// Lock should be held.
func (mset *stream) unsubscribeInternal(subject string) error {
	c := mset.client
	if c == nil {
		return fmt.Errorf("invalid stream")
	}

	var sid []byte
	c.mu.Lock()
	for _, sub := range c.subs {
		if subject == string(sub.subject) {
			sid = sub.sid
			break
		}
	}
	c.mu.Unlock()

	if sid != nil {
		return c.processUnsub(sid)
	}
	return nil
}

// Lock should be held.
func (mset *stream) unsubscribe(sub *subscription) {
	if sub == nil || mset.client == nil {
		return
	}
	mset.client.processUnsub(sub.sid)
}

func (mset *stream) unsubscribeUnlocked(sub *subscription) {
	mset.mu.Lock()
	mset.unsubscribe(sub)
	mset.mu.Unlock()
}

func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
	mset.mu.Lock()
	mset.created = time.Now().UTC()

	switch mset.cfg.Storage {
	case MemoryStorage:
		ms, err := newMemStore(&mset.cfg)
		if err != nil {
			mset.mu.Unlock()
			return err
		}
		mset.store = ms
	case FileStorage:
		s := mset.srv
		fs, err := newFileStoreWithCreated(*fsCfg, mset.cfg, mset.created, s.jsKeyGen(mset.acc.Name))
		if err != nil {
			mset.mu.Unlock()
			return err
		}
		mset.store = fs
	}
	mset.mu.Unlock()

	mset.store.RegisterStorageUpdates(mset.storeUpdates)

	return nil
}

// Called for any updates to the underlying stream. We pass through the bytes to the
// jetstream account. We do local processing for stream pending for consumers, but only
// for removals.
// Lock should not be held.
func (mset *stream) storeUpdates(md, bd int64, seq uint64, subj string) {
	// If we have a single negative update then we will process our consumers for stream pending.
	// Purge and Store handled separately inside individual calls.
	if md == -1 && seq > 0 {
		mset.mu.RLock()
		for _, o := range mset.consumers {
			o.decStreamPending(seq, subj)
		}
		mset.mu.RUnlock()
	}

	if mset.jsa != nil {
		mset.jsa.updateUsage(mset.stype, bd)
	}
}

// NumMsgIds returns the number of message ids being tracked for duplicate suppression.
func (mset *stream) numMsgIds() int {
	mset.mu.RLock()
	defer mset.mu.RUnlock()
	return len(mset.ddmap)
}

// checkMsgId will process and check for duplicates.
// Lock should be held.
func (mset *stream) checkMsgId(id string) *ddentry {
	if id == _EMPTY_ || mset.ddmap == nil {
		return nil
	}
	return mset.ddmap[id]
}

// Will purge the entries that are past the window.
// Should be called from a timer.
func (mset *stream) purgeMsgIds() {
	mset.mu.Lock()
	defer mset.mu.Unlock()

	now := time.Now().UnixNano()
	tmrNext := mset.cfg.Duplicates
	window := int64(tmrNext)

	for i, dde := range mset.ddarr[mset.ddindex:] {
		if now-dde.ts >= window {
			delete(mset.ddmap, dde.id)
		} else {
			mset.ddindex += i
			// Check if we should garbage collect here if we are 1/3 total size.
			if cap(mset.ddarr) > 3*(len(mset.ddarr)-mset.ddindex) {
				mset.ddarr = append([]*ddentry(nil), mset.ddarr[mset.ddindex:]...)
				mset.ddindex = 0
			}
			tmrNext = time.Duration(window - (now - dde.ts))
			break
		}
	}
	if len(mset.ddmap) > 0 {
		// Make sure to not fire too quick
		const minFire = 50 * time.Millisecond
		if tmrNext < minFire {
			tmrNext = minFire
		}
		mset.ddtmr.Reset(tmrNext)
	} else {
		mset.ddtmr.Stop()
		mset.ddtmr = nil
	}
}

// storeMsgId will store the message id for duplicate detection.
func (mset *stream) storeMsgId(dde *ddentry) {
	mset.mu.Lock()
	if mset.ddmap == nil {
		mset.ddmap = make(map[string]*ddentry)
	}
	if mset.ddtmr == nil {
		mset.ddtmr = time.AfterFunc(mset.cfg.Duplicates, mset.purgeMsgIds)
	}
	mset.ddmap[dde.id] = dde
	mset.ddarr = append(mset.ddarr, dde)
	mset.mu.Unlock()
}
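
// Together these form the duplicate detection window: ddmap gives O(1) lookup
// by message id, ddarr holds entries in arrival order so expired ids can be
// purged from the front, and ddindex is the logical head of ddarr so each
// purge pass does not have to reallocate the slice.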

// Fast lookup of msgId.
func getMsgId(hdr []byte) string {
	return string(getHeader(JSMsgId, hdr))
}

// Fast lookup of expected last msgId.
func getExpectedLastMsgId(hdr []byte) string {
	return string(getHeader(JSExpectedLastMsgId, hdr))
}

// Fast lookup of expected stream.
func getExpectedStream(hdr []byte) string {
	return string(getHeader(JSExpectedStream, hdr))
}

// Fast lookup of expected last sequence.
func getExpectedLastSeq(hdr []byte) uint64 {
	bseq := getHeader(JSExpectedLastSeq, hdr)
	if len(bseq) == 0 {
		return 0
	}
	return uint64(parseInt64(bseq))
}

// Fast lookup of expected stream sequence per subject.
func getExpectedLastSeqPerSubject(hdr []byte) uint64 {
	bseq := getHeader(JSExpectedLastSubjSeq, hdr)
	if len(bseq) == 0 {
		return 0
	}
	return uint64(parseInt64(bseq))
}
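
// The helpers above read the JetStream publish headers. As a sketch (header
// names assumed to match the JS* constants referenced above), a publisher
// opting into optimistic concurrency checks might send:
//
//	Nats-Msg-Id: my-unique-id-22
//	Nats-Expected-Stream: ORDERS
//	Nats-Expected-Last-Sequence: 22
//
// and processJetStreamMsg below rejects the publish if any expectation fails.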

// Lock should be held.
func (mset *stream) isClustered() bool {
	return mset.node != nil
}

// Used if we have to queue things internally to avoid the route/gw path.
type inMsg struct {
	subj string
	rply string
	hdr  []byte
	msg  []byte
	next *inMsg
}

// Linked list for inbound messages.
type inbound struct {
	head *inMsg
	tail *inMsg
	mch  chan struct{}
}

func (mset *stream) pending(msgs *inbound) *inMsg {
	mset.mu.Lock()
	head := msgs.head
	msgs.head, msgs.tail = nil, nil
	mset.mu.Unlock()
	return head
}
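
// Producers append to the inbound list under the stream lock and then do a
// non-blocking send on mch. Since mch has capacity one, the signal is
// edge-triggered: the consuming Go routine drains the entire linked list via
// pending() per wakeup instead of being signaled once per message.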

func (mset *stream) queueInbound(ib *inbound, subj, rply string, hdr, msg []byte) {
	m := &inMsg{subj, rply, hdr, msg, nil}

	mset.mu.Lock()
	var notify bool
	if ib.head == nil {
		ib.head = m
		notify = true
	} else {
		ib.tail.next = m
	}
	ib.tail = m
	mch := ib.mch
	mset.mu.Unlock()

	if notify {
		select {
		case mch <- struct{}{}:
		default:
		}
	}
}

func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) {
	// Copy these.
	if len(hdr) > 0 {
		hdr = append(hdr[:0:0], hdr...)
	}
	if len(msg) > 0 {
		msg = append(msg[:0:0], msg...)
	}
	mset.queueInbound(mset.msgs, subj, rply, hdr, msg)
}

// processInboundJetStreamMsg handles processing messages bound for a stream.
func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, subject, reply string, rmsg []byte) {
	mset.mu.RLock()
	isLeader, isClustered := mset.isLeader(), mset.node != nil
	mset.mu.RUnlock()

	// If we are not the leader just ignore.
	if !isLeader {
		return
	}

	hdr, msg := c.msgParts(rmsg)

	// If we are not receiving directly from a client we should move this to another Go routine.
	if c.kind != CLIENT {
		mset.queueInboundMsg(subject, reply, hdr, msg)
		return
	}

	// If we are clustered we need to propose this message to the underlying raft group.
	if isClustered {
		mset.processClusteredInboundMsg(subject, reply, hdr, msg)
	} else {
		mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0)
	}
}

var (
	errLastSeqMismatch = errors.New("last sequence mismatch")
	errMsgIdDuplicate  = errors.New("msgid is duplicate")
)
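
// A note on clfs: each message that gets assigned a sequence by the cluster
// but is then skipped here (duplicate msgId, failed expectation check, over
// limits, etc.) bumps mset.clfs, so the store sequence can be derived as
// lseq+1-clfs and stays aligned with what was actually stored.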

// processJetStreamMsg is where we try to actually process the stream msg.
func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error {
	mset.mu.Lock()
	store := mset.store
	c, s := mset.client, mset.srv
	if c == nil {
		mset.mu.Unlock()
		return nil
	}

	var accName string
	if mset.acc != nil {
		accName = mset.acc.Name
	}

	doAck, pubAck := !mset.cfg.NoAck, mset.pubAck
	js, jsa := mset.js, mset.jsa
	name, stype := mset.cfg.Name, mset.cfg.Storage
	maxMsgSize := int(mset.cfg.MaxMsgSize)
	numConsumers := len(mset.consumers)
	interestRetention := mset.cfg.Retention == InterestPolicy
	// Snapshot if we are the leader and if we can respond.
	isLeader := mset.isLeader()
	canRespond := doAck && len(reply) > 0 && isLeader

	var resp = &JSPubAckResponse{}

	// For clustering the lower layers will pass our expected lseq. If it is present check for that here.
	if lseq > 0 && lseq != (mset.lseq+mset.clfs) {
		isMisMatch := true
		// If our first message for this mirror, see if we have to adjust our starting sequence.
		if mset.cfg.Mirror != nil {
			var state StreamState
			mset.store.FastState(&state)
			if state.FirstSeq == 0 {
				mset.store.Compact(lseq + 1)
				mset.lseq = lseq
				isMisMatch = false
			}
		}
		// Really is a mismatch.
		if isMisMatch {
			outq := mset.outq
			mset.mu.Unlock()
			if canRespond && outq != nil {
				resp.PubAck = &PubAck{Stream: name}
				resp.Error = ApiErrors[JSStreamSequenceNotMatchErr]
				b, _ := json.Marshal(resp)
				outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
			}
			return errLastSeqMismatch
		}
	}

	// If we have received this message across an account we may have request information attached.
	// For now remove. TODO(dlc) - Should this be opt-in or opt-out?
	if len(hdr) > 0 {
		hdr = removeHeaderIfPresent(hdr, ClientInfoHdr)
	}

	// Process additional msg headers if still present.
	var msgId string
	if len(hdr) > 0 {
		msgId = getMsgId(hdr)
		outq := mset.outq
		if dde := mset.checkMsgId(msgId); dde != nil {
			mset.clfs++
			mset.mu.Unlock()
			if canRespond {
				response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
				response = append(response, ",\"duplicate\": true}"...)
				outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
			}
			return errMsgIdDuplicate
		}

		// Expected stream.
		if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
			mset.clfs++
			mset.mu.Unlock()
			if canRespond {
				resp.PubAck = &PubAck{Stream: name}
				resp.Error = ApiErrors[JSStreamNotMatchErr]
				b, _ := json.Marshal(resp)
				outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
			}
			return errors.New("expected stream does not match")
		}
		// Expected last sequence.
		if seq := getExpectedLastSeq(hdr); seq > 0 && seq != mset.lseq {
			mlseq := mset.lseq
			mset.clfs++
			mset.mu.Unlock()
			if canRespond {
				resp.PubAck = &PubAck{Stream: name}
				resp.Error = ApiErrors[JSStreamWrongLastSequenceErrF].NewT("{seq}", mlseq)
				b, _ := json.Marshal(resp)
				outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
			}
			return fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq)
		}
		// Expected last msgId.
		if lmsgId := getExpectedLastMsgId(hdr); lmsgId != _EMPTY_ && lmsgId != mset.lmsgId {
			last := mset.lmsgId
			mset.clfs++
			mset.mu.Unlock()
			if canRespond {
				resp.PubAck = &PubAck{Stream: name}
				resp.Error = ApiErrors[JSStreamWrongLastMsgIDErrF].NewT("{id}", last)
				b, _ := json.Marshal(resp)
				outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
			}
			return fmt.Errorf("last msgid mismatch: %q vs %q", lmsgId, last)
		}
		// Expected last sequence per subject.
		if seq := getExpectedLastSeqPerSubject(hdr); seq > 0 {
			// TODO(dlc) - We could make a new store func that does this all in one.
			_, lseq, _, _, _, err := mset.store.LoadLastMsg(subject)
			if err != nil || lseq != seq {
				mset.clfs++
				mset.mu.Unlock()
				if canRespond {
					resp.PubAck = &PubAck{Stream: name}
					resp.Error = ApiErrors[JSStreamWrongLastSequenceErrF].NewT("{seq}", lseq)
					b, _ := json.Marshal(resp)
					outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
				}
				return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, lseq)
			}
		}
	}

	// Response Ack.
	var (
		response []byte
		seq      uint64
		err      error
	)

	// Check to see if we are over the max msg size.
	if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize {
		mset.clfs++
		mset.mu.Unlock()
		if canRespond {
			resp.PubAck = &PubAck{Stream: name}
			resp.Error = ApiErrors[JSStreamMessageExceedsMaximumErr]
			b, _ := json.Marshal(resp)
			mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
		}
		return ErrMaxPayload
	}

	// Check to see if we have exceeded our limits.
	if js.limitsExceeded(stype) {
		s.resourcesExeededError()
		mset.clfs++
		mset.mu.Unlock()
		if canRespond {
			resp.PubAck = &PubAck{Stream: name}
			resp.Error = ApiErrors[JSInsufficientResourcesErr]
			b, _ := json.Marshal(resp)
			mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, b, nil, 0, nil})
		}
		// Stepdown regardless.
		if node := mset.raftNode(); node != nil {
			node.StepDown()
		}
		return ApiErrors[JSInsufficientResourcesErr]
	}

	var noInterest bool

	// If we are interest based retention and have no consumers then we can skip.
	if interestRetention {
		if numConsumers == 0 {
			noInterest = true
		} else if mset.numFilter > 0 {
			// Assume none.
			noInterest = true
			for _, o := range mset.consumers {
				if o.cfg.FilterSubject != _EMPTY_ && subjectIsSubsetMatch(subject, o.cfg.FilterSubject) {
					noInterest = false
					break
				}
			}
		}
	}

	// Grab timestamp if not already set.
	if ts == 0 && lseq > 0 {
		ts = time.Now().UnixNano()
	}

	// Skip msg here.
	if noInterest {
		mset.lseq = store.SkipMsg()
		mset.lmsgId = msgId
		mset.mu.Unlock()

		if canRespond {
			response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...)
			response = append(response, '}')
			mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
		}
		// If we have a msgId make sure to save.
		if msgId != _EMPTY_ {
			mset.storeMsgId(&ddentry{msgId, seq, ts})
		}
		return nil
	}

	// If here we will attempt to store the message.
	// Assume this will succeed.
	olmsgId := mset.lmsgId
	mset.lmsgId = msgId
	clfs := mset.clfs
	mset.lseq++

	// We hold the lock to this point to make sure nothing gets between us since we check for pre-conditions.
	// Currently can not hold while calling store b/c we have inline storage update calls that may need the lock.
	// Note that upstream that sets seq/ts should be serialized as much as possible.
	mset.mu.Unlock()

	// Store actual msg.
	if lseq == 0 && ts == 0 {
		seq, ts, err = store.StoreMsg(subject, hdr, msg)
	} else {
		// Make sure to take into account any message assignments that we had to skip (clfs).
		seq = lseq + 1 - clfs
		err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
	}

	if err != nil {
		// If we did not succeed put those values back and increment clfs in case we are clustered.
		mset.mu.Lock()
		var state StreamState
		mset.store.FastState(&state)
		mset.lseq = state.LastSeq
		mset.lmsgId = olmsgId
		mset.clfs++
		mset.mu.Unlock()

		switch err {
		case ErrMaxMsgs, ErrMaxBytes, ErrMaxMsgsPerSubject, ErrMsgTooLarge:
			s.Debugf("JetStream failed to store a msg on stream '%s > %s': %v", accName, name, err)
		case ErrStoreClosed:
		default:
			s.Errorf("JetStream failed to store a msg on stream '%s > %s': %v", accName, name, err)
		}

		if canRespond {
			resp.PubAck = &PubAck{Stream: name}
			resp.Error = ApiErrors[JSStreamStoreFailedF].ErrOrNewT(err, "{err}", err)
			response, _ = json.Marshal(resp)
		}
	} else if jsa.limitsExceeded(stype) {
		s.Warnf("JetStream resource limits exceeded for account: %q", accName)
		if canRespond {
			resp.PubAck = &PubAck{Stream: name}
			resp.Error = ApiErrors[JSAccountResourcesExceededErr]
			response, _ = json.Marshal(resp)
		}
		// If we did not succeed put those values back.
		mset.mu.Lock()
		var state StreamState
		mset.store.FastState(&state)
		mset.lseq = state.LastSeq
		mset.lmsgId = olmsgId
		mset.mu.Unlock()
		store.RemoveMsg(seq)
		seq = 0
	} else {
		// If we have a msgId make sure to save.
		if msgId != _EMPTY_ {
			mset.storeMsgId(&ddentry{msgId, seq, ts})
		}
		if canRespond {
			response = append(pubAck, strconv.FormatUint(seq, 10)...)
			response = append(response, '}')
		}
	}

	// Send response here.
	if canRespond {
		mset.outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
	}

	if err == nil && seq > 0 && numConsumers > 0 {
		mset.mu.Lock()
		for _, o := range mset.consumers {
			o.mu.Lock()
			if o.isLeader() {
				if o.isFilteredMatch(subject) {
					o.sgap++
				}
				o.signalNewMessages()
			}
			o.mu.Unlock()
		}
		mset.mu.Unlock()
	}

	return err
}

// Internal message for use by jetstream subsystem.
type jsPubMsg struct {
	subj  string
	dsubj string
	reply string
	hdr   []byte
	msg   []byte
	o     *consumer
	seq   uint64
	next  *jsPubMsg
}

func (pm *jsPubMsg) size() int {
	if pm == nil {
		return 0
	}
	return len(pm.subj) + len(pm.reply) + len(pm.hdr) + len(pm.msg)
}

// Forms a linked list for sending internal system messages.
type jsOutQ struct {
	mu   sync.Mutex
	mch  chan struct{}
	head *jsPubMsg
	tail *jsPubMsg
}

func (q *jsOutQ) pending() *jsPubMsg {
	if q == nil {
		return nil
	}
	q.mu.Lock()
	head := q.head
	q.head, q.tail = nil, nil
	q.mu.Unlock()
	return head
}

func (q *jsOutQ) send(msg *jsPubMsg) {
	if q == nil || msg == nil {
		return
	}
	q.mu.Lock()
	var notify bool
	if q.head == nil {
		q.head = msg
		notify = true
	} else {
		q.tail.next = msg
	}
	q.tail = msg
	q.mu.Unlock()

	if notify {
		select {
		case q.mch <- struct{}{}:
		default:
		}
	}
}
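
// jsOutQ mirrors the inbound list pattern above: send() appends under the
// lock and does a non-blocking send on the capacity-one mch, and the
// internalLoop Go routine drains the whole list via pending() per wakeup.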

// StoredMsg is for raw access to messages in a stream.
type StoredMsg struct {
	Subject  string    `json:"subject"`
	Sequence uint64    `json:"seq"`
	Header   []byte    `json:"hdrs,omitempty"`
	Data     []byte    `json:"data,omitempty"`
	Time     time.Time `json:"time"`
}

// This is similar to system semantics but did not want to overload the single system sendq,
// or require system account when doing simple setup with jetstream.
func (mset *stream) setupSendCapabilities() {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	if mset.outq != nil {
		return
	}
	mset.outq = &jsOutQ{mch: make(chan struct{}, 1)}
	go mset.internalLoop()
}

// Name returns the stream name.
func (mset *stream) name() string {
	if mset == nil {
		return _EMPTY_
	}
	mset.mu.RLock()
	defer mset.mu.RUnlock()
	return mset.cfg.Name
}

// Returns a copy of the interest subjects for this stream.
func (mset *stream) subjects() []string {
	mset.mu.RLock()
	defer mset.mu.RUnlock()
	if len(mset.cfg.Subjects) == 0 {
		return nil
	}
	return append(mset.cfg.Subjects[:0:0], mset.cfg.Subjects...)
}

func (mset *stream) internalLoop() {
	mset.mu.RLock()
	s := mset.srv
	c := s.createInternalJetStreamClient()
	c.registerWithAccount(mset.acc)
	defer c.closeConnection(ClientClosed)
	outq, qch, mch, amch := mset.outq, mset.qch, mset.msgs.mch, mset.amch
	isClustered := mset.cfg.Replicas > 1
	mset.mu.RUnlock()

	for {
		select {
		case <-outq.mch:
			for pm := outq.pending(); pm != nil; pm = pm.next {
				c.pa.subject = []byte(pm.subj)
				c.pa.deliver = []byte(pm.dsubj)
				c.pa.size = len(pm.msg) + len(pm.hdr)
				c.pa.szb = []byte(strconv.Itoa(c.pa.size))
				c.pa.reply = []byte(pm.reply)

				var msg []byte
				if len(pm.hdr) > 0 {
					c.pa.hdr = len(pm.hdr)
					c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
					msg = append(pm.hdr, pm.msg...)
					msg = append(msg, _CRLF_...)
				} else {
					c.pa.hdr = -1
					c.pa.hdb = nil
					msg = append(pm.msg, _CRLF_...)
				}

				didDeliver, _ := c.processInboundClientMsg(msg)
				c.pa.szb = nil

				// Check to see if this is a delivery for a consumer and
				// we failed to deliver the message. If so alert the consumer.
				if pm.o != nil && pm.seq > 0 && !didDeliver {
					pm.o.didNotDeliver(pm.seq)
				}
			}
			c.flushClients(10 * time.Millisecond)
		case <-mch:
			for im := mset.pending(mset.msgs); im != nil; im = im.next {
				// If we are clustered we need to propose this message to the underlying raft group.
				if isClustered {
					mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg)
				} else {
					mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0)
				}
			}
		case seq := <-amch:
			mset.ackMsg(nil, seq)
		case <-qch:
			return
		case <-s.quitCh:
			return
		}
	}
}

// Internal function to delete a stream.
func (mset *stream) delete() error {
	return mset.stop(true, true)
}

// Internal function to stop or delete the stream.
func (mset *stream) stop(deleteFlag, advisory bool) error {
	mset.mu.RLock()
	jsa := mset.jsa
	mset.mu.RUnlock()

	if jsa == nil {
		return ApiErrors[JSNotEnabledForAccountErr]
	}

	// Remove from our account map.
	jsa.mu.Lock()
	delete(jsa.streams, mset.cfg.Name)
	jsa.mu.Unlock()

	// Clean up consumers.
	mset.mu.Lock()
	var obs []*consumer
	for _, o := range mset.consumers {
		obs = append(obs, o)
	}

	// Check if we are a mirror.
	if mset.mirror != nil && mset.mirror.sub != nil {
		mset.unsubscribe(mset.mirror.sub)
		mset.mirror.sub = nil
		mset.removeInternalConsumer(mset.mirror)
	}
	// Now check for sources.
	if len(mset.sources) > 0 {
		for _, si := range mset.sources {
			mset.cancelSourceConsumer(si.iname)
		}
	}

	mset.mu.Unlock()
	for _, o := range obs {
		// Third flag says do not broadcast a signal.
		// TODO(dlc) - If we have an err here we don't want to stop
		// but should we log?
		o.stopWithFlags(deleteFlag, deleteFlag, false, advisory)
	}
	mset.mu.Lock()

	// Stop responding to sync requests.
	mset.stopClusterSubs()
	// Unsubscribe from direct stream.
	mset.unsubscribeToStream()

	// Our info sub if we spun it up.
	if mset.infoSub != nil {
		mset.srv.sysUnsubscribe(mset.infoSub)
		mset.infoSub = nil
	}

	// Quit channel.
	if mset.qch != nil {
		close(mset.qch)
		mset.qch = nil
	}

	// Cluster cleanup
	if n := mset.node; n != nil {
		if deleteFlag {
			n.Delete()
		} else {
			n.Stop()
		}
	}

	// Send stream delete advisory after the consumers.
	if deleteFlag && advisory {
		mset.sendDeleteAdvisoryLocked()
	}

	c := mset.client
	mset.client = nil
	if c == nil {
		mset.mu.Unlock()
		return nil
	}

	// Cleanup duplicate timer if running.
	if mset.ddtmr != nil {
		mset.ddtmr.Stop()
		mset.ddtmr = nil
		mset.ddarr = nil
		mset.ddmap = nil
	}

	sysc := mset.sysc
	mset.sysc = nil

	// Clustered cleanup.
	mset.mu.Unlock()

	c.closeConnection(ClientClosed)

	if sysc != nil {
		sysc.closeConnection(ClientClosed)
	}

	if mset.store == nil {
		return nil
	}

	if deleteFlag {
		if err := mset.store.Delete(); err != nil {
			return err
		}
	} else if err := mset.store.Stop(); err != nil {
		return err
	}

	return nil
}

func (mset *stream) getMsg(seq uint64) (*StoredMsg, error) {
	subj, hdr, msg, ts, err := mset.store.LoadMsg(seq)
	if err != nil {
		return nil, err
	}
	sm := &StoredMsg{
		Subject:  subj,
		Sequence: seq,
		Header:   hdr,
		Data:     msg,
		Time:     time.Unix(0, ts).UTC(),
	}
	return sm, nil
}

// getConsumers will return all the current consumers for this stream.
func (mset *stream) getConsumers() []*consumer {
	mset.mu.Lock()
	defer mset.mu.Unlock()

	var obs []*consumer
	for _, o := range mset.consumers {
		obs = append(obs, o)
	}
	return obs
}

// This returns all consumers that are not DIRECT.
func (mset *stream) getPublicConsumers() []*consumer {
	mset.mu.Lock()
	defer mset.mu.Unlock()

	var obs []*consumer
	for _, o := range mset.consumers {
		if !o.cfg.Direct {
			obs = append(obs, o)
		}
	}
	return obs
}

// NumConsumers reports the number of active consumers for this stream.
func (mset *stream) numConsumers() int {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	return len(mset.consumers)
}

func (mset *stream) setConsumer(o *consumer) {
	mset.consumers[o.name] = o
	if o.cfg.FilterSubject != _EMPTY_ {
		mset.numFilter++
	}
	if o.cfg.Direct {
		mset.directs++
	}
}

func (mset *stream) removeConsumer(o *consumer) {
	if o.cfg.FilterSubject != _EMPTY_ && mset.numFilter > 0 {
		mset.numFilter--
	}
	if o.cfg.Direct && mset.directs > 0 {
		mset.directs--
	}
	delete(mset.consumers, o.name)
}

// lookupConsumer will retrieve a consumer by name.
func (mset *stream) lookupConsumer(name string) *consumer {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	return mset.consumers[name]
}

// State will return the current state for this stream.
func (mset *stream) state() StreamState {
	return mset.stateWithDetail(false)
}

func (mset *stream) numDirectConsumers() (num int) {
	mset.mu.RLock()
	defer mset.mu.RUnlock()

	// Consumers that are direct are not recorded at the store level.
	for _, o := range mset.consumers {
		o.mu.RLock()
		if o.cfg.Direct {
			num++
		}
		o.mu.RUnlock()
	}
	return num
}

func (mset *stream) stateWithDetail(details bool) StreamState {
	mset.mu.RLock()
	c, store := mset.client, mset.store
	mset.mu.RUnlock()
	if c == nil || store == nil {
		return StreamState{}
	}
	// Currently rely on store.
	state := store.State()
	if !details {
		state.Deleted = nil
	}
	return state
}

// Determines if the new proposed partition is unique amongst all consumers.
// Lock should be held.
func (mset *stream) partitionUnique(partition string) bool {
	for _, o := range mset.consumers {
		if o.cfg.FilterSubject == _EMPTY_ {
			return false
		}
		if subjectIsSubsetMatch(partition, o.cfg.FilterSubject) {
			return false
		}
	}
	return true
}

// Lock should be held.
func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
	for _, o := range mset.consumers {
		if o != obs && o.needAck(seq) {
			return true
		}
	}
	return false
}

// ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy.
func (mset *stream) ackMsg(o *consumer, seq uint64) {
	switch mset.cfg.Retention {
	case LimitsPolicy:
		return
	case WorkQueuePolicy:
		// Normally we just remove a message when its ack'd here but if we have direct consumers
		// from sources and/or mirrors we need to make sure they have delivered the msg.
		mset.mu.RLock()
		shouldRemove := mset.directs <= 0 || !mset.checkInterest(seq, o)
		mset.mu.RUnlock()
		if shouldRemove {
			mset.store.RemoveMsg(seq)
		}
	case InterestPolicy:
		mset.mu.RLock()
		hasInterest := mset.checkInterest(seq, o)
		mset.mu.RUnlock()
		if !hasInterest {
			mset.store.RemoveMsg(seq)
		}
	}
}

// Snapshot creates a snapshot for the stream and possibly consumers.
func (mset *stream) snapshot(deadline time.Duration, checkMsgs, includeConsumers bool) (*SnapshotResult, error) {
	mset.mu.RLock()
	if mset.client == nil || mset.store == nil {
		mset.mu.RUnlock()
		return nil, fmt.Errorf("invalid stream")
	}
	store := mset.store
	mset.mu.RUnlock()

	return store.Snapshot(deadline, checkMsgs, includeConsumers)
}

const snapsDir = "__snapshots__"

// RestoreStream will restore a stream from a snapshot.
func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error) {
	if ncfg == nil {
		return nil, errors.New("nil config on stream restore")
	}

	cfg, err := checkStreamCfg(ncfg)
	if err != nil {
		return nil, ApiErrors[JSStreamNotFoundErr].ErrOr(err)
	}

	_, jsa, err := a.checkForJetStream()
	if err != nil {
		return nil, err
	}

	sd := path.Join(jsa.storeDir, snapsDir)
	if _, err := os.Stat(sd); os.IsNotExist(err) {
		if err := os.MkdirAll(sd, defaultDirPerms); err != nil {
			return nil, fmt.Errorf("could not create snapshots directory - %v", err)
		}
	}
	sdir, err := ioutil.TempDir(sd, "snap-")
	if err != nil {
		return nil, err
	}
	if _, err := os.Stat(sdir); os.IsNotExist(err) {
		if err := os.MkdirAll(sdir, defaultDirPerms); err != nil {
			return nil, fmt.Errorf("could not create snapshots directory - %v", err)
		}
	}
	defer os.RemoveAll(sdir)

	tr := tar.NewReader(s2.NewReader(r))
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break // End of snapshot
		}
		if err != nil {
			return nil, err
		}
		fpath := path.Join(sdir, filepath.Clean(hdr.Name))
		os.MkdirAll(filepath.Dir(fpath), defaultDirPerms)
		fd, err := os.OpenFile(fpath, os.O_CREATE|os.O_RDWR, 0600)
		if err != nil {
			return nil, err
		}
		_, err = io.Copy(fd, tr)
		fd.Close()
		if err != nil {
			return nil, err
		}
	}

	// Check metadata.
	// The cfg passed in will be the new identity for the stream.
	var fcfg FileStreamInfo
	b, err := ioutil.ReadFile(path.Join(sdir, JetStreamMetaFile))
	if err != nil {
		return nil, err
	}
	if err := json.Unmarshal(b, &fcfg); err != nil {
		return nil, err
	}

	// See if this stream already exists.
	if _, err := a.lookupStream(cfg.Name); err == nil {
		return nil, ApiErrors[JSStreamNameExistErr]
	}
	// Move into the correct place here.
	ndir := path.Join(jsa.storeDir, streamsDir, cfg.Name)
	// Remove old one if for some reason it is still here.
	if _, err := os.Stat(ndir); err == nil {
		os.RemoveAll(ndir)
	}
	// Make sure our destination streams directory exists.
	if err := os.MkdirAll(path.Join(jsa.storeDir, streamsDir), defaultDirPerms); err != nil {
		return nil, err
	}
	// Move into new location.
	if err := os.Rename(sdir, ndir); err != nil {
		return nil, err
	}
	if cfg.Template != _EMPTY_ {
		if err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name); err != nil {
			return nil, err
		}
	}
	mset, err := a.addStream(&cfg)
	if err != nil {
		return nil, err
	}
	if !fcfg.Created.IsZero() {
		mset.setCreatedTime(fcfg.Created)
	}

	// Now do consumers.
	odir := path.Join(ndir, consumerDir)
	ofis, _ := ioutil.ReadDir(odir)
	for _, ofi := range ofis {
		metafile := path.Join(odir, ofi.Name(), JetStreamMetaFile)
		metasum := path.Join(odir, ofi.Name(), JetStreamMetaFileSum)
		if _, err := os.Stat(metafile); os.IsNotExist(err) {
			mset.stop(true, false)
			return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
		}
		buf, err := ioutil.ReadFile(metafile)
		if err != nil {
			mset.stop(true, false)
			return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
		}
		if _, err := os.Stat(metasum); os.IsNotExist(err) {
			mset.stop(true, false)
			return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
		}
		var cfg FileConsumerInfo
		if err := json.Unmarshal(buf, &cfg); err != nil {
			mset.stop(true, false)
			return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
		}
		isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig)
		if isEphemeral {
			// This is an ephemeral consumer and this could fail on restart until
			// the consumer can reconnect. We will create it as a durable and switch it.
			cfg.ConsumerConfig.Durable = ofi.Name()
		}
		obs, err := mset.addConsumer(&cfg.ConsumerConfig)
		if err != nil {
			mset.stop(true, false)
			return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
		}
		if isEphemeral {
			obs.switchToEphemeral()
		}
		if !cfg.Created.IsZero() {
			obs.setCreatedTime(cfg.Created)
		}
		obs.mu.Lock()
		err = obs.readStoredState()
		obs.mu.Unlock()
		if err != nil {
			mset.stop(true, false)
			return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
		}
	}
	return mset, nil
}