Add consumerMemStore impl to allow proper replication of state.

Resolves #3006

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-04-10 08:01:13 -07:00
parent 3663d595fc
commit e7ff38a4ca
3 changed files with 335 additions and 77 deletions

View File

@@ -5461,73 +5461,22 @@ const seqsHdrSize = 6*binary.MaxVarintLen64 + hdrLen
// Encode our consumer state, version 2.
// Lock should be held.
func (o *consumerFileStore) encodeState() ([]byte, error) {
func (o *consumerFileStore) EncodedState() ([]byte, error) {
o.mu.Lock()
defer o.mu.Unlock()
if o.closed {
return nil, ErrStoreClosed
}
return encodeConsumerState(&o.state), nil
}
func encodeConsumerState(state *ConsumerState) []byte {
var hdr [seqsHdrSize]byte
var buf []byte
maxSize := seqsHdrSize
if lp := len(state.Pending); lp > 0 {
maxSize += lp*(3*binary.MaxVarintLen64) + binary.MaxVarintLen64
func (o *consumerFileStore) encodeState() ([]byte, error) {
if o.closed {
return nil, ErrStoreClosed
}
if lr := len(state.Redelivered); lr > 0 {
maxSize += lr*(2*binary.MaxVarintLen64) + binary.MaxVarintLen64
}
if maxSize == seqsHdrSize {
buf = hdr[:seqsHdrSize]
} else {
buf = make([]byte, maxSize)
}
// Write header
buf[0] = magic
buf[1] = 2
n := hdrLen
n += binary.PutUvarint(buf[n:], state.AckFloor.Consumer)
n += binary.PutUvarint(buf[n:], state.AckFloor.Stream)
n += binary.PutUvarint(buf[n:], state.Delivered.Consumer)
n += binary.PutUvarint(buf[n:], state.Delivered.Stream)
n += binary.PutUvarint(buf[n:], uint64(len(state.Pending)))
asflr := state.AckFloor.Stream
adflr := state.AckFloor.Consumer
// These are optional, but always write len. This is to avoid a truncate inline.
if len(state.Pending) > 0 {
// To save space we will use now rounded to seconds to be base timestamp.
mints := time.Now().Round(time.Second).Unix()
// Write minimum timestamp we found from above.
n += binary.PutVarint(buf[n:], mints)
for k, v := range state.Pending {
n += binary.PutUvarint(buf[n:], k-asflr)
n += binary.PutUvarint(buf[n:], v.Sequence-adflr)
// Downsample to seconds to save on space.
// Subsecond resolution not needed for recovery etc.
ts := v.Timestamp / 1_000_000_000
n += binary.PutVarint(buf[n:], mints-ts)
}
}
// We always write the redelivered len.
n += binary.PutUvarint(buf[n:], uint64(len(state.Redelivered)))
// We expect these to be small.
if len(state.Redelivered) > 0 {
for k, v := range state.Redelivered {
n += binary.PutUvarint(buf[n:], k-asflr)
n += binary.PutUvarint(buf[n:], v)
}
}
return buf[:n]
return encodeConsumerState(&o.state), nil
}
func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error {
@@ -5757,6 +5706,10 @@ func (o *consumerFileStore) State() (*ConsumerState, error) {
o.mu.Lock()
defer o.mu.Unlock()
if o.closed {
return nil, ErrStoreClosed
}
state := &ConsumerState{}
// See if we have a running state or if we need to read in from disk.

View File

@@ -880,6 +880,12 @@ func (ms *memStore) Stop() error {
return nil
}
func (ms *memStore) isClosed() bool {
ms.mu.RLock()
defer ms.mu.RUnlock()
return ms.msgs == nil
}
func (ms *memStore) incConsumers() {
ms.mu.Lock()
ms.consumers++
@@ -895,40 +901,275 @@ func (ms *memStore) decConsumers() {
}
type consumerMemStore struct {
ms *memStore
mu sync.Mutex
ms *memStore
cfg ConsumerConfig
state ConsumerState
closed bool
}
func (ms *memStore) ConsumerStore(_ string, _ *ConsumerConfig) (ConsumerStore, error) {
func (ms *memStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error) {
if ms == nil {
return nil, fmt.Errorf("memstore is nil")
}
if ms.isClosed() {
return nil, ErrStoreClosed
}
if cfg == nil || name == _EMPTY_ {
return nil, fmt.Errorf("bad consumer config")
}
ms.incConsumers()
return &consumerMemStore{ms}, nil
return &consumerMemStore{ms: ms, cfg: *cfg}, nil
}
func (ms *memStore) Snapshot(_ time.Duration, _, _ bool) (*SnapshotResult, error) {
return nil, fmt.Errorf("no impl")
}
// No-ops.
func (os *consumerMemStore) Update(_ *ConsumerState) error { return nil }
func (os *consumerMemStore) UpdateDelivered(_, _, _ uint64, _ int64) error { return nil }
func (os *consumerMemStore) UpdateAcks(_, _ uint64) error { return nil }
func (os *consumerMemStore) UpdateConfig(_ *ConsumerConfig) error { return nil }
func (o *consumerMemStore) Update(state *ConsumerState) error {
// Sanity checks.
if state.AckFloor.Consumer > state.Delivered.Consumer {
return fmt.Errorf("bad ack floor for consumer")
}
if state.AckFloor.Stream > state.Delivered.Stream {
return fmt.Errorf("bad ack floor for stream")
}
// Copy to our state.
var pending map[uint64]*Pending
var redelivered map[uint64]uint64
if len(state.Pending) > 0 {
pending = make(map[uint64]*Pending, len(state.Pending))
for seq, p := range state.Pending {
pending[seq] = &Pending{p.Sequence, p.Timestamp}
}
for seq := range pending {
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
}
}
}
if len(state.Redelivered) > 0 {
redelivered = make(map[uint64]uint64, len(state.Redelivered))
for seq, dc := range state.Redelivered {
redelivered[seq] = dc
}
}
// Replace our state.
o.mu.Lock()
// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer {
o.mu.Unlock()
return fmt.Errorf("old update ignored")
}
o.state.Delivered = state.Delivered
o.state.AckFloor = state.AckFloor
o.state.Pending = pending
o.state.Redelivered = redelivered
o.mu.Unlock()
func (os *consumerMemStore) Stop() error {
os.ms.decConsumers()
return nil
}
func (os *consumerMemStore) Delete() error {
return os.Stop()
}
func (os *consumerMemStore) StreamDelete() error {
return os.Stop()
func (o *consumerMemStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) error {
o.mu.Lock()
defer o.mu.Unlock()
if dc != 1 && o.cfg.AckPolicy == AckNone {
return ErrNoAckPolicy
}
if dseq <= o.state.AckFloor.Consumer {
return nil
}
// See if we expect an ack for this.
if o.cfg.AckPolicy != AckNone {
// Need to create pending records here.
if o.state.Pending == nil {
o.state.Pending = make(map[uint64]*Pending)
}
var p *Pending
// Check for an update to a message already delivered.
if sseq <= o.state.Delivered.Stream {
if p = o.state.Pending[sseq]; p != nil {
p.Sequence, p.Timestamp = dseq, ts
}
} else {
// Add to pending.
o.state.Pending[sseq] = &Pending{dseq, ts}
}
// Update delivered as needed.
if dseq > o.state.Delivered.Consumer {
o.state.Delivered.Consumer = dseq
}
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq
}
if dc > 1 {
if o.state.Redelivered == nil {
o.state.Redelivered = make(map[uint64]uint64)
}
o.state.Redelivered[sseq] = dc - 1
}
} else {
// For AckNone just update delivered and ackfloor at the same time.
o.state.Delivered.Consumer = dseq
o.state.Delivered.Stream = sseq
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
}
return nil
}
func (os *consumerMemStore) State() (*ConsumerState, error) { return nil, nil }
func (o *consumerMemStore) UpdateAcks(dseq, sseq uint64) error {
o.mu.Lock()
defer o.mu.Unlock()
if o.cfg.AckPolicy == AckNone {
return ErrNoAckPolicy
}
if len(o.state.Pending) == 0 || o.state.Pending[sseq] == nil {
return ErrStoreMsgNotFound
}
// On restarts the old leader may get a replay from the raft logs that are old.
if dseq <= o.state.AckFloor.Consumer {
return nil
}
// Check for AckAll here.
if o.cfg.AckPolicy == AckAll {
sgap := sseq - o.state.AckFloor.Stream
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
for seq := sseq; seq > sseq-sgap; seq-- {
delete(o.state.Pending, seq)
if len(o.state.Redelivered) > 0 {
delete(o.state.Redelivered, seq)
}
}
return nil
}
// AckExplicit
// First delete from our pending state.
if p, ok := o.state.Pending[sseq]; ok {
delete(o.state.Pending, sseq)
dseq = p.Sequence // Use the original.
}
// Now remove from redelivered.
if len(o.state.Redelivered) > 0 {
delete(o.state.Redelivered, sseq)
}
if len(o.state.Pending) == 0 {
o.state.AckFloor.Consumer = o.state.Delivered.Consumer
o.state.AckFloor.Stream = o.state.Delivered.Stream
} else if dseq == o.state.AckFloor.Consumer+1 {
first := o.state.AckFloor.Consumer == 0
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
if !first && o.state.Delivered.Consumer > dseq {
for ss := sseq + 1; ss < o.state.Delivered.Stream; ss++ {
if p, ok := o.state.Pending[ss]; ok {
if p.Sequence > 0 {
o.state.AckFloor.Consumer = p.Sequence - 1
o.state.AckFloor.Stream = ss - 1
}
break
}
}
}
}
return nil
}
func (o *consumerMemStore) UpdateConfig(cfg *ConsumerConfig) error {
o.mu.Lock()
defer o.mu.Unlock()
// This is mostly unchecked here. We are assuming the upper layers have done sanity checking.
o.cfg = *cfg
return nil
}
func (o *consumerMemStore) Stop() error {
o.mu.Lock()
o.closed = true
ms := o.ms
o.mu.Unlock()
ms.decConsumers()
return nil
}
func (o *consumerMemStore) Delete() error {
return o.Stop()
}
func (o *consumerMemStore) StreamDelete() error {
return o.Stop()
}
func (o *consumerMemStore) State() (*ConsumerState, error) {
o.mu.Lock()
defer o.mu.Unlock()
if o.closed {
return nil, ErrStoreClosed
}
state := &ConsumerState{}
state.Delivered = o.state.Delivered
state.AckFloor = o.state.AckFloor
if len(o.state.Pending) > 0 {
state.Pending = o.copyPending()
}
if len(o.state.Redelivered) > 0 {
state.Redelivered = o.copyRedelivered()
}
return state, nil
}
// EncodeState for this consumer store.
func (o *consumerMemStore) EncodedState() ([]byte, error) {
o.mu.Lock()
defer o.mu.Unlock()
if o.closed {
return nil, ErrStoreClosed
}
return encodeConsumerState(&o.state), nil
}
func (o *consumerMemStore) copyPending() map[uint64]*Pending {
pending := make(map[uint64]*Pending, len(o.state.Pending))
for seq, p := range o.state.Pending {
pending[seq] = &Pending{p.Sequence, p.Timestamp}
}
return pending
}
func (o *consumerMemStore) copyRedelivered() map[uint64]uint64 {
redelivered := make(map[uint64]uint64, len(o.state.Redelivered))
for seq, dc := range o.state.Redelivered {
redelivered[seq] = dc
}
return redelivered
}
// Type returns the type of the underlying store.
func (os *consumerMemStore) Type() StorageType { return MemoryStorage }
func (o *consumerMemStore) Type() StorageType { return MemoryStorage }
// Templates
type templateMemStore struct{}

View File

@@ -14,6 +14,7 @@
package server
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
@@ -170,6 +171,7 @@ type ConsumerStore interface {
UpdateConfig(cfg *ConsumerConfig) error
Update(*ConsumerState) error
State() (*ConsumerState, error)
EncodedState() ([]byte, error)
Type() StorageType
Stop() error
Delete() error
@@ -196,6 +198,68 @@ type ConsumerState struct {
Redelivered map[uint64]uint64 `json:"redelivered,omitempty"`
}
func encodeConsumerState(state *ConsumerState) []byte {
var hdr [seqsHdrSize]byte
var buf []byte
maxSize := seqsHdrSize
if lp := len(state.Pending); lp > 0 {
maxSize += lp*(3*binary.MaxVarintLen64) + binary.MaxVarintLen64
}
if lr := len(state.Redelivered); lr > 0 {
maxSize += lr*(2*binary.MaxVarintLen64) + binary.MaxVarintLen64
}
if maxSize == seqsHdrSize {
buf = hdr[:seqsHdrSize]
} else {
buf = make([]byte, maxSize)
}
// Write header
buf[0] = magic
buf[1] = 2
n := hdrLen
n += binary.PutUvarint(buf[n:], state.AckFloor.Consumer)
n += binary.PutUvarint(buf[n:], state.AckFloor.Stream)
n += binary.PutUvarint(buf[n:], state.Delivered.Consumer)
n += binary.PutUvarint(buf[n:], state.Delivered.Stream)
n += binary.PutUvarint(buf[n:], uint64(len(state.Pending)))
asflr := state.AckFloor.Stream
adflr := state.AckFloor.Consumer
// These are optional, but always write len. This is to avoid a truncate inline.
if len(state.Pending) > 0 {
// To save space we will use now rounded to seconds to be base timestamp.
mints := time.Now().Round(time.Second).Unix()
// Write minimum timestamp we found from above.
n += binary.PutVarint(buf[n:], mints)
for k, v := range state.Pending {
n += binary.PutUvarint(buf[n:], k-asflr)
n += binary.PutUvarint(buf[n:], v.Sequence-adflr)
// Downsample to seconds to save on space.
// Subsecond resolution not needed for recovery etc.
ts := v.Timestamp / 1_000_000_000
n += binary.PutVarint(buf[n:], mints-ts)
}
}
// We always write the redelivered len.
n += binary.PutUvarint(buf[n:], uint64(len(state.Redelivered)))
// We expect these to be small.
if len(state.Redelivered) > 0 {
for k, v := range state.Redelivered {
n += binary.PutUvarint(buf[n:], k-asflr)
n += binary.PutUvarint(buf[n:], v)
}
}
return buf[:n]
}
// Represents a pending message for explicit ack or ack all.
// Sequence is the original consumer sequence.
type Pending struct {