mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
1124 lines
28 KiB
Go
1124 lines
28 KiB
Go
// Copyright 2019-2020 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"archive/tar"
|
|
"compress/gzip"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"reflect"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/nats-io/nuid"
|
|
)
|
|
|
|
// StreamConfig will determine the name, subjects and retention policy
|
|
// for a given stream. If subjects is empty the name will be used.
|
|
type StreamConfig struct {
|
|
Name string `json:"name"`
|
|
Subjects []string `json:"subjects,omitempty"`
|
|
Retention RetentionPolicy `json:"retention"`
|
|
MaxConsumers int `json:"max_consumers"`
|
|
MaxMsgs int64 `json:"max_msgs"`
|
|
MaxBytes int64 `json:"max_bytes"`
|
|
Discard DiscardPolicy `json:"discard"`
|
|
MaxAge time.Duration `json:"max_age"`
|
|
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
|
|
Storage StorageType `json:"storage"`
|
|
Replicas int `json:"num_replicas"`
|
|
NoAck bool `json:"no_ack,omitempty"`
|
|
Template string `json:"template_owner,omitempty"`
|
|
}
|
|
|
|
// PubAck is the detail you get back from a publish to a stream that was successful.
|
|
// e.g. +OK {"stream": "Orders", "seq": 22}
|
|
type PubAck struct {
|
|
Stream string `json:"stream"`
|
|
Seq uint64 `json:"seq"`
|
|
}
|
|
|
|
// StreamInfo shows config and current state for this stream.
|
|
type StreamInfo struct {
|
|
Config StreamConfig `json:"config"`
|
|
Created time.Time `json:"created"`
|
|
State StreamState `json:"state"`
|
|
}
|
|
|
|
// Stream is a jetstream stream of messages. When we receive a message internally destined
|
|
// for a Stream we will direct link from the client to this Stream structure.
|
|
type Stream struct {
|
|
mu sync.RWMutex
|
|
sg *sync.Cond
|
|
sgw int
|
|
jsa *jsAccount
|
|
client *client
|
|
sid int
|
|
pubAck []byte
|
|
sendq chan *jsPubMsg
|
|
store StreamStore
|
|
consumers map[string]*Consumer
|
|
config StreamConfig
|
|
created time.Time
|
|
}
|
|
|
|
const (
|
|
StreamDefaultReplicas = 1
|
|
StreamMaxReplicas = 8
|
|
)
|
|
|
|
// AddStream adds a stream for the given account.
|
|
func (a *Account) AddStream(config *StreamConfig) (*Stream, error) {
|
|
return a.AddStreamWithStore(config, nil)
|
|
}
|
|
|
|
// AddStreamWithStore adds a stream for the given account with custome store config options.
|
|
func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreConfig) (*Stream, error) {
|
|
s, jsa, err := a.checkForJetStream()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Sensible defaults.
|
|
cfg, err := checkStreamCfg(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
jsa.mu.Lock()
|
|
if mset, ok := jsa.streams[cfg.Name]; ok {
|
|
jsa.mu.Unlock()
|
|
// Check to see if configs are same.
|
|
ocfg := mset.Config()
|
|
if reflect.DeepEqual(ocfg, cfg) {
|
|
return mset, nil
|
|
} else {
|
|
return nil, fmt.Errorf("stream name already in use")
|
|
}
|
|
}
|
|
// Check for limits.
|
|
if err := jsa.checkLimits(&cfg); err != nil {
|
|
jsa.mu.Unlock()
|
|
return nil, err
|
|
}
|
|
// Check for template ownership if present.
|
|
if cfg.Template != _EMPTY_ && jsa.account != nil {
|
|
if !jsa.checkTemplateOwnership(cfg.Template, cfg.Name) {
|
|
jsa.mu.Unlock()
|
|
return nil, fmt.Errorf("stream not owned by template")
|
|
}
|
|
}
|
|
|
|
// Check for overlapping subjects. These are not allowed for now.
|
|
if jsa.subjectsOverlap(cfg.Subjects) {
|
|
jsa.mu.Unlock()
|
|
return nil, fmt.Errorf("subjects overlap with an existing stream")
|
|
}
|
|
|
|
// Setup the internal client.
|
|
c := s.createInternalJetStreamClient()
|
|
mset := &Stream{jsa: jsa, config: cfg, client: c, consumers: make(map[string]*Consumer)}
|
|
mset.sg = sync.NewCond(&mset.mu)
|
|
|
|
jsa.streams[cfg.Name] = mset
|
|
storeDir := path.Join(jsa.storeDir, streamsDir, cfg.Name)
|
|
jsa.mu.Unlock()
|
|
|
|
// Bind to the account.
|
|
c.registerWithAccount(a)
|
|
|
|
// Create the appropriate storage
|
|
fsCfg := fsConfig
|
|
if fsCfg == nil {
|
|
fsCfg = &FileStoreConfig{}
|
|
}
|
|
fsCfg.StoreDir = storeDir
|
|
if err := mset.setupStore(fsCfg); err != nil {
|
|
mset.Delete()
|
|
return nil, err
|
|
}
|
|
// Setup our internal send go routine.
|
|
mset.setupSendCapabilities()
|
|
|
|
// Setup subscriptions
|
|
if err := mset.subscribeToStream(); err != nil {
|
|
mset.Delete()
|
|
return nil, err
|
|
}
|
|
|
|
// Create our pubAck here. This will be reused and for +OK will contain JSON
|
|
// for stream name and sequence.
|
|
longestSeq := strconv.FormatUint(math.MaxUint64, 10)
|
|
lpubAck := len(OK) + len(cfg.Name) + len("{\"stream\": ,\"seq\": }") + len(longestSeq)
|
|
mset.pubAck = make([]byte, 0, lpubAck)
|
|
mset.pubAck = append(mset.pubAck, OK...)
|
|
mset.pubAck = append(mset.pubAck, fmt.Sprintf(" {\"stream\": %q, \"seq\": ", cfg.Name)...)
|
|
|
|
mset.sendCreateAdvisory()
|
|
|
|
return mset, nil
|
|
}
|
|
|
|
func (mset *Stream) sendCreateAdvisory() {
|
|
mset.mu.Lock()
|
|
name := mset.config.Name
|
|
template := mset.config.Template
|
|
sendq := mset.sendq
|
|
mset.mu.Unlock()
|
|
|
|
if sendq == nil {
|
|
return
|
|
}
|
|
|
|
// finally send an event that this stream was created
|
|
m := JSStreamActionAdvisory{
|
|
TypedEvent: TypedEvent{
|
|
Type: JSStreamActionAdvisoryType,
|
|
ID: nuid.Next(),
|
|
Time: time.Now().UTC(),
|
|
},
|
|
Stream: name,
|
|
Action: CreateEvent,
|
|
Template: template,
|
|
}
|
|
|
|
j, err := json.MarshalIndent(m, "", " ")
|
|
if err == nil {
|
|
subj := JSAdvisoryStreamCreatedPre + "." + name
|
|
sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
|
|
}
|
|
}
|
|
|
|
func (mset *Stream) sendDeleteAdvisoryLocked() {
|
|
if mset.sendq == nil {
|
|
return
|
|
}
|
|
|
|
m := JSStreamActionAdvisory{
|
|
TypedEvent: TypedEvent{
|
|
Type: JSStreamActionAdvisoryType,
|
|
ID: nuid.Next(),
|
|
Time: time.Now().UTC(),
|
|
},
|
|
Stream: mset.config.Name,
|
|
Action: DeleteEvent,
|
|
Template: mset.config.Template,
|
|
}
|
|
|
|
j, err := json.MarshalIndent(m, "", " ")
|
|
if err == nil {
|
|
subj := JSAdvisoryStreamDeletedPre + "." + mset.config.Name
|
|
mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
|
|
}
|
|
}
|
|
|
|
func (mset *Stream) sendUpdateAdvisoryLocked() {
|
|
if mset.sendq == nil {
|
|
return
|
|
}
|
|
|
|
m := JSStreamActionAdvisory{
|
|
TypedEvent: TypedEvent{
|
|
Type: JSStreamActionAdvisoryType,
|
|
ID: nuid.Next(),
|
|
Time: time.Now().UTC(),
|
|
},
|
|
Stream: mset.config.Name,
|
|
Action: ModifyEvent,
|
|
}
|
|
|
|
j, err := json.MarshalIndent(m, "", " ")
|
|
if err == nil {
|
|
subj := JSAdvisoryStreamUpdatedPre + "." + mset.config.Name
|
|
mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
|
|
}
|
|
}
|
|
|
|
// Created returns created time.
|
|
func (mset *Stream) Created() time.Time {
|
|
mset.mu.RLock()
|
|
created := mset.created
|
|
mset.mu.RUnlock()
|
|
return created
|
|
}
|
|
|
|
// Internal to allow creation time to be restored.
|
|
func (mset *Stream) setCreated(created time.Time) {
|
|
mset.mu.Lock()
|
|
mset.created = created
|
|
mset.mu.Unlock()
|
|
}
|
|
|
|
// Check to see if these subjects overlap with existing subjects.
|
|
// Lock should be held.
|
|
func (jsa *jsAccount) subjectsOverlap(subjects []string) bool {
|
|
for _, mset := range jsa.streams {
|
|
for _, subj := range mset.config.Subjects {
|
|
for _, tsubj := range subjects {
|
|
if SubjectsCollide(tsubj, subj) {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
|
|
if config == nil {
|
|
return StreamConfig{}, fmt.Errorf("stream configuration invalid")
|
|
}
|
|
if !isValidName(config.Name) {
|
|
return StreamConfig{}, fmt.Errorf("stream name is required and can not contain '.', '*', '>'")
|
|
}
|
|
if len(config.Name) > JSMaxNameLen {
|
|
return StreamConfig{}, fmt.Errorf("stream name is too long, maximum allowed is %d", JSMaxNameLen)
|
|
}
|
|
cfg := *config
|
|
|
|
// TODO(dlc) - check config for conflicts, e.g replicas > 1 in single server mode.
|
|
if cfg.Replicas == 0 {
|
|
cfg.Replicas = 1
|
|
}
|
|
// TODO(dlc) - Remove when clustering happens.
|
|
if cfg.Replicas > 1 {
|
|
return StreamConfig{}, fmt.Errorf("maximum replicas is 1")
|
|
}
|
|
if cfg.Replicas > StreamMaxReplicas {
|
|
return cfg, fmt.Errorf("maximum replicas is %d", StreamMaxReplicas)
|
|
}
|
|
if cfg.MaxMsgs == 0 {
|
|
cfg.MaxMsgs = -1
|
|
}
|
|
if cfg.MaxBytes == 0 {
|
|
cfg.MaxBytes = -1
|
|
}
|
|
if cfg.MaxMsgSize == 0 {
|
|
cfg.MaxMsgSize = -1
|
|
}
|
|
if cfg.MaxConsumers == 0 {
|
|
cfg.MaxConsumers = -1
|
|
}
|
|
if len(cfg.Subjects) == 0 {
|
|
cfg.Subjects = append(cfg.Subjects, cfg.Name)
|
|
} else {
|
|
// We can allow overlaps, but don't allow direct duplicates.
|
|
dset := make(map[string]struct{}, len(cfg.Subjects))
|
|
for _, subj := range cfg.Subjects {
|
|
if _, ok := dset[subj]; ok {
|
|
return StreamConfig{}, fmt.Errorf("duplicate subjects detected")
|
|
}
|
|
// Also check to make sure we do not overlap with our $JS API subjects.
|
|
if subjectIsSubsetMatch(subj, "$JS.API.>") {
|
|
return StreamConfig{}, fmt.Errorf("subjects overlap with jetstream api")
|
|
}
|
|
|
|
dset[subj] = struct{}{}
|
|
}
|
|
}
|
|
return cfg, nil
|
|
}
|
|
|
|
// Config returns the stream's configuration.
|
|
func (mset *Stream) Config() StreamConfig {
|
|
mset.mu.Lock()
|
|
defer mset.mu.Unlock()
|
|
return mset.config
|
|
}
|
|
|
|
// Delete deletes a stream from the owning account.
|
|
func (mset *Stream) Delete() error {
|
|
mset.mu.Lock()
|
|
jsa := mset.jsa
|
|
mset.mu.Unlock()
|
|
if jsa == nil {
|
|
return fmt.Errorf("jetstream not enabled for account")
|
|
}
|
|
jsa.mu.Lock()
|
|
delete(jsa.streams, mset.config.Name)
|
|
jsa.mu.Unlock()
|
|
|
|
return mset.delete()
|
|
}
|
|
|
|
// Update will allow certain configuration properties of an existing stream to be updated.
|
|
func (mset *Stream) Update(config *StreamConfig) error {
|
|
cfg, err := checkStreamCfg(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
o_cfg := mset.Config()
|
|
|
|
// Name must match.
|
|
if cfg.Name != o_cfg.Name {
|
|
return fmt.Errorf("stream configuration name must match original")
|
|
}
|
|
// Can't change MaxConsumers for now.
|
|
if cfg.MaxConsumers != o_cfg.MaxConsumers {
|
|
return fmt.Errorf("stream configuration update can not change MaxConsumers")
|
|
}
|
|
// Can't change storage types.
|
|
if cfg.Storage != o_cfg.Storage {
|
|
return fmt.Errorf("stream configuration update can not change storage type")
|
|
}
|
|
// Can't change retention.
|
|
if cfg.Retention != o_cfg.Retention {
|
|
return fmt.Errorf("stream configuration update can not change retention policy")
|
|
}
|
|
// Can not have a template owner for now.
|
|
if o_cfg.Template != "" {
|
|
return fmt.Errorf("stream configuration update not allowed on template owned stream")
|
|
}
|
|
if cfg.Template != "" {
|
|
return fmt.Errorf("stream configuration update can not be owned by a template")
|
|
}
|
|
|
|
// Check limits.
|
|
mset.mu.Lock()
|
|
jsa := mset.jsa
|
|
mset.mu.Unlock()
|
|
|
|
jsa.mu.Lock()
|
|
if cfg.MaxConsumers > 0 && cfg.MaxConsumers > jsa.limits.MaxConsumers {
|
|
jsa.mu.Unlock()
|
|
return fmt.Errorf("stream configuration maximum consumers exceeds account limit")
|
|
}
|
|
if cfg.MaxBytes > 0 && cfg.MaxBytes > o_cfg.MaxBytes {
|
|
if err := jsa.checkBytesLimits(cfg.MaxBytes*int64(cfg.Replicas), cfg.Storage); err != nil {
|
|
jsa.mu.Unlock()
|
|
return err
|
|
}
|
|
}
|
|
jsa.mu.Unlock()
|
|
|
|
// Now check for subject interest differences.
|
|
current := make(map[string]struct{}, len(o_cfg.Subjects))
|
|
for _, s := range o_cfg.Subjects {
|
|
current[s] = struct{}{}
|
|
}
|
|
// Update config with new values. The store update will enforce any stricter limits.
|
|
mset.mu.Lock()
|
|
defer mset.mu.Unlock()
|
|
|
|
// Now walk new subjects. All of these need to be added, but we will check
|
|
// the originals first, since if it is in there we can skip, already added.
|
|
for _, s := range cfg.Subjects {
|
|
if _, ok := current[s]; !ok {
|
|
if _, err := mset.subscribeInternal(s, mset.processInboundJetStreamMsg); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
delete(current, s)
|
|
}
|
|
// What is left in current needs to be deleted.
|
|
for s := range current {
|
|
if err := mset.unsubscribeInternal(s); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Now update config and store's version of our config.
|
|
mset.config = cfg
|
|
mset.store.UpdateConfig(&cfg)
|
|
|
|
mset.sendUpdateAdvisoryLocked()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Purge will remove all messages from the stream and underlying store.
|
|
func (mset *Stream) Purge() uint64 {
|
|
mset.mu.Lock()
|
|
if mset.client == nil {
|
|
mset.mu.Unlock()
|
|
return 0
|
|
}
|
|
purged := mset.store.Purge()
|
|
stats := mset.store.State()
|
|
var obs []*Consumer
|
|
for _, o := range mset.consumers {
|
|
obs = append(obs, o)
|
|
}
|
|
mset.mu.Unlock()
|
|
for _, o := range obs {
|
|
o.purge(stats.FirstSeq)
|
|
}
|
|
return purged
|
|
}
|
|
|
|
// RemoveMsg will remove a message from a stream.
|
|
// FIXME(dlc) - Should pick one and be consistent.
|
|
func (mset *Stream) RemoveMsg(seq uint64) (bool, error) {
|
|
return mset.store.RemoveMsg(seq)
|
|
}
|
|
|
|
// DeleteMsg will remove a message from a stream.
|
|
func (mset *Stream) DeleteMsg(seq uint64) (bool, error) {
|
|
return mset.store.RemoveMsg(seq)
|
|
}
|
|
|
|
// EraseMsg will securely remove a message and rewrite the data with random data.
|
|
func (mset *Stream) EraseMsg(seq uint64) (bool, error) {
|
|
return mset.store.EraseMsg(seq)
|
|
}
|
|
|
|
// Will create internal subscriptions for the msgSet.
|
|
// Lock should be held.
|
|
func (mset *Stream) subscribeToStream() error {
|
|
for _, subject := range mset.config.Subjects {
|
|
if _, err := mset.subscribeInternal(subject, mset.processInboundJetStreamMsg); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// FIXME(dlc) - This only works in single server mode for the moment. Need to fix as we expand to clusters.
|
|
// Lock should be held.
|
|
func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
|
|
c := mset.client
|
|
if c == nil {
|
|
return nil, fmt.Errorf("invalid stream")
|
|
}
|
|
if !c.srv.eventsEnabled() {
|
|
return nil, ErrNoSysAccount
|
|
}
|
|
if cb == nil {
|
|
return nil, fmt.Errorf("undefined message handler")
|
|
}
|
|
|
|
mset.sid++
|
|
|
|
// Now create the subscription
|
|
sub, err := c.processSub([]byte(subject+" "+strconv.Itoa(mset.sid)), false)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if sub == nil {
|
|
return nil, fmt.Errorf("malformed subject")
|
|
}
|
|
c.mu.Lock()
|
|
sub.icb = cb
|
|
c.mu.Unlock()
|
|
return sub, nil
|
|
}
|
|
|
|
// Helper for unlocked stream.
|
|
func (mset *Stream) subscribeInternalUnlocked(subject string, cb msgHandler) (*subscription, error) {
|
|
mset.mu.Lock()
|
|
defer mset.mu.Unlock()
|
|
return mset.subscribeInternal(subject, cb)
|
|
}
|
|
|
|
// This will unsubscribe us from the exact subject given.
|
|
// We do not currently track the subs so do not have the sid.
|
|
// This should be called only on an update.
|
|
// Lock should be held.
|
|
func (mset *Stream) unsubscribeInternal(subject string) error {
|
|
c := mset.client
|
|
if c == nil {
|
|
return fmt.Errorf("invalid stream")
|
|
}
|
|
if !c.srv.eventsEnabled() {
|
|
return ErrNoSysAccount
|
|
}
|
|
|
|
var sid []byte
|
|
|
|
c.mu.Lock()
|
|
for _, sub := range c.subs {
|
|
if subject == string(sub.subject) {
|
|
sid = sub.sid
|
|
break
|
|
}
|
|
}
|
|
c.mu.Unlock()
|
|
|
|
if sid != nil {
|
|
return c.processUnsub(sid)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Lock should be held.
|
|
func (mset *Stream) unsubscribe(sub *subscription) {
|
|
if sub == nil || mset.client == nil {
|
|
return
|
|
}
|
|
mset.client.unsubscribe(mset.client.acc, sub, true, true)
|
|
}
|
|
|
|
func (mset *Stream) unsubscribeUnlocked(sub *subscription) {
|
|
mset.mu.Lock()
|
|
mset.unsubscribe(sub)
|
|
mset.mu.Unlock()
|
|
}
|
|
|
|
func (mset *Stream) setupStore(fsCfg *FileStoreConfig) error {
|
|
mset.mu.Lock()
|
|
defer mset.mu.Unlock()
|
|
|
|
mset.created = time.Now().UTC()
|
|
|
|
switch mset.config.Storage {
|
|
case MemoryStorage:
|
|
ms, err := newMemStore(&mset.config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mset.store = ms
|
|
case FileStorage:
|
|
fs, err := newFileStoreWithCreated(*fsCfg, mset.config, mset.created)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mset.store = fs
|
|
}
|
|
jsa, st := mset.jsa, mset.config.Storage
|
|
mset.store.StorageBytesUpdate(func(delta int64) { jsa.updateUsage(st, delta) })
|
|
return nil
|
|
}
|
|
|
|
// processInboundJetStreamMsg handles processing messages bound for a stream.
|
|
func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subject, reply string, msg []byte) {
|
|
mset.mu.Lock()
|
|
store := mset.store
|
|
c := mset.client
|
|
var accName string
|
|
if c != nil && c.acc != nil {
|
|
accName = c.acc.Name
|
|
}
|
|
doAck := !mset.config.NoAck
|
|
pubAck := mset.pubAck
|
|
jsa := mset.jsa
|
|
stype := mset.config.Storage
|
|
name := mset.config.Name
|
|
maxMsgSize := int(mset.config.MaxMsgSize)
|
|
numConsumers := len(mset.consumers)
|
|
mset.mu.Unlock()
|
|
|
|
if c == nil {
|
|
return
|
|
}
|
|
|
|
// Response Ack.
|
|
var (
|
|
response []byte
|
|
seq uint64
|
|
err error
|
|
ts int64
|
|
)
|
|
|
|
// Header support.
|
|
var hdr []byte
|
|
|
|
// Check to see if we are over the account limit.
|
|
if maxMsgSize >= 0 && len(msg) > maxMsgSize {
|
|
response = []byte("-ERR 'message size exceeds maximum allowed'")
|
|
} else {
|
|
// Headers.
|
|
if pc != nil && pc.pa.hdr > 0 {
|
|
hdr = msg[:pc.pa.hdr]
|
|
msg = msg[pc.pa.hdr:]
|
|
}
|
|
seq, ts, err = store.StoreMsg(subject, hdr, msg)
|
|
if err != nil {
|
|
if err != ErrStoreClosed {
|
|
c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err)
|
|
}
|
|
response = []byte(fmt.Sprintf("-ERR '%v'", err))
|
|
} else if jsa.limitsExceeded(stype) {
|
|
c.Warnf("JetStream resource limits exceeded for account: %q", accName)
|
|
response = []byte("-ERR 'resource limits exceeded for account'")
|
|
store.RemoveMsg(seq)
|
|
seq = 0
|
|
} else if err == nil && doAck && len(reply) > 0 {
|
|
response = append(pubAck, strconv.FormatUint(seq, 10)...)
|
|
response = append(response, '}')
|
|
}
|
|
}
|
|
|
|
// Send response here.
|
|
if doAck && len(reply) > 0 {
|
|
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
|
|
}
|
|
|
|
if err == nil && numConsumers > 0 && seq > 0 {
|
|
var needSignal bool
|
|
mset.mu.Lock()
|
|
for _, o := range mset.consumers {
|
|
if !o.deliverCurrentMsg(subject, hdr, msg, seq, ts) {
|
|
needSignal = true
|
|
}
|
|
}
|
|
mset.mu.Unlock()
|
|
|
|
if needSignal {
|
|
mset.signalConsumers()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Will signal all waiting consumers.
|
|
func (mset *Stream) signalConsumers() {
|
|
mset.mu.Lock()
|
|
if mset.sgw > 0 {
|
|
mset.sg.Broadcast()
|
|
}
|
|
mset.mu.Unlock()
|
|
}
|
|
|
|
// Internal message for use by jetstream subsystem.
|
|
type jsPubMsg struct {
|
|
subj string
|
|
dsubj string
|
|
reply string
|
|
hdr []byte
|
|
msg []byte
|
|
o *Consumer
|
|
seq uint64
|
|
}
|
|
|
|
// StoredMsg is for raw access to messages in a stream.
|
|
type StoredMsg struct {
|
|
Subject string `json:"subject"`
|
|
Sequence uint64 `json:"seq"`
|
|
Header []byte `json:"hdrs,omitempty"`
|
|
Data []byte `json:"data,omitempty"`
|
|
Time time.Time `json:"time"`
|
|
}
|
|
|
|
// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering
|
|
const msetSendQSize = 1024
|
|
|
|
// This is similar to system semantics but did not want to overload the single system sendq,
|
|
// or require system account when doing simple setup with jetstream.
|
|
func (mset *Stream) setupSendCapabilities() {
|
|
mset.mu.Lock()
|
|
defer mset.mu.Unlock()
|
|
if mset.sendq != nil {
|
|
return
|
|
}
|
|
mset.sendq = make(chan *jsPubMsg, msetSendQSize)
|
|
go mset.internalSendLoop()
|
|
}
|
|
|
|
// Name returns the stream name.
|
|
func (mset *Stream) Name() string {
|
|
mset.mu.Lock()
|
|
defer mset.mu.Unlock()
|
|
return mset.config.Name
|
|
}
|
|
|
|
func (mset *Stream) internalSendLoop() {
|
|
mset.mu.Lock()
|
|
c := mset.client
|
|
if c == nil {
|
|
mset.mu.Unlock()
|
|
return
|
|
}
|
|
s := c.srv
|
|
sendq := mset.sendq
|
|
name := mset.config.Name
|
|
mset.mu.Unlock()
|
|
|
|
// Warn when internal send queue is backed up past 75%
|
|
warnThresh := 3 * msetSendQSize / 4
|
|
warnFreq := time.Second
|
|
last := time.Now().Add(-warnFreq)
|
|
|
|
for {
|
|
if len(sendq) > warnThresh && time.Since(last) >= warnFreq {
|
|
s.Warnf("Jetstream internal send queue > 75%% for account: %q stream: %q", c.acc.Name, name)
|
|
last = time.Now()
|
|
}
|
|
select {
|
|
case pm := <-sendq:
|
|
if pm == nil {
|
|
return
|
|
}
|
|
c.pa.subject = []byte(pm.subj)
|
|
c.pa.deliver = []byte(pm.dsubj)
|
|
c.pa.size = len(pm.msg) + len(pm.hdr)
|
|
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
|
|
c.pa.reply = []byte(pm.reply)
|
|
|
|
var msg []byte
|
|
if len(pm.hdr) > 0 {
|
|
c.pa.hdr = len(pm.hdr)
|
|
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
|
|
msg = append(pm.hdr, pm.msg...)
|
|
msg = append(msg, _CRLF_...)
|
|
} else {
|
|
c.pa.hdr = -1
|
|
c.pa.hdb = nil
|
|
msg = append(pm.msg, _CRLF_...)
|
|
}
|
|
didDeliver := c.processInboundClientMsg(msg)
|
|
c.pa.szb = nil
|
|
c.flushClients(0)
|
|
// Check to see if this is a delivery for an observable and
|
|
// we failed to deliver the message. If so alert the observable.
|
|
if pm.o != nil && pm.seq > 0 && !didDeliver {
|
|
pm.o.didNotDeliver(pm.seq)
|
|
}
|
|
case <-s.quitCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Internal function to delete a stream.
|
|
func (mset *Stream) delete() error {
|
|
return mset.stop(true)
|
|
}
|
|
|
|
// Internal function to stop or delete the stream.
|
|
func (mset *Stream) stop(delete bool) error {
|
|
mset.mu.Lock()
|
|
|
|
if delete {
|
|
mset.sendDeleteAdvisoryLocked()
|
|
}
|
|
|
|
if mset.sendq != nil {
|
|
mset.sendq <- nil
|
|
}
|
|
|
|
c := mset.client
|
|
mset.client = nil
|
|
if c == nil {
|
|
mset.mu.Unlock()
|
|
return nil
|
|
}
|
|
var obs []*Consumer
|
|
for _, o := range mset.consumers {
|
|
obs = append(obs, o)
|
|
}
|
|
mset.consumers = nil
|
|
mset.mu.Unlock()
|
|
|
|
c.closeConnection(ClientClosed)
|
|
|
|
// Clean up consumers.
|
|
for _, o := range obs {
|
|
// Second flag says do not broadcast to signal.
|
|
if err := o.stop(delete, false, delete); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Make sure we release them all here at once.
|
|
mset.mu.Lock()
|
|
mset.sg.Broadcast()
|
|
mset.mu.Unlock()
|
|
|
|
if mset.store == nil {
|
|
return nil
|
|
}
|
|
|
|
if delete {
|
|
if err := mset.store.Delete(); err != nil {
|
|
return err
|
|
}
|
|
} else if err := mset.store.Stop(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (mset *Stream) GetMsg(seq uint64) (*StoredMsg, error) {
|
|
subj, hdr, msg, ts, err := mset.store.LoadMsg(seq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sm := &StoredMsg{
|
|
Subject: subj,
|
|
Sequence: seq,
|
|
Header: hdr,
|
|
Data: msg,
|
|
Time: time.Unix(0, ts).UTC(),
|
|
}
|
|
return sm, nil
|
|
}
|
|
|
|
// Consunmers will return all the current consumers for this stream.
|
|
func (mset *Stream) Consumers() []*Consumer {
|
|
mset.mu.Lock()
|
|
defer mset.mu.Unlock()
|
|
|
|
var obs []*Consumer
|
|
for _, o := range mset.consumers {
|
|
obs = append(obs, o)
|
|
}
|
|
return obs
|
|
}
|
|
|
|
// NumConsumers reports on number of active observables for this stream.
|
|
func (mset *Stream) NumConsumers() int {
|
|
mset.mu.Lock()
|
|
defer mset.mu.Unlock()
|
|
return len(mset.consumers)
|
|
}
|
|
|
|
// LookupConsumer will retrieve a consumer by name.
|
|
func (mset *Stream) LookupConsumer(name string) *Consumer {
|
|
mset.mu.Lock()
|
|
defer mset.mu.Unlock()
|
|
|
|
for _, o := range mset.consumers {
|
|
if o.name == name {
|
|
return o
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// State will return the current state for this stream.
|
|
func (mset *Stream) State() StreamState {
|
|
mset.mu.Lock()
|
|
c := mset.client
|
|
mset.mu.Unlock()
|
|
if c == nil {
|
|
return StreamState{}
|
|
}
|
|
// Currently rely on store.
|
|
// TODO(dlc) - This will need to change with clusters.
|
|
return mset.store.State()
|
|
}
|
|
|
|
// waitForMsgs will have the stream wait for the arrival of new messages.
|
|
func (mset *Stream) waitForMsgs() {
|
|
mset.mu.Lock()
|
|
|
|
if mset.client == nil {
|
|
mset.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
mset.sgw++
|
|
mset.sg.Wait()
|
|
mset.sgw--
|
|
|
|
mset.mu.Unlock()
|
|
}
|
|
|
|
// Determines if the new proposed partition is unique amongst all observables.
|
|
// Lock should be held.
|
|
func (mset *Stream) partitionUnique(partition string) bool {
|
|
for _, o := range mset.consumers {
|
|
if o.config.FilterSubject == _EMPTY_ {
|
|
return false
|
|
}
|
|
if subjectIsSubsetMatch(partition, o.config.FilterSubject) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// ackMsg is called into from an observable when we have a WorkQueue or Interest retention policy.
|
|
func (mset *Stream) ackMsg(obs *Consumer, seq uint64) {
|
|
switch mset.config.Retention {
|
|
case LimitsPolicy:
|
|
return
|
|
case WorkQueuePolicy:
|
|
mset.store.RemoveMsg(seq)
|
|
case InterestPolicy:
|
|
var needAck bool
|
|
mset.mu.Lock()
|
|
for _, o := range mset.consumers {
|
|
if o != obs && o.needAck(seq) {
|
|
needAck = true
|
|
break
|
|
}
|
|
}
|
|
mset.mu.Unlock()
|
|
if !needAck {
|
|
mset.store.RemoveMsg(seq)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Snapshot creates a snapshot for the stream and possibly consumers.
|
|
func (mset *Stream) Snapshot(deadline time.Duration, checkMsgs, includeConsumers bool) (*SnapshotResult, error) {
|
|
mset.mu.Lock()
|
|
if mset.client == nil || mset.store == nil {
|
|
mset.mu.Unlock()
|
|
return nil, fmt.Errorf("invalid stream")
|
|
}
|
|
store := mset.store
|
|
var obs []*Consumer
|
|
for _, o := range mset.consumers {
|
|
obs = append(obs, o)
|
|
}
|
|
mset.mu.Unlock()
|
|
|
|
// Make sure to sync their state.
|
|
for _, o := range obs {
|
|
o.writeState()
|
|
}
|
|
|
|
return store.Snapshot(deadline, checkMsgs, includeConsumers)
|
|
}
|
|
|
|
const snapsDir = "__snapshots__"
|
|
|
|
// RestoreStream will restore a stream from a snapshot.
|
|
func (a *Account) RestoreStream(stream string, r io.Reader) (*Stream, error) {
|
|
_, jsa, err := a.checkForJetStream()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sd := path.Join(jsa.storeDir, snapsDir)
|
|
defer os.RemoveAll(sd)
|
|
|
|
if _, err := os.Stat(sd); os.IsNotExist(err) {
|
|
if err := os.MkdirAll(sd, 0755); err != nil {
|
|
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
|
|
}
|
|
}
|
|
sdir, err := ioutil.TempDir(sd, "snap-")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if _, err := os.Stat(sdir); os.IsNotExist(err) {
|
|
if err := os.MkdirAll(sdir, 0755); err != nil {
|
|
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
|
|
}
|
|
}
|
|
|
|
gzr, err := gzip.NewReader(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer gzr.Close()
|
|
tr := tar.NewReader(gzr)
|
|
|
|
for {
|
|
hdr, err := tr.Next()
|
|
if err == io.EOF {
|
|
break // End of snapshot
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
fpath := path.Join(sdir, filepath.Clean(hdr.Name))
|
|
pdir := filepath.Dir(fpath)
|
|
os.MkdirAll(pdir, 0750)
|
|
fd, err := os.OpenFile(fpath, os.O_CREATE|os.O_RDWR, 0600)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = io.Copy(fd, tr)
|
|
fd.Close()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Check metadata
|
|
var cfg FileStreamInfo
|
|
b, err := ioutil.ReadFile(path.Join(sdir, JetStreamMetaFile))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := json.Unmarshal(b, &cfg); err != nil {
|
|
return nil, err
|
|
}
|
|
// See if names match
|
|
if cfg.Name != stream {
|
|
return nil, fmt.Errorf("stream name [%q] does not match snapshot stream [%q]", stream, cfg.Name)
|
|
}
|
|
|
|
// See if this stream already exists.
|
|
if _, err := a.LookupStream(cfg.Name); err == nil {
|
|
return nil, fmt.Errorf("stream [%q] already exists", cfg.Name)
|
|
}
|
|
// Move into the correct place here.
|
|
ndir := path.Join(jsa.storeDir, streamsDir, cfg.Name)
|
|
if err := os.Rename(sdir, ndir); err != nil {
|
|
return nil, err
|
|
}
|
|
if cfg.Template != _EMPTY_ {
|
|
if err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
mset, err := a.AddStream(&cfg.StreamConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !cfg.Created.IsZero() {
|
|
mset.setCreated(cfg.Created)
|
|
}
|
|
|
|
// Now do consumers.
|
|
odir := path.Join(ndir, consumerDir)
|
|
ofis, _ := ioutil.ReadDir(odir)
|
|
for _, ofi := range ofis {
|
|
metafile := path.Join(odir, ofi.Name(), JetStreamMetaFile)
|
|
metasum := path.Join(odir, ofi.Name(), JetStreamMetaFileSum)
|
|
if _, err := os.Stat(metafile); os.IsNotExist(err) {
|
|
mset.Delete()
|
|
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
|
|
}
|
|
buf, err := ioutil.ReadFile(metafile)
|
|
if err != nil {
|
|
mset.Delete()
|
|
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
|
|
}
|
|
if _, err := os.Stat(metasum); os.IsNotExist(err) {
|
|
mset.Delete()
|
|
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
|
|
}
|
|
var cfg FileConsumerInfo
|
|
if err := json.Unmarshal(buf, &cfg); err != nil {
|
|
mset.Delete()
|
|
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
|
|
}
|
|
isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig)
|
|
if isEphemeral {
|
|
// This is an ephermal consumer and this could fail on restart until
|
|
// the consumer can reconnect. We will create it as a durable and switch it.
|
|
cfg.ConsumerConfig.Durable = ofi.Name()
|
|
}
|
|
obs, err := mset.AddConsumer(&cfg.ConsumerConfig)
|
|
if err != nil {
|
|
mset.Delete()
|
|
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
|
|
}
|
|
if isEphemeral {
|
|
obs.switchToEphemeral()
|
|
}
|
|
if !cfg.Created.IsZero() {
|
|
obs.setCreated(cfg.Created)
|
|
}
|
|
if err := obs.readStoredState(); err != nil {
|
|
mset.Delete()
|
|
return nil, fmt.Errorf("error restoring consumer [%q]: %v", ofi.Name(), err)
|
|
}
|
|
}
|
|
return mset, nil
|
|
}
|