Files
nats-server/server/stream.go
2020-05-19 14:16:03 -07:00

669 lines
16 KiB
Go

// Copyright 2019-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"encoding/json"
"fmt"
"path"
"strconv"
"sync"
"time"
)
// StreamConfig will determine the name, subjects and retention policy
// for a given stream. If subjects is empty the name will be used.
type StreamConfig struct {
// Name identifies the stream. checkStreamCfg rejects names that fail
// isValidName (its error text says '.', '*' and '>' are not allowed).
Name string `json:"name"`
// Subjects lists the subjects the stream subscribes to. When empty,
// AddStream uses Name as the only subject.
Subjects []string `json:"subjects,omitempty"`
// Retention selects the retention policy; ackMsg removes acknowledged
// messages for WorkQueuePolicy and InterestPolicy and does nothing for
// LimitsPolicy.
Retention RetentionPolicy `json:"retention"`
// MaxConsumers is presumably the cap on consumers for this stream; it is
// not enforced in this file - TODO confirm where it is checked.
MaxConsumers int `json:"max_consumers"`
// MaxMsgs is the message-count limit. checkStreamCfg rewrites 0 to -1.
MaxMsgs int64 `json:"max_msgs"`
// MaxBytes is the byte limit. checkStreamCfg rewrites 0 to -1.
MaxBytes int64 `json:"max_bytes"`
// MaxAge is presumably the maximum message age; it is only passed to the
// store from this file - TODO confirm semantics of the zero value.
MaxAge time.Duration `json:"max_age"`
// MaxMsgSize limits the payload size of a single inbound message.
// checkStreamCfg rewrites 0 to -1, and a negative value disables the
// size check in processInboundJetStreamMsg.
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
// Storage selects the backing store created by setupStore
// (MemoryStorage or FileStorage).
Storage StorageType `json:"storage"`
// Replicas is the replica count. checkStreamCfg rewrites 0 to 1 and
// rejects values above StreamMaxReplicas.
Replicas int `json:"num_replicas"`
// NoAck, when true, suppresses the reply normally sent to the publisher
// after an inbound message is processed.
NoAck bool `json:"no_ack,omitempty"`
// Template names the owning template, if any. When set, AddStream
// verifies ownership through checkTemplateOwnership.
Template string `json:"template_owner,omitempty"`
}
// StreamInfo pairs a stream's configuration with its current state.
type StreamInfo struct {
// Config is the stream's configuration.
Config StreamConfig `json:"config"`
// State is the stream's state as a StreamState value.
State StreamState `json:"state"`
}
// Stream is a jetstream stream of messages. When we receive a message internally destined
// for a Stream we will direct link from the client to this Stream structure.
type Stream struct {
// mu is taken by the stream's methods before reading or changing the
// fields below; it is also the lock backing sg.
mu sync.RWMutex
// sg is the condition variable (built on mu in AddStream) that
// waitForMsgs waits on and signalConsumers broadcasts to.
sg *sync.Cond
// sgw counts goroutines currently blocked in waitForMsgs; signalConsumers
// only broadcasts when it is greater than zero.
sgw int
// jsa is the JetStream account state this stream was created under.
jsa *jsAccount
// client is the internal client created in AddStream. stop sets it to nil,
// and the other methods treat a nil client as a stopped stream.
client *client
// sid is incremented for every internal subscription and used as that
// subscription's id.
sid int
// sendq is the outbound queue created by setupSendCapabilities and
// drained by internalSendLoop.
sendq chan *jsPubMsg
// store is the message store created by setupStore.
store StreamStore
// consumers holds the stream's consumers (presumably keyed by consumer
// name - TODO confirm against the code that inserts into this map).
consumers map[string]*Consumer
// config is the validated configuration returned by checkStreamCfg.
config StreamConfig
}
// Replica count bounds for a stream configuration.
const (
// StreamDefaultReplicas is the default replica count. NOTE(review):
// checkStreamCfg currently hard-codes 1 rather than using this constant.
StreamDefaultReplicas = 1
// StreamMaxReplicas is the largest replica count checkStreamCfg accepts.
StreamMaxReplicas = 8
)
// AddStream adds a stream for the given account.
//
// The supplied config is validated and defaulted by checkStreamCfg; the
// stream keeps its own copy. An error is returned when JetStream is not
// available for the account, the config is invalid, the name is already in
// use, account limits are exceeded, template ownership does not match, or
// the subjects overlap with an existing stream. If no subjects are given
// the stream name is used as the only subject.
//
// If setting up storage or the subscriptions fails after the stream has
// been registered with the account, the stream is torn down and removed
// from the account again so its name and subjects do not stay reserved.
func (a *Account) AddStream(config *StreamConfig) (*Stream, error) {
	s, jsa, err := a.checkForJetStream()
	if err != nil {
		return nil, err
	}

	// Sensible defaults.
	cfg, err := checkStreamCfg(config)
	if err != nil {
		return nil, err
	}

	jsa.mu.Lock()
	if _, ok := jsa.streams[cfg.Name]; ok {
		jsa.mu.Unlock()
		return nil, fmt.Errorf("stream name already in use")
	}
	// Check for limits.
	if err := jsa.checkLimits(&cfg); err != nil {
		jsa.mu.Unlock()
		return nil, err
	}
	// Check for template ownership if present.
	if cfg.Template != _EMPTY_ && jsa.account != nil {
		if !jsa.checkTemplateOwnership(cfg.Template, cfg.Name) {
			jsa.mu.Unlock()
			return nil, fmt.Errorf("stream not owned by template")
		}
	}

	if len(cfg.Subjects) == 0 {
		cfg.Subjects = append(cfg.Subjects, cfg.Name)
	}

	// Check for overlapping subjects. These are not allowed for now.
	if jsa.subjectsOverlap(cfg.Subjects) {
		jsa.mu.Unlock()
		return nil, fmt.Errorf("subjects overlap with an existing stream")
	}

	// Setup the internal client.
	c := s.createInternalJetStreamClient()
	mset := &Stream{jsa: jsa, config: cfg, client: c, consumers: make(map[string]*Consumer)}
	mset.sg = sync.NewCond(&mset.mu)

	// Register the stream while still holding the account lock so a
	// concurrent AddStream sees the name and subjects as taken.
	jsa.streams[cfg.Name] = mset
	storeDir := path.Join(jsa.storeDir, streamsDir, cfg.Name)
	jsa.mu.Unlock()

	// Bind to the account.
	c.registerWithAccount(a)

	// Create the appropriate storage
	if err := mset.setupStore(storeDir); err != nil {
		// Use the exported Delete so the entry registered above is also
		// removed from jsa.streams; the internal delete() would leave a
		// dead stream holding the name. The setup error takes precedence
		// over any cleanup error.
		mset.Delete()
		return nil, err
	}

	// Setup our internal send go routine.
	mset.setupSendCapabilities()

	// Setup subscriptions
	if err := mset.subscribeToStream(); err != nil {
		// Same cleanup as above: unregister from the account and tear down.
		mset.Delete()
		return nil, err
	}

	return mset, nil
}
// subjectsOverlap reports whether any of the proposed subjects collides
// (per SubjectsCollide) with a subject of a stream already registered on
// this account.
// Lock should be held.
func (jsa *jsAccount) subjectsOverlap(subjects []string) bool {
	// collidesWith reports whether any proposed subject collides with taken.
	collidesWith := func(taken string) bool {
		for _, candidate := range subjects {
			if SubjectsCollide(candidate, taken) {
				return true
			}
		}
		return false
	}

	for _, existing := range jsa.streams {
		for _, taken := range existing.config.Subjects {
			if collidesWith(taken) {
				return true
			}
		}
	}
	return false
}
// checkStreamCfg validates config and returns a copy with defaults applied.
//
// A nil config or an invalid name yields an error. On the returned copy a
// zero Replicas becomes 1, and zero MaxMsgs, MaxBytes and MaxMsgSize become
// -1. A Replicas value above StreamMaxReplicas yields an error.
func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
	switch {
	case config == nil:
		return StreamConfig{}, fmt.Errorf("stream configuration invalid")
	case !isValidName(config.Name):
		return StreamConfig{}, fmt.Errorf("stream name is required and can not contain '.', '*', '>'")
	}

	// Work on a copy so the caller's config is left untouched.
	checked := *config

	// TODO(dlc) - check config for conflicts, e.g replicas > 1 in single server mode.
	if checked.Replicas == 0 {
		checked.Replicas = 1
	}
	if checked.Replicas > StreamMaxReplicas {
		return checked, fmt.Errorf("maximum replicas is %d", StreamMaxReplicas)
	}

	// A zero limit is rewritten to -1.
	if checked.MaxMsgs == 0 {
		checked.MaxMsgs = -1
	}
	if checked.MaxBytes == 0 {
		checked.MaxBytes = -1
	}
	if checked.MaxMsgSize == 0 {
		checked.MaxMsgSize = -1
	}
	return checked, nil
}
// Config returns a copy of the stream's configuration, read under the
// stream lock.
func (mset *Stream) Config() StreamConfig {
	mset.mu.Lock()
	cfg := mset.config
	mset.mu.Unlock()
	return cfg
}
// Delete removes the stream from its owning account's stream map and then
// deletes the stream itself. It returns an error when the stream has no
// JetStream account attached, or whatever the internal delete returns.
func (mset *Stream) Delete() error {
	mset.mu.Lock()
	owner := mset.jsa
	mset.mu.Unlock()

	if owner == nil {
		return fmt.Errorf("jetstream not enabled for account")
	}

	// Unregister from the account first, then tear the stream down.
	owner.mu.Lock()
	delete(owner.streams, mset.config.Name)
	owner.mu.Unlock()

	return mset.delete()
}
// Purge removes all messages from the stream's store and returns the count
// reported by the store. A stopped stream (nil client) purges nothing and
// returns 0. After the store is purged, each consumer is told to purge up
// to the store's new first sequence.
func (mset *Stream) Purge() uint64 {
	mset.mu.Lock()
	if mset.client == nil {
		mset.mu.Unlock()
		return 0
	}
	count := mset.store.Purge()
	firstSeq := mset.store.State().FirstSeq

	// Snapshot the consumers so they can be notified without the lock held.
	snapshot := make([]*Consumer, 0, len(mset.consumers))
	for _, o := range mset.consumers {
		snapshot = append(snapshot, o)
	}
	mset.mu.Unlock()

	for _, o := range snapshot {
		o.purge(firstSeq)
	}
	return count
}
// RemoveMsg asks the store to remove the message at seq and returns the
// store's result.
// FIXME(dlc) - Should pick one and be consistent.
func (mset *Stream) RemoveMsg(seq uint64) bool {
	removed := mset.store.RemoveMsg(seq)
	return removed
}
// DeleteMsg asks the store to remove the message at seq and returns the
// store's result. It performs the same store call as RemoveMsg.
func (mset *Stream) DeleteMsg(seq uint64) bool {
	removed := mset.store.RemoveMsg(seq)
	return removed
}
// EraseMsg will securely remove a message and rewrite the data with random
// data, by delegating to the store's EraseMsg. It returns the store's result.
func (mset *Stream) EraseMsg(seq uint64) bool {
	erased := mset.store.EraseMsg(seq)
	return erased
}
// subscribeToStream creates the stream's internal subscriptions: one per
// configured subject, handled by processInboundJetStreamMsg, plus one on
// the stream's message-by-sequence subject, handled by processMsgBySeq.
// The first subscription error is returned.
// NOTE(review): this was documented as "Lock should be held", but AddStream
// calls it without holding the stream lock - confirm the intended contract.
func (mset *Stream) subscribeToStream() error {
	for _, subject := range mset.config.Subjects {
		_, err := mset.subscribeInternal(subject, mset.processInboundJetStreamMsg)
		if err != nil {
			return err
		}
	}

	// Now subscribe for direct access
	directSubj := fmt.Sprintf(JetStreamMsgBySeqT, mset.config.Name)
	_, err := mset.subscribeInternal(directSubj, mset.processMsgBySeq)
	return err
}
// subscribeInternal subscribes the stream's internal client to subject with
// cb as the handler, passing internalOnly=false to nmsSubscribeInternal.
// FIXME(dlc) - This only works in single server mode for the moment. Need to fix as we expand to clusters.
func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
	const internalOnly = false
	return mset.nmsSubscribeInternal(subject, internalOnly, cb)
}
// nmsSubscribeInternal creates a subscription on the stream's internal
// client for subject and installs cb as its internal callback.
//
// It fails when the stream has no client, when the server reports events as
// not enabled (ErrNoSysAccount), when cb is nil, or when processSub fails.
// Each call that passes those checks consumes a new subscription id from
// mset.sid.
func (mset *Stream) nmsSubscribeInternal(subject string, internalOnly bool, cb msgHandler) (*subscription, error) {
	c := mset.client
	switch {
	case c == nil:
		return nil, fmt.Errorf("invalid stream")
	case !c.srv.eventsEnabled():
		return nil, ErrNoSysAccount
	case cb == nil:
		return nil, fmt.Errorf("undefined message handler")
	}

	mset.sid++

	// Now create the subscription; the argument is "<subject> <sid>".
	subArg := []byte(subject + " " + strconv.Itoa(mset.sid))
	sub, err := c.processSub(subArg, internalOnly)
	if err != nil {
		return nil, err
	}

	// The callback is installed under the client lock.
	c.mu.Lock()
	sub.icb = cb
	c.mu.Unlock()
	return sub, nil
}
// unsubscribe removes sub from the stream's internal client. It is a no-op
// when sub is nil or the stream no longer has a client.
// Lock should be held.
func (mset *Stream) unsubscribe(sub *subscription) {
	c := mset.client
	if sub == nil || c == nil {
		return
	}
	c.unsubscribe(c.acc, sub, true, true)
}
// setupStore creates the message store selected by the stream's configured
// storage type and registers a callback that feeds storage byte deltas into
// the account's usage accounting.
//
// storeDir is the directory handed to the file store; it is not used for
// memory storage. An error is returned when the store cannot be created or
// when the configured storage type is not one of MemoryStorage or
// FileStorage.
func (mset *Stream) setupStore(storeDir string) error {
	mset.mu.Lock()
	defer mset.mu.Unlock()

	switch mset.config.Storage {
	case MemoryStorage:
		ms, err := newMemStore(&mset.config)
		if err != nil {
			return err
		}
		mset.store = ms
	case FileStorage:
		fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, mset.config)
		if err != nil {
			return err
		}
		mset.store = fs
	default:
		// Without this guard mset.store would stay nil and the
		// StorageBytesUpdate call below would panic.
		return fmt.Errorf("unsupported storage type: %v", mset.config.Storage)
	}

	// Capture the values the callback needs so it does not have to touch
	// the stream (and its lock) when the store reports a change.
	jsa, st := mset.jsa, mset.config.Storage
	mset.store.StorageBytesUpdate(func(delta int64) { jsa.updateUsage(st, delta) })
	return nil
}
// processMsgBySeq answers a request for a single stored message.
//
// The request payload is the decimal sequence number; an empty payload means
// the store's last sequence. The message is returned on the reply subject as
// an indented JSON StoredMsg. A payload that does not parse, or a sequence
// the store cannot load, produces an -ERR response instead. Nothing is done
// when the stream has been stopped (nil client).
func (mset *Stream) processMsgBySeq(_ *subscription, _ *client, subject, reply string, msg []byte) {
	mset.mu.Lock()
	store, c, name := mset.store, mset.client, mset.config.Name
	mset.mu.Unlock()

	if c == nil {
		return
	}

	// respond queues payload for delivery on the request's reply subject.
	respond := func(payload []byte) {
		mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, payload, nil, 0}
	}

	var seq uint64
	if len(msg) > 0 {
		parsed, perr := strconv.ParseUint(string(msg), 10, 64)
		if perr != nil {
			c.Debugf("JetStream request for message from message: %q - %q bad sequence arg %q", c.acc.Name, name, msg)
			respond([]byte("-ERR 'bad sequence argument'"))
			return
		}
		seq = parsed
	} else {
		// If no sequence arg assume last sequence we have.
		seq = store.State().LastSeq
	}

	storedSubj, data, ts, err := store.LoadMsg(seq)
	if err != nil {
		c.Debugf("JetStream request for message: %q - %q - %d error %v", c.acc.Name, name, seq, err)
		respond([]byte("-ERR 'could not load message from storage'"))
		return
	}

	sm := &StoredMsg{
		Subject:  storedSubj,
		Sequence: seq,
		Data:     data,
		Time:     time.Unix(0, ts),
	}
	// The marshal error is deliberately ignored, as before.
	encoded, _ := json.MarshalIndent(sm, "", " ")
	respond(encoded)
}
// processInboundJetStreamMsg handles processing messages bound for a stream.
//
// The message is rejected when it exceeds the configured maximum size,
// otherwise it is written to the store. If the account's limits are exceeded
// after the write, the message is removed again. Unless NoAck is configured,
// the outcome (AckAck or an -ERR text) is sent to the reply subject when one
// is present. A stored message is then offered to every consumer, and the
// waiting consumers are signalled if any of them did not take it directly.
func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subject, reply string, msg []byte) {
	// Snapshot everything we need under the lock.
	mset.mu.Lock()
	c, store, jsa := mset.client, mset.store, mset.jsa
	accName := ""
	if c != nil && c.acc != nil {
		accName = c.acc.Name
	}
	noAck := mset.config.NoAck
	stype, name := mset.config.Storage, mset.config.Name
	maxMsgSize := int(mset.config.MaxMsgSize)
	mset.mu.Unlock()

	if c == nil {
		return
	}

	// Response to send.
	response := AckAck
	var (
		seq uint64
		err error
	)

	if maxMsgSize >= 0 && len(msg) > maxMsgSize {
		response = []byte("-ERR 'message size exceeds maximum allowed'")
	} else if seq, err = store.StoreMsg(subject, msg); err != nil {
		c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err)
		response = []byte(fmt.Sprintf("-ERR '%v'", err))
	} else if jsa.limitsExceeded(stype) {
		// Over the account limit: undo the store and report the failure.
		c.Warnf("JetStream resource limits exceeded for account: %q", accName)
		response = []byte("-ERR 'resource limits exceeded for account'")
		store.RemoveMsg(seq)
		seq = 0
	}

	// Send response here.
	if !noAck && reply != "" {
		mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, response, nil, 0}
	}

	// Nothing to deliver unless the message was actually stored.
	if err != nil || seq == 0 {
		return
	}

	needSignal := false
	mset.mu.Lock()
	for _, o := range mset.consumers {
		if !o.deliverCurrentMsg(subject, msg, seq) {
			needSignal = true
		}
	}
	mset.mu.Unlock()

	if needSignal {
		mset.signalConsumers()
	}
}
// signalConsumers wakes every goroutine blocked in waitForMsgs. The
// broadcast is skipped when the waiter count is zero.
func (mset *Stream) signalConsumers() {
	mset.mu.Lock()
	defer mset.mu.Unlock()

	if mset.sgw > 0 {
		mset.sg.Broadcast()
	}
}
// Internal message for use by jetstream subsystem.
// Values are queued on a stream's sendq and published by internalSendLoop.
// NOTE: this file builds values with unkeyed literals, so the field order
// below must not change.
type jsPubMsg struct {
// subj is copied into the internal client's parse state as the subject.
subj string
// dsubj is copied into the internal client's parse state as the deliver
// subject.
dsubj string
// reply is copied into the internal client's parse state as the reply.
reply string
// msg is the payload; internalSendLoop appends CRLF before processing it.
msg []byte
// o, when non-nil, is the consumer to notify via didNotDeliver if the
// message could not be delivered.
o *Consumer
// seq is the sequence passed to o.didNotDeliver.
seq uint64
}
// StoredMsg is for raw access to messages in a stream.
// processMsgBySeq marshals one of these as the JSON response.
type StoredMsg struct {
// Subject is the subject returned by the store for the message.
Subject string `json:"subject"`
// Sequence is the sequence number the message was loaded by.
Sequence uint64 `json:"seq"`
// Data is the message payload returned by the store.
Data []byte `json:"data"`
// Time is built with time.Unix(0, ts) from the store's timestamp, i.e. the
// timestamp is treated as nanoseconds since the Unix epoch.
Time time.Time `json:"time"`
}
// msetSendQSize is the buffer capacity of each stream's sendq channel;
// internalSendLoop warns when the queue is more than three quarters full.
// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering
const msetSendQSize = 1024
// setupSendCapabilities creates the stream's send queue and starts the
// goroutine that drains it. Calling it again once the queue exists does
// nothing.
//
// This is similar to system semantics but did not want to overload the single system sendq,
// or require system account when doing simple setup with jetstream.
func (mset *Stream) setupSendCapabilities() {
	mset.mu.Lock()
	defer mset.mu.Unlock()

	if mset.sendq == nil {
		mset.sendq = make(chan *jsPubMsg, msetSendQSize)
		go mset.internalSendLoop()
	}
}
// Name returns the stream name from its configuration, read under the
// stream lock.
func (mset *Stream) Name() string {
	mset.mu.Lock()
	name := mset.config.Name
	mset.mu.Unlock()
	return name
}
// internalSendLoop drains the stream's send queue and publishes each queued
// message through the stream's internal client. It runs as its own goroutine
// (started by setupSendCapabilities) and exits when it receives a nil entry
// from the queue, when the server's quit channel fires, or immediately if
// the stream has no client.
//
// While running it logs a warning, at most once per second, whenever the
// queue is more than 75% full.
func (mset *Stream) internalSendLoop() {
	mset.mu.Lock()
	c := mset.client
	if c == nil {
		mset.mu.Unlock()
		return
	}
	s := c.srv
	sendq := mset.sendq
	name := mset.config.Name
	mset.mu.Unlock()

	// Warn when internal send queue is backed up past 75%
	warnThresh := 3 * msetSendQSize / 4
	warnFreq := time.Second
	// Start one interval in the past so the first warning is not delayed.
	last := time.Now().Add(-warnFreq)

	for {
		if len(sendq) > warnThresh && time.Since(last) >= warnFreq {
			// The percent sign must be written as %%: a bare "% f" is parsed
			// as a format verb, which swallows the account name argument and
			// garbles the message.
			s.Warnf("Jetstream internal send queue > 75%% for account: %q stream: %q", c.acc.Name, name)
			last = time.Now()
		}
		select {
		case pm := <-sendq:
			// A nil entry is the shutdown signal sent by stop.
			if pm == nil {
				return
			}
			// Fill in the client's parse state as if the message had been
			// parsed off the wire, then process it as an inbound publish.
			c.pa.subject = []byte(pm.subj)
			c.pa.deliver = []byte(pm.dsubj)
			c.pa.size = len(pm.msg)
			c.pa.szb = []byte(strconv.Itoa(c.pa.size))
			c.pa.reply = []byte(pm.reply)
			msg := append(pm.msg, _CRLF_...)
			didDeliver := c.processInboundClientMsg(msg)
			c.pa.szb = nil
			c.flushClients(0)
			// Check to see if this is a delivery for an observable and
			// we failed to deliver the message. If so alert the observable.
			if pm.o != nil && !didDeliver {
				pm.o.didNotDeliver(pm.seq)
			}
		case <-s.quitCh:
			return
		}
	}
}
// delete stops the stream and deletes its store and consumers, by calling
// stop with the delete flag set.
func (mset *Stream) delete() error {
	const deleteData = true
	return mset.stop(deleteData)
}
// stop shuts the stream down. It signals the send loop to exit, detaches and
// closes the internal client, and then either deletes (delete == true) or
// stops the store followed by every consumer. A stream that is already
// stopped (nil client) returns nil right away, as does one with no store.
// The first error from the store or a consumer is returned and ends the
// teardown.
func (mset *Stream) stop(delete bool) error {
	mset.mu.Lock()
	// A nil entry tells internalSendLoop to exit.
	if mset.sendq != nil {
		mset.sendq <- nil
	}

	c := mset.client
	mset.client = nil
	if c == nil {
		mset.mu.Unlock()
		return nil
	}

	// Detach the consumers so they can be shut down without the lock held.
	detached := make([]*Consumer, 0, len(mset.consumers))
	for _, o := range mset.consumers {
		detached = append(detached, o)
	}
	mset.consumers = nil
	mset.mu.Unlock()

	c.closeConnection(ClientClosed)

	if mset.store == nil {
		return nil
	}

	// shutdownConsumer deletes or stops a consumer to match the stream.
	shutdownConsumer := func(o *Consumer) error {
		if delete {
			return o.Delete()
		}
		return o.Stop()
	}

	var err error
	if delete {
		err = mset.store.Delete()
	} else {
		err = mset.store.Stop()
	}
	if err != nil {
		return err
	}

	for _, o := range detached {
		if err := shutdownConsumer(o); err != nil {
			return err
		}
	}
	return nil
}
// Consumers will return all the current consumers for this stream, in no
// particular order. The result is nil when there are none.
func (mset *Stream) Consumers() []*Consumer {
	mset.mu.Lock()
	var list []*Consumer
	for _, o := range mset.consumers {
		list = append(list, o)
	}
	mset.mu.Unlock()
	return list
}
// NumConsumers returns how many consumers are currently registered on this
// stream.
func (mset *Stream) NumConsumers() int {
	mset.mu.Lock()
	count := len(mset.consumers)
	mset.mu.Unlock()
	return count
}
// LookupConsumer scans the stream's consumers for one whose name field
// equals name and returns it, or nil when none matches.
func (mset *Stream) LookupConsumer(name string) *Consumer {
	mset.mu.Lock()
	defer mset.mu.Unlock()

	var found *Consumer
	for _, o := range mset.consumers {
		if o.name == name {
			found = o
			break
		}
	}
	return found
}
// State returns the store's current state for this stream, or a zero
// StreamState when the stream has been stopped (nil client).
func (mset *Stream) State() StreamState {
	mset.mu.Lock()
	stopped := mset.client == nil
	mset.mu.Unlock()

	if stopped {
		return StreamState{}
	}

	// Currently rely on store.
	// TODO(dlc) - This will need to change with clusters.
	return mset.store.State()
}
// waitForMsgs blocks the caller on the stream's condition variable until it
// is signalled (see signalConsumers). It returns immediately when the stream
// has been stopped (nil client). The waiter count is raised for the duration
// of the wait.
func (mset *Stream) waitForMsgs() {
	mset.mu.Lock()
	defer mset.mu.Unlock()

	if mset.client == nil {
		return
	}

	mset.sgw++
	mset.sg.Wait()
	mset.sgw--
}
// partitionUnique reports whether the proposed partition is unique amongst
// all consumers. It is not unique if any existing consumer has an empty
// filter subject, or a filter subject for which subjectIsSubsetMatch
// (partition, filter) is true.
// Lock should be held.
func (mset *Stream) partitionUnique(partition string) bool {
	for _, o := range mset.consumers {
		filter := o.config.FilterSubject
		if filter == _EMPTY_ || subjectIsSubsetMatch(partition, filter) {
			return false
		}
	}
	return true
}
// ackMsg is called by a consumer when a message has been acknowledged.
//
// What happens depends on the stream's retention policy: LimitsPolicy does
// nothing, WorkQueuePolicy removes the message from the store, and
// InterestPolicy removes it only when no consumer other than obs still needs
// an ack for seq.
func (mset *Stream) ackMsg(obs *Consumer, seq uint64) {
	policy := mset.config.Retention

	if policy == LimitsPolicy {
		return
	}
	if policy == WorkQueuePolicy {
		mset.store.RemoveMsg(seq)
		return
	}
	if policy != InterestPolicy {
		return
	}

	// Interest retention: keep the message while any other consumer is
	// still waiting on it.
	stillNeeded := false
	mset.mu.Lock()
	for _, o := range mset.consumers {
		if o != obs && o.needAck(seq) {
			stillNeeded = true
			break
		}
	}
	mset.mu.Unlock()

	if !stillNeeded {
		mset.store.RemoveMsg(seq)
	}
}