mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
268 lines
7.2 KiB
Go
268 lines
7.2 KiB
Go
// Copyright 2019-2020 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// StorageType selects the medium in which messages are stored for retention.
type StorageType int

const (
	// MemoryStorage keeps messages in memory only. It is the zero value
	// of StorageType.
	MemoryStorage StorageType = iota
	// FileStorage keeps messages on disk, in the location designated by
	// the JetStream config StoreDir.
	FileStorage
)
|
|
|
|
// Sentinel errors reported by message stores.
var (
	// ErrStoreClosed is returned when the store has been closed.
	ErrStoreClosed = errors.New("store is closed")

	// ErrStoreMsgNotFound is returned when a message was expected to be
	// present but could not be found.
	ErrStoreMsgNotFound = errors.New("no message found")

	// ErrStoreEOF is returned when the requested message sequence is
	// greater than the last sequence.
	ErrStoreEOF = errors.New("stream EOF")

	// ErrStoreSnapshotInProgress is returned when RemoveMsg or EraseMsg is
	// called while a snapshot is in progress.
	ErrStoreSnapshotInProgress = errors.New("snapshot in progress")
)
|
|
|
|
type StreamStore interface {
|
|
StoreMsg(subj string, msg []byte) (uint64, error)
|
|
LoadMsg(seq uint64) (subj string, msg []byte, ts int64, err error)
|
|
RemoveMsg(seq uint64) (bool, error)
|
|
EraseMsg(seq uint64) (bool, error)
|
|
Purge() uint64
|
|
GetSeqFromTime(t time.Time) uint64
|
|
State() StreamState
|
|
StorageBytesUpdate(func(int64))
|
|
UpdateConfig(cfg *StreamConfig) error
|
|
Delete() error
|
|
Stop() error
|
|
ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error)
|
|
Snapshot() (io.ReadCloser, error)
|
|
}
|
|
|
|
// RetentionPolicy controls how the messages in a set are retained.
type RetentionPolicy int

const (
	// LimitsPolicy (the default, and the zero value) retains messages
	// until any given limit is reached. The limit could be one of
	// MaxMsgs, MaxBytes, or MaxAge.
	LimitsPolicy RetentionPolicy = iota
	// InterestPolicy allows a message to be removed once all known
	// observables have acknowledged it.
	InterestPolicy
	// WorkQueuePolicy allows a message to be removed once the first
	// worker or subscriber acknowledges it.
	WorkQueuePolicy
)
|
|
|
|
// StreamState is information about the given stream.
type StreamState struct {
	// Msgs is serialized as "messages".
	Msgs uint64 `json:"messages"`
	// Bytes is serialized as "bytes".
	Bytes uint64 `json:"bytes"`
	// FirstSeq is serialized as "first_seq".
	FirstSeq uint64 `json:"first_seq"`
	// LastSeq is serialized as "last_seq".
	LastSeq uint64 `json:"last_seq"`
	// Consumers is serialized as "consumer_count".
	Consumers int `json:"consumer_count"`
}
|
|
|
|
// ConsumerStore stores state on consumers for streams.
|
|
type ConsumerStore interface {
|
|
State() (*ConsumerState, error)
|
|
Update(*ConsumerState) error
|
|
Stop() error
|
|
Delete() error
|
|
}
|
|
|
|
// SequencePair couples a consumer sequence with a stream sequence.
// Both sequences point to the same message.
type SequencePair struct {
	// ConsumerSeq is the sequence in the consumer's context.
	ConsumerSeq uint64 `json:"consumer_seq"`
	// StreamSeq is the sequence in the stream's context.
	StreamSeq uint64 `json:"stream_seq"`
}
|
|
|
|
// ConsumerState represents a stored state for a consumer.
|
|
type ConsumerState struct {
|
|
// Delivered keeps track of last delivered sequence numbers for both the stream and the consumer.
|
|
Delivered SequencePair `json:"delivered"`
|
|
// AckFloor keeps track of the ack floors for both the stream and the consumer.
|
|
AckFloor SequencePair `json:"ack_floor"`
|
|
// These are both in stream sequence context.
|
|
// Pending is for all messages pending and the timestamp for the delivered time.
|
|
// This will only be present when the AckPolicy is ExplicitAck.
|
|
Pending map[uint64]int64 `json:"pending"`
|
|
// This is for messages that have been redelivered, so count > 1.
|
|
Redelivered map[uint64]uint64 `json:"redelivered"`
|
|
}
|
|
|
|
// TemplateStore stores templates.
|
|
type TemplateStore interface {
|
|
Store(*StreamTemplate) error
|
|
Delete(*StreamTemplate) error
|
|
}
|
|
|
|
// jsonString wraps s in double quotes. No escaping is applied to s, so the
// result is a valid JSON string literal only when s itself needs none.
func jsonString(s string) string {
	const quote = `"`
	return quote + s + quote
}
|
|
|
|
// JSON string forms of the RetentionPolicy values.
const (
	limitsPolicyString    = "limits"
	interestPolicyString  = "interest"
	workQueuePolicyString = "workqueue"
)
|
|
|
|
func (rp RetentionPolicy) String() string {
|
|
switch rp {
|
|
case LimitsPolicy:
|
|
return "Limits"
|
|
case InterestPolicy:
|
|
return "Interest"
|
|
case WorkQueuePolicy:
|
|
return "WorkQueue"
|
|
default:
|
|
return "Unknown Retention Policy"
|
|
}
|
|
}
|
|
|
|
func (rp RetentionPolicy) MarshalJSON() ([]byte, error) {
|
|
switch rp {
|
|
case LimitsPolicy:
|
|
return json.Marshal(limitsPolicyString)
|
|
case InterestPolicy:
|
|
return json.Marshal(interestPolicyString)
|
|
case WorkQueuePolicy:
|
|
return json.Marshal(workQueuePolicyString)
|
|
default:
|
|
return nil, fmt.Errorf("can not marshal %v", rp)
|
|
}
|
|
}
|
|
|
|
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
|
|
switch string(data) {
|
|
case jsonString(limitsPolicyString):
|
|
*rp = LimitsPolicy
|
|
case jsonString(interestPolicyString):
|
|
*rp = InterestPolicy
|
|
case jsonString(workQueuePolicyString):
|
|
*rp = WorkQueuePolicy
|
|
default:
|
|
return fmt.Errorf("can not unmarshal %q", data)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// JSON string forms of the StorageType values.
const (
	memoryStorageString = "memory"
	fileStorageString   = "file"
)
|
|
|
|
func (st StorageType) String() string {
|
|
switch st {
|
|
case MemoryStorage:
|
|
return strings.Title(memoryStorageString)
|
|
case FileStorage:
|
|
return strings.Title(fileStorageString)
|
|
default:
|
|
return "Unknown Storage Type"
|
|
}
|
|
}
|
|
|
|
func (st StorageType) MarshalJSON() ([]byte, error) {
|
|
switch st {
|
|
case MemoryStorage:
|
|
return json.Marshal(memoryStorageString)
|
|
case FileStorage:
|
|
return json.Marshal(fileStorageString)
|
|
default:
|
|
return nil, fmt.Errorf("can not marshal %v", st)
|
|
}
|
|
}
|
|
|
|
func (st *StorageType) UnmarshalJSON(data []byte) error {
|
|
switch string(data) {
|
|
case jsonString(memoryStorageString):
|
|
*st = MemoryStorage
|
|
case jsonString(fileStorageString):
|
|
*st = FileStorage
|
|
default:
|
|
return fmt.Errorf("can not unmarshal %q", data)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// JSON string forms of the AckPolicy values.
const (
	ackNonePolicyString     = "none"
	ackAllPolicyString      = "all"
	ackExplicitPolicyString = "explicit"
)
|
|
|
|
func (ap AckPolicy) MarshalJSON() ([]byte, error) {
|
|
switch ap {
|
|
case AckNone:
|
|
return json.Marshal(ackNonePolicyString)
|
|
case AckAll:
|
|
return json.Marshal(ackAllPolicyString)
|
|
case AckExplicit:
|
|
return json.Marshal(ackExplicitPolicyString)
|
|
default:
|
|
return nil, fmt.Errorf("can not marshal %v", ap)
|
|
}
|
|
}
|
|
|
|
func (ap *AckPolicy) UnmarshalJSON(data []byte) error {
|
|
switch string(data) {
|
|
case jsonString(ackNonePolicyString):
|
|
*ap = AckNone
|
|
case jsonString(ackAllPolicyString):
|
|
*ap = AckAll
|
|
case jsonString(ackExplicitPolicyString):
|
|
*ap = AckExplicit
|
|
default:
|
|
return fmt.Errorf("can not unmarshal %q", data)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// JSON string forms of the ReplayPolicy values.
const (
	replayInstantPolicyString  = "instant"
	replayOriginalPolicyString = "original"
)
|
|
|
|
func (rp ReplayPolicy) MarshalJSON() ([]byte, error) {
|
|
switch rp {
|
|
case ReplayInstant:
|
|
return json.Marshal(replayInstantPolicyString)
|
|
case ReplayOriginal:
|
|
return json.Marshal(replayOriginalPolicyString)
|
|
default:
|
|
return nil, fmt.Errorf("can not marshal %v", rp)
|
|
}
|
|
}
|
|
|
|
func (rp *ReplayPolicy) UnmarshalJSON(data []byte) error {
|
|
switch string(data) {
|
|
case jsonString(replayInstantPolicyString):
|
|
*rp = ReplayInstant
|
|
case jsonString(replayOriginalPolicyString):
|
|
*rp = ReplayOriginal
|
|
default:
|
|
return fmt.Errorf("can not unmarshal %q", data)
|
|
}
|
|
return nil
|
|
}
|