Files
nats-server/server/stream.go
Derek Collison cadd39a01c Major rewrite for NATS JetStream API
API made more consistent. Noun followed by verb.
Name arguments in request subjects are always at the end now.
Remove enabled call, just use account info.
Getting a message directly from a stream is treated like an admin API and requires JSON request.
Deleting a message directly as well.
StreamList and ConsumerList now include details and support paging.
Streams and Consumers now contain a created field in their info.

Signed-off-by: Derek Collison <derek@nats.io>
2020-05-19 14:27:45 -07:00

805 lines
20 KiB
Go

// Copyright 2019-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"fmt"
"math"
"path"
"reflect"
"strconv"
"sync"
"time"
)
// StreamConfig will determine the name, subjects and retention policy
// for a given stream. If subjects is empty the name will be used.
type StreamConfig struct {
	Name         string          `json:"name"`                     // Required; may not contain '.', '*' or '>' (see checkStreamCfg).
	Subjects     []string        `json:"subjects,omitempty"`       // Defaults to [Name] when empty; duplicates rejected.
	Retention    RetentionPolicy `json:"retention"`                // Limits, Interest or WorkQueue (see ackMsg).
	MaxConsumers int             `json:"max_consumers"`            // 0 is normalized to -1 (unlimited).
	MaxMsgs      int64           `json:"max_msgs"`                 // 0 is normalized to -1 (unlimited).
	MaxBytes     int64           `json:"max_bytes"`                // 0 is normalized to -1 (unlimited).
	Discard      DiscardPolicy   `json:"discard"`                  // Behavior when a limit is hit.
	MaxAge       time.Duration   `json:"max_age"`                  // Maximum age of a message before removal.
	MaxMsgSize   int32           `json:"max_msg_size,omitempty"`   // 0 is normalized to -1 (unlimited); enforced on inbound publishes.
	Storage      StorageType     `json:"storage"`                  // MemoryStorage or FileStorage (see setupStore).
	Replicas     int             `json:"num_replicas"`             // Defaults to 1; values > 1 currently rejected.
	NoAck        bool            `json:"no_ack,omitempty"`         // When true, suppress +OK/-ERR publish acks.
	Template     string          `json:"template_owner,omitempty"` // Owning stream template, if created from one.
}
// PubAck is the detail you get back from a publish to a stream that was successful.
// e.g. +OK {"stream": "Orders", "seq": 22}
type PubAck struct {
	Stream string `json:"stream"` // Name of the stream that stored the message.
	Seq    uint64 `json:"seq"`    // Sequence number assigned to the message.
}
// StreamInfo shows config and current state for this stream.
type StreamInfo struct {
	Config  StreamConfig `json:"config"`  // Configuration the stream was created/updated with.
	Created time.Time    `json:"created"` // Creation time (UTC), restorable via setCreated.
	State   StreamState  `json:"state"`   // Current message/byte counts from the store.
}
// Stream is a jetstream stream of messages. When we receive a message internally destined
// for a Stream we will direct link from the client to this Stream structure.
type Stream struct {
	mu        sync.RWMutex
	sg        *sync.Cond           // Condition variable on mu; signals consumers waiting in waitForMsgs.
	sgw       int                  // Number of goroutines currently waiting on sg.
	jsa       *jsAccount           // Owning jetstream account.
	client    *client              // Internal client for subscriptions/publishes; nil once stopped.
	sid       int                  // Next internal subscription id (see subscribeInternal).
	pubAck    []byte               // Pre-built +OK ack prefix; sequence and '}' appended per publish.
	sendq     chan *jsPubMsg       // Internal send queue drained by internalSendLoop.
	store     StreamStore          // Underlying message store (memory or file backed).
	consumers map[string]*Consumer // Active consumers for this stream.
	config    StreamConfig         // Current configuration; mutated by Update under mu.
	created   time.Time            // Creation time (UTC).
}
const (
	// StreamDefaultReplicas is the replica count applied when none is specified.
	StreamDefaultReplicas = 1
	// StreamMaxReplicas is the hard cap on replicas (clustering not yet enabled).
	StreamMaxReplicas = 8
)
// AddStream adds a stream for the given account.
func (a *Account) AddStream(config *StreamConfig) (*Stream, error) {
	// Delegate with default (nil) file store options.
	return a.AddStreamWithStore(config, nil)
}
// AddStreamWithStore adds a stream for the given account with custom store config options.
// It validates the config, enforces account limits, registers the stream with the
// account, creates its storage and subscriptions, and pre-builds the publish ack.
func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreConfig) (*Stream, error) {
	s, jsa, err := a.checkForJetStream()
	if err != nil {
		return nil, err
	}
	// Sensible defaults.
	cfg, err := checkStreamCfg(config)
	if err != nil {
		return nil, err
	}
	jsa.mu.Lock()
	if mset, ok := jsa.streams[cfg.Name]; ok {
		jsa.mu.Unlock()
		// Adding an identical config is an idempotent no-op; anything else is a conflict.
		ocfg := mset.Config()
		if reflect.DeepEqual(ocfg, cfg) {
			return mset, nil
		}
		return nil, fmt.Errorf("stream name already in use")
	}
	// Check for limits.
	if err := jsa.checkLimits(&cfg); err != nil {
		jsa.mu.Unlock()
		return nil, err
	}
	// Check for template ownership if present.
	if cfg.Template != _EMPTY_ && jsa.account != nil {
		if !jsa.checkTemplateOwnership(cfg.Template, cfg.Name) {
			jsa.mu.Unlock()
			return nil, fmt.Errorf("stream not owned by template")
		}
	}
	// Check for overlapping subjects. These are not allowed for now.
	if jsa.subjectsOverlap(cfg.Subjects) {
		jsa.mu.Unlock()
		return nil, fmt.Errorf("subjects overlap with an existing stream")
	}
	// Setup the internal client.
	c := s.createInternalJetStreamClient()
	mset := &Stream{jsa: jsa, config: cfg, client: c, consumers: make(map[string]*Consumer)}
	mset.sg = sync.NewCond(&mset.mu)
	jsa.streams[cfg.Name] = mset
	storeDir := path.Join(jsa.storeDir, streamsDir, cfg.Name)
	jsa.mu.Unlock()
	// Bind to the account.
	c.registerWithAccount(a)
	// Create the appropriate storage.
	fsCfg := fsConfig
	if fsCfg == nil {
		fsCfg = &FileStoreConfig{}
	}
	fsCfg.StoreDir = storeDir
	if err := mset.setupStore(fsCfg); err != nil {
		mset.Delete()
		return nil, err
	}
	// Setup our internal send go routine.
	mset.setupSendCapabilities()
	// Setup subscriptions.
	if err := mset.subscribeToStream(); err != nil {
		mset.Delete()
		return nil, err
	}
	// Create our pubAck here. This will be reused and for +OK will contain JSON
	// for stream name and sequence. Size the backing array exactly (prefix +
	// widest possible sequence + closing brace) so the per-publish append in
	// processInboundJetStreamMsg never reallocates. The previous estimate
	// omitted the quotes added by %q and the surrounding spaces, undersizing
	// the buffer.
	prefix := fmt.Sprintf("%s {\"stream\": %q, \"seq\": ", OK, cfg.Name)
	maxSeqLen := len(strconv.FormatUint(math.MaxUint64, 10))
	mset.pubAck = make([]byte, 0, len(prefix)+maxSeqLen+1)
	mset.pubAck = append(mset.pubAck, prefix...)
	return mset, nil
}
// Created returns the time this stream was created.
func (mset *Stream) Created() time.Time {
	mset.mu.RLock()
	defer mset.mu.RUnlock()
	return mset.created
}
// Internal to allow creation time to be restored, e.g. after a server restart.
func (mset *Stream) setCreated(created time.Time) {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	mset.created = created
}
// Check to see if these subjects overlap with existing subjects.
// Lock should be held.
func (jsa *jsAccount) subjectsOverlap(subjects []string) bool {
	// Compare every candidate subject against every subject of every
	// existing stream in this account.
	for _, candidate := range subjects {
		for _, mset := range jsa.streams {
			for _, existing := range mset.config.Subjects {
				if SubjectsCollide(candidate, existing) {
					return true
				}
			}
		}
	}
	return false
}
// checkStreamCfg validates a stream configuration and returns a normalized
// copy: zero-valued limits become -1 (unlimited), Replicas defaults to 1 and
// an empty subject list defaults to the stream name. The input is not mutated.
func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
	if config == nil {
		return StreamConfig{}, fmt.Errorf("stream configuration invalid")
	}
	if !isValidName(config.Name) {
		return StreamConfig{}, fmt.Errorf("stream name is required and can not contain '.', '*', '>'")
	}
	cfg := *config
	// TODO(dlc) - check config for conflicts, e.g replicas > 1 in single server mode.
	if cfg.Replicas == 0 {
		cfg.Replicas = 1
	}
	// Check the hard cap first so it is actually reachable; previously this
	// check was shadowed by the single-server check below. Return the zero
	// value for consistency with every other error path.
	if cfg.Replicas > StreamMaxReplicas {
		return StreamConfig{}, fmt.Errorf("maximum replicas is %d", StreamMaxReplicas)
	}
	// TODO(dlc) - Remove when clustering happens.
	if cfg.Replicas > 1 {
		return StreamConfig{}, fmt.Errorf("maximum replicas is 1")
	}
	// Normalize zero values to -1 (unlimited).
	if cfg.MaxMsgs == 0 {
		cfg.MaxMsgs = -1
	}
	if cfg.MaxBytes == 0 {
		cfg.MaxBytes = -1
	}
	if cfg.MaxMsgSize == 0 {
		cfg.MaxMsgSize = -1
	}
	if cfg.MaxConsumers == 0 {
		cfg.MaxConsumers = -1
	}
	if len(cfg.Subjects) == 0 {
		// Default the subject list to the stream name itself.
		cfg.Subjects = append(cfg.Subjects, cfg.Name)
	} else {
		// We can allow overlaps, but don't allow direct duplicates.
		dset := make(map[string]struct{}, len(cfg.Subjects))
		for _, subj := range cfg.Subjects {
			if _, ok := dset[subj]; ok {
				return StreamConfig{}, fmt.Errorf("duplicate subjects detected")
			}
			dset[subj] = struct{}{}
		}
	}
	return cfg, nil
}
// Config returns the stream's configuration.
func (mset *Stream) Config() StreamConfig {
	mset.mu.Lock()
	cfg := mset.config
	mset.mu.Unlock()
	return cfg
}
// Delete deletes a stream from the owning account, removing it from the
// account's stream map and tearing down its store and consumers.
func (mset *Stream) Delete() error {
	// Capture both the account and the name under the stream lock; the name
	// was previously read without mset.mu held, racing with Update which
	// mutates mset.config under that lock.
	mset.mu.Lock()
	jsa := mset.jsa
	name := mset.config.Name
	mset.mu.Unlock()
	if jsa == nil {
		return fmt.Errorf("jetstream not enabled for account")
	}
	jsa.mu.Lock()
	delete(jsa.streams, name)
	jsa.mu.Unlock()
	return mset.delete()
}
// Update will allow certain configuration properties of an existing stream to be updated.
// Name, MaxConsumers, Storage, Retention and template ownership may not change.
// Subject changes are applied by diffing the old and new sets and
// subscribing/unsubscribing accordingly.
func (mset *Stream) Update(config *StreamConfig) error {
	cfg, err := checkStreamCfg(config)
	if err != nil {
		return err
	}
	o_cfg := mset.Config()
	// Name must match.
	if cfg.Name != o_cfg.Name {
		return fmt.Errorf("stream configuration name must match original")
	}
	// Can't change MaxConsumers for now.
	if cfg.MaxConsumers != o_cfg.MaxConsumers {
		return fmt.Errorf("stream configuration update can not change MaxConsumers")
	}
	// Can't change storage types.
	if cfg.Storage != o_cfg.Storage {
		return fmt.Errorf("stream configuration update can not change storage type")
	}
	// Can't change retention.
	if cfg.Retention != o_cfg.Retention {
		return fmt.Errorf("stream configuration update can not change retention policy")
	}
	// Can not have a template owner for now.
	if o_cfg.Template != "" {
		return fmt.Errorf("stream configuration update not allowed on template owned stream")
	}
	if cfg.Template != "" {
		return fmt.Errorf("stream configuration update can not be owned by a template")
	}
	// Check limits. Grab the account first under the stream lock, then check
	// account-level limits under the account lock only.
	mset.mu.Lock()
	jsa := mset.jsa
	mset.mu.Unlock()
	jsa.mu.Lock()
	if cfg.MaxConsumers > 0 && cfg.MaxConsumers > jsa.limits.MaxConsumers {
		jsa.mu.Unlock()
		return fmt.Errorf("stream configuration maximum consumers exceeds account limit")
	}
	// Only re-check byte limits when the cap is being raised.
	if cfg.MaxBytes > 0 && cfg.MaxBytes > o_cfg.MaxBytes {
		if err := jsa.checkBytesLimits(cfg.MaxBytes*int64(cfg.Replicas), cfg.Storage); err != nil {
			jsa.mu.Unlock()
			return err
		}
	}
	jsa.mu.Unlock()
	// Now check for subject interest differences.
	current := make(map[string]struct{}, len(o_cfg.Subjects))
	for _, s := range o_cfg.Subjects {
		current[s] = struct{}{}
	}
	// Update config with new values. The store update will enforce any stricter limits.
	mset.mu.Lock()
	defer mset.mu.Unlock()
	// Now walk new subjects. All of these need to be added, but we will check
	// the originals first, since if it is in there we can skip, already added.
	for _, s := range cfg.Subjects {
		if _, ok := current[s]; !ok {
			if _, err := mset.subscribeInternal(s, mset.processInboundJetStreamMsg); err != nil {
				return err
			}
		}
		delete(current, s)
	}
	// What is left in current needs to be deleted.
	for s := range current {
		if err := mset.unsubscribeInternal(s); err != nil {
			return err
		}
	}
	// Now update config and store's version of our config.
	mset.config = cfg
	mset.store.UpdateConfig(&cfg)
	return nil
}
// Purge will remove all messages from the stream and underlying store.
// Returns the number of messages purged.
func (mset *Stream) Purge() uint64 {
	mset.mu.Lock()
	// A nil client means the stream has been stopped.
	if mset.client == nil {
		mset.mu.Unlock()
		return 0
	}
	purged := mset.store.Purge()
	state := mset.store.State()
	// Snapshot consumers so their purge callbacks run outside our lock.
	obs := make([]*Consumer, 0, len(mset.consumers))
	for _, o := range mset.consumers {
		obs = append(obs, o)
	}
	mset.mu.Unlock()
	for _, o := range obs {
		o.purge(state.FirstSeq)
	}
	return purged
}
// RemoveMsg will remove a message from a stream.
// Returns whether the message was found and removed.
// FIXME(dlc) - Should pick one and be consistent.
func (mset *Stream) RemoveMsg(seq uint64) (bool, error) {
	return mset.store.RemoveMsg(seq)
}
// DeleteMsg will remove a message from a stream.
// Alias for RemoveMsg (see FIXME there about consolidating the two).
func (mset *Stream) DeleteMsg(seq uint64) (bool, error) {
	return mset.store.RemoveMsg(seq)
}
// EraseMsg will securely remove a message and rewrite the data with random data.
func (mset *Stream) EraseMsg(seq uint64) (bool, error) {
	return mset.store.EraseMsg(seq)
}
// Will create internal subscriptions for the msgSet.
// Lock should be held.
func (mset *Stream) subscribeToStream() error {
	// One internal subscription per configured subject, all routed to the
	// inbound jetstream message handler.
	for _, subj := range mset.config.Subjects {
		_, err := mset.subscribeInternal(subj, mset.processInboundJetStreamMsg)
		if err != nil {
			return err
		}
	}
	return nil
}
// FIXME(dlc) - This only works in single server mode for the moment. Need to fix as we expand to clusters.
// Lock should be held.
func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
	c := mset.client
	// Validate preconditions before touching any state.
	switch {
	case c == nil:
		return nil, fmt.Errorf("invalid stream")
	case !c.srv.eventsEnabled():
		return nil, ErrNoSysAccount
	case cb == nil:
		return nil, fmt.Errorf("undefined message handler")
	}
	// Hand out the next internal subscription id and create the subscription.
	mset.sid++
	sid := strconv.Itoa(mset.sid)
	sub, err := c.processSub([]byte(subject+" "+sid), false)
	if err != nil {
		return nil, err
	}
	if sub == nil {
		return nil, fmt.Errorf("malformed subject")
	}
	// Attach the internal callback under the client lock.
	c.mu.Lock()
	sub.icb = cb
	c.mu.Unlock()
	return sub, nil
}
// This will unsubscribe us from the exact subject given.
// We do not currently track the subs so do not have the sid.
// This should be called only on an update.
// Lock should be held.
func (mset *Stream) unsubscribeInternal(subject string) error {
	c := mset.client
	if c == nil {
		return fmt.Errorf("invalid stream")
	}
	if !c.srv.eventsEnabled() {
		return ErrNoSysAccount
	}
	// Scan the internal client's subscriptions to recover the sid for this subject.
	var sid []byte
	c.mu.Lock()
	for _, sub := range c.subs {
		if string(sub.subject) == subject {
			sid = sub.sid
			break
		}
	}
	c.mu.Unlock()
	// No matching subscription is not an error.
	if sid == nil {
		return nil
	}
	return c.processUnsub(sid)
}
// Lock should be held.
func (mset *Stream) unsubscribe(sub *subscription) {
	c := mset.client
	if sub == nil || c == nil {
		return
	}
	c.unsubscribe(c.acc, sub, true, true)
}
// setupStore creates the backing store (memory or file) for the stream,
// stamps the creation time and wires storage-usage accounting back to the
// owning account.
func (mset *Stream) setupStore(fsCfg *FileStoreConfig) error {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	mset.created = time.Now().UTC()
	switch mset.config.Storage {
	case MemoryStorage:
		ms, err := newMemStore(&mset.config)
		if err != nil {
			return err
		}
		mset.store = ms
	case FileStorage:
		fs, err := newFileStoreWithCreated(*fsCfg, mset.config, mset.created)
		if err != nil {
			return err
		}
		mset.store = fs
	default:
		// Guard against an unknown storage type leaving mset.store nil,
		// which would panic on the StorageBytesUpdate call below.
		return fmt.Errorf("unknown storage type")
	}
	// Report byte deltas from the store into the account's usage tracking.
	jsa, st := mset.jsa, mset.config.Storage
	mset.store.StorageBytesUpdate(func(delta int64) { jsa.updateUsage(st, delta) })
	return nil
}
// processInboundJetStreamMsg handles processing messages bound for a stream.
// It stores the message, sends a +OK/-ERR ack on the reply subject when acks
// are enabled, and hands the new message to any consumers.
func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subject, reply string, msg []byte) {
	// Snapshot everything we need under the lock so the store/ack work below
	// can run without holding it.
	mset.mu.Lock()
	store := mset.store
	c := mset.client
	var accName string
	if c != nil && c.acc != nil {
		accName = c.acc.Name
	}
	doAck := !mset.config.NoAck
	pubAck := mset.pubAck
	jsa := mset.jsa
	stype := mset.config.Storage
	name := mset.config.Name
	maxMsgSize := int(mset.config.MaxMsgSize)
	numConsumers := len(mset.consumers)
	mset.mu.Unlock()
	// A nil client means the stream has been stopped.
	if c == nil {
		return
	}
	// Response to send.
	var response []byte
	var seq uint64
	var err error
	var ts int64
	if maxMsgSize >= 0 && len(msg) > maxMsgSize {
		response = []byte("-ERR 'message size exceeds maximum allowed'")
	} else {
		// Check to see if we are over the account limit.
		seq, ts, err = store.StoreMsg(subject, msg)
		if err != nil {
			if err != ErrStoreClosed {
				c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err)
			}
			response = []byte(fmt.Sprintf("-ERR '%v'", err))
		} else if jsa.limitsExceeded(stype) {
			c.Warnf("JetStream resource limits exceeded for account: %q", accName)
			response = []byte("-ERR 'resource limits exceeded for account'")
			// Undo the store since we are rejecting the message; seq of 0
			// also prevents consumer delivery below.
			store.RemoveMsg(seq)
			seq = 0
		} else if err == nil && doAck && len(reply) > 0 {
			// Success: finish the pre-built pubAck prefix with the sequence
			// number and the closing brace.
			response = append(pubAck, strconv.FormatUint(seq, 10)...)
			response = append(response, '}')
		}
	}
	// Send response here.
	if doAck && len(reply) > 0 {
		mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, response, nil, 0}
	}
	// Offer the stored message to consumers; any that could not take it
	// directly get woken via the stream's condition variable.
	if err == nil && numConsumers > 0 && seq > 0 {
		var needSignal bool
		mset.mu.Lock()
		for _, o := range mset.consumers {
			if !o.deliverCurrentMsg(subject, msg, seq, ts) {
				needSignal = true
			}
		}
		mset.mu.Unlock()
		if needSignal {
			mset.signalConsumers()
		}
	}
}
// Will signal all waiting consumers.
func (mset *Stream) signalConsumers() {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	// Only broadcast when someone is actually waiting (sgw tracks waiters).
	if mset.sgw > 0 {
		mset.sg.Broadcast()
	}
}
// Internal message for use by jetstream subsystem.
type jsPubMsg struct {
	subj  string    // Publish subject.
	dsubj string    // Deliver subject used by the send loop.
	reply string    // Reply subject, if any.
	msg   []byte    // Raw message payload (CRLF appended by the send loop).
	o     *Consumer // Consumer to notify on failed delivery; nil for acks.
	seq   uint64    // Stream sequence for delivery tracking; 0 when not applicable.
}
// StoredMsg is for raw access to messages in a stream.
type StoredMsg struct {
	Subject  string    `json:"subject"` // Subject the message was stored under.
	Sequence uint64    `json:"seq"`     // Stream sequence number.
	Data     []byte    `json:"data"`    // Message payload.
	Time     time.Time `json:"time"`    // Timestamp assigned by the store.
}
// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering
// msetSendQSize is the buffer size of each stream's internal send queue.
const msetSendQSize = 1024
// This is similar to system semantics but did not want to overload the single system sendq,
// or require system account when doing simple setup with jetstream.
func (mset *Stream) setupSendCapabilities() {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	// Idempotent: only create the queue and spin up the loop once.
	if mset.sendq == nil {
		mset.sendq = make(chan *jsPubMsg, msetSendQSize)
		go mset.internalSendLoop()
	}
}
// Name returns the stream name.
func (mset *Stream) Name() string {
	mset.mu.Lock()
	name := mset.config.Name
	mset.mu.Unlock()
	return name
}
// internalSendLoop drains the stream's send queue and publishes each entry
// through the internal client. It exits when a nil message is queued (the
// shutdown sentinel from stop()) or when the server quits.
func (mset *Stream) internalSendLoop() {
	mset.mu.Lock()
	c := mset.client
	if c == nil {
		mset.mu.Unlock()
		return
	}
	s := c.srv
	sendq := mset.sendq
	name := mset.config.Name
	mset.mu.Unlock()
	// Warn when internal send queue is backed up past 75%,
	// rate-limited to once per warnFreq.
	warnThresh := 3 * msetSendQSize / 4
	warnFreq := time.Second
	last := time.Now().Add(-warnFreq)
	for {
		if len(sendq) > warnThresh && time.Since(last) >= warnFreq {
			s.Warnf("Jetstream internal send queue > 75%% for account: %q stream: %q", c.acc.Name, name)
			last = time.Now()
		}
		select {
		case pm := <-sendq:
			// nil is the shutdown sentinel.
			if pm == nil {
				return
			}
			// Fill in the internal client's publish args and inject the
			// message as if it came from a normal client publish.
			c.pa.subject = []byte(pm.subj)
			c.pa.deliver = []byte(pm.dsubj)
			c.pa.size = len(pm.msg)
			c.pa.szb = []byte(strconv.Itoa(c.pa.size))
			c.pa.reply = []byte(pm.reply)
			msg := append(pm.msg, _CRLF_...)
			didDeliver := c.processInboundClientMsg(msg)
			c.pa.szb = nil
			c.flushClients(0)
			// Check to see if this is a delivery for an observable and
			// we failed to deliver the message. If so alert the observable.
			if pm.o != nil && pm.seq > 0 && !didDeliver {
				pm.o.didNotDeliver(pm.seq)
			}
		case <-s.quitCh:
			return
		}
	}
}
// Internal function to delete a stream: stop it and remove the underlying
// store and consumers.
func (mset *Stream) delete() error {
	return mset.stop(true)
}
// Internal function to stop or delete the stream. When delete is true the
// underlying store and all consumers are deleted as well; otherwise they are
// just stopped.
func (mset *Stream) stop(delete bool) error {
	mset.mu.Lock()
	// Signal the send loop to exit (nil is its shutdown sentinel).
	if mset.sendq != nil {
		mset.sendq <- nil
	}
	// Clearing the client marks the stream as stopped for all other paths;
	// a nil client here means stop already ran.
	c := mset.client
	mset.client = nil
	if c == nil {
		mset.mu.Unlock()
		return nil
	}
	// Snapshot consumers so we can stop/delete them outside the lock.
	var obs []*Consumer
	for _, o := range mset.consumers {
		obs = append(obs, o)
	}
	mset.consumers = nil
	mset.mu.Unlock()
	c.closeConnection(ClientClosed)
	if mset.store == nil {
		return nil
	}
	if delete {
		if err := mset.store.Delete(); err != nil {
			return err
		}
		for _, o := range obs {
			if err := o.Delete(); err != nil {
				return err
			}
		}
	} else {
		if err := mset.store.Stop(); err != nil {
			return err
		}
		for _, o := range obs {
			if err := o.Stop(); err != nil {
				return err
			}
		}
	}
	return nil
}
// Consumers will return all the current consumers for this stream.
func (mset *Stream) Consumers() []*Consumer {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	// Return a snapshot slice so callers can iterate without holding the lock.
	var obs []*Consumer
	for _, o := range mset.consumers {
		obs = append(obs, o)
	}
	return obs
}
// NumConsumers reports on number of active observables for this stream.
func (mset *Stream) NumConsumers() int {
	mset.mu.Lock()
	n := len(mset.consumers)
	mset.mu.Unlock()
	return n
}
// LookupConsumer will retrieve a consumer by name.
// Returns nil when no consumer with that name exists.
func (mset *Stream) LookupConsumer(name string) *Consumer {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	var found *Consumer
	for _, o := range mset.consumers {
		if o.name == name {
			found = o
			break
		}
	}
	return found
}
// State will return the current state for this stream.
func (mset *Stream) State() StreamState {
	mset.mu.Lock()
	c := mset.client
	mset.mu.Unlock()
	// A stopped stream (nil client) reports an empty state.
	var state StreamState
	if c != nil {
		// Currently rely on store.
		// TODO(dlc) - This will need to change with clusters.
		state = mset.store.State()
	}
	return state
}
// waitForMsgs will have the stream wait for the arrival of new messages.
// sg is a condition variable built on mset.mu (see AddStreamWithStore), so
// Wait atomically releases the lock while blocked and reacquires it on wakeup.
func (mset *Stream) waitForMsgs() {
	mset.mu.Lock()
	if mset.client == nil {
		mset.mu.Unlock()
		return
	}
	// sgw counts waiters so signalConsumers can skip the broadcast when
	// nobody is waiting.
	mset.sgw++
	mset.sg.Wait()
	mset.sgw--
	mset.mu.Unlock()
}
// Determines if the new proposed partition is unique amongst all observables.
// Lock should be held.
func (mset *Stream) partitionUnique(partition string) bool {
	for _, o := range mset.consumers {
		filter := o.config.FilterSubject
		// An unfiltered consumer sees everything, and any overlapping
		// filter makes the partition non-unique.
		if filter == _EMPTY_ || subjectIsSubsetMatch(partition, filter) {
			return false
		}
	}
	return true
}
// ackMsg is called into from an observable when we have a WorkQueue or Interest retention policy.
// WorkQueue removes the message on first ack; Interest removes it once no
// other consumer still needs that sequence. Limits-based streams ignore acks.
func (mset *Stream) ackMsg(obs *Consumer, seq uint64) {
	switch mset.config.Retention {
	case LimitsPolicy:
		return
	case WorkQueuePolicy:
		mset.store.RemoveMsg(seq)
	case InterestPolicy:
		var needAck bool
		mset.mu.Lock()
		for _, o := range mset.consumers {
			// Skip the acking consumer; any other consumer still needing
			// this sequence keeps the message alive.
			if o != obs && o.needAck(seq) {
				needAck = true
				break
			}
		}
		mset.mu.Unlock()
		if !needAck {
			mset.store.RemoveMsg(seq)
		}
	}
}