diff --git a/server/filestore.go b/server/filestore.go index cd6f50b5..2677c084 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5461,73 +5461,22 @@ const seqsHdrSize = 6*binary.MaxVarintLen64 + hdrLen // Encode our consumer state, version 2. // Lock should be held. -func (o *consumerFileStore) encodeState() ([]byte, error) { + +func (o *consumerFileStore) EncodedState() ([]byte, error) { + o.mu.Lock() + defer o.mu.Unlock() + if o.closed { return nil, ErrStoreClosed } return encodeConsumerState(&o.state), nil } -func encodeConsumerState(state *ConsumerState) []byte { - var hdr [seqsHdrSize]byte - var buf []byte - - maxSize := seqsHdrSize - if lp := len(state.Pending); lp > 0 { - maxSize += lp*(3*binary.MaxVarintLen64) + binary.MaxVarintLen64 +func (o *consumerFileStore) encodeState() ([]byte, error) { + if o.closed { + return nil, ErrStoreClosed } - if lr := len(state.Redelivered); lr > 0 { - maxSize += lr*(2*binary.MaxVarintLen64) + binary.MaxVarintLen64 - } - if maxSize == seqsHdrSize { - buf = hdr[:seqsHdrSize] - } else { - buf = make([]byte, maxSize) - } - - // Write header - buf[0] = magic - buf[1] = 2 - - n := hdrLen - n += binary.PutUvarint(buf[n:], state.AckFloor.Consumer) - n += binary.PutUvarint(buf[n:], state.AckFloor.Stream) - n += binary.PutUvarint(buf[n:], state.Delivered.Consumer) - n += binary.PutUvarint(buf[n:], state.Delivered.Stream) - n += binary.PutUvarint(buf[n:], uint64(len(state.Pending))) - - asflr := state.AckFloor.Stream - adflr := state.AckFloor.Consumer - - // These are optional, but always write len. This is to avoid a truncate inline. - if len(state.Pending) > 0 { - // To save space we will use now rounded to seconds to be base timestamp. - mints := time.Now().Round(time.Second).Unix() - // Write minimum timestamp we found from above. 
- n += binary.PutVarint(buf[n:], mints) - - for k, v := range state.Pending { - n += binary.PutUvarint(buf[n:], k-asflr) - n += binary.PutUvarint(buf[n:], v.Sequence-adflr) - // Downsample to seconds to save on space. - // Subsecond resolution not needed for recovery etc. - ts := v.Timestamp / 1_000_000_000 - n += binary.PutVarint(buf[n:], mints-ts) - } - } - - // We always write the redelivered len. - n += binary.PutUvarint(buf[n:], uint64(len(state.Redelivered))) - - // We expect these to be small. - if len(state.Redelivered) > 0 { - for k, v := range state.Redelivered { - n += binary.PutUvarint(buf[n:], k-asflr) - n += binary.PutUvarint(buf[n:], v) - } - } - - return buf[:n] + return encodeConsumerState(&o.state), nil } func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error { @@ -5757,6 +5706,10 @@ func (o *consumerFileStore) State() (*ConsumerState, error) { o.mu.Lock() defer o.mu.Unlock() + if o.closed { + return nil, ErrStoreClosed + } + state := &ConsumerState{} // See if we have a running state or if we need to read in from disk. 
diff --git a/server/memstore.go b/server/memstore.go index e6010f7f..6b992fca 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -880,6 +880,12 @@ func (ms *memStore) Stop() error { return nil } +func (ms *memStore) isClosed() bool { + ms.mu.RLock() + defer ms.mu.RUnlock() + return ms.msgs == nil +} + func (ms *memStore) incConsumers() { ms.mu.Lock() ms.consumers++ @@ -895,40 +901,275 @@ func (ms *memStore) decConsumers() { } type consumerMemStore struct { - ms *memStore + mu sync.Mutex + ms *memStore + cfg ConsumerConfig + state ConsumerState + closed bool } -func (ms *memStore) ConsumerStore(_ string, _ *ConsumerConfig) (ConsumerStore, error) { +func (ms *memStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error) { + if ms == nil { + return nil, fmt.Errorf("memstore is nil") + } + if ms.isClosed() { + return nil, ErrStoreClosed + } + if cfg == nil || name == _EMPTY_ { + return nil, fmt.Errorf("bad consumer config") + } ms.incConsumers() - return &consumerMemStore{ms}, nil + return &consumerMemStore{ms: ms, cfg: *cfg}, nil } func (ms *memStore) Snapshot(_ time.Duration, _, _ bool) (*SnapshotResult, error) { return nil, fmt.Errorf("no impl") } -// No-ops. -func (os *consumerMemStore) Update(_ *ConsumerState) error { return nil } -func (os *consumerMemStore) UpdateDelivered(_, _, _ uint64, _ int64) error { return nil } -func (os *consumerMemStore) UpdateAcks(_, _ uint64) error { return nil } -func (os *consumerMemStore) UpdateConfig(_ *ConsumerConfig) error { return nil } +func (o *consumerMemStore) Update(state *ConsumerState) error { + // Sanity checks. + if state.AckFloor.Consumer > state.Delivered.Consumer { + return fmt.Errorf("bad ack floor for consumer") + } + if state.AckFloor.Stream > state.Delivered.Stream { + return fmt.Errorf("bad ack floor for stream") + } + + // Copy to our state. 
+ var pending map[uint64]*Pending + var redelivered map[uint64]uint64 + if len(state.Pending) > 0 { + pending = make(map[uint64]*Pending, len(state.Pending)) + for seq, p := range state.Pending { + pending[seq] = &Pending{p.Sequence, p.Timestamp} + } + for seq := range pending { + if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream { + return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq) + } + } + } + if len(state.Redelivered) > 0 { + redelivered = make(map[uint64]uint64, len(state.Redelivered)) + for seq, dc := range state.Redelivered { + redelivered[seq] = dc + } + } + + // Replace our state. + o.mu.Lock() + + // Check to see if this is an outdated update. + if state.Delivered.Consumer < o.state.Delivered.Consumer { + o.mu.Unlock() + return fmt.Errorf("old update ignored") + } + + o.state.Delivered = state.Delivered + o.state.AckFloor = state.AckFloor + o.state.Pending = pending + o.state.Redelivered = redelivered + o.mu.Unlock() -func (os *consumerMemStore) Stop() error { - os.ms.decConsumers() return nil } -func (os *consumerMemStore) Delete() error { - return os.Stop() -} -func (os *consumerMemStore) StreamDelete() error { - return os.Stop() +func (o *consumerMemStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) error { + o.mu.Lock() + defer o.mu.Unlock() + + if dc != 1 && o.cfg.AckPolicy == AckNone { + return ErrNoAckPolicy + } + + if dseq <= o.state.AckFloor.Consumer { + return nil + } + + // See if we expect an ack for this. + if o.cfg.AckPolicy != AckNone { + // Need to create pending records here. + if o.state.Pending == nil { + o.state.Pending = make(map[uint64]*Pending) + } + var p *Pending + // Check for an update to a message already delivered. + if sseq <= o.state.Delivered.Stream { + if p = o.state.Pending[sseq]; p != nil { + p.Sequence, p.Timestamp = dseq, ts + } + } else { + // Add to pending. + o.state.Pending[sseq] = &Pending{dseq, ts} + } + // Update delivered as needed. 
+ if dseq > o.state.Delivered.Consumer { + o.state.Delivered.Consumer = dseq + } + if sseq > o.state.Delivered.Stream { + o.state.Delivered.Stream = sseq + } + + if dc > 1 { + if o.state.Redelivered == nil { + o.state.Redelivered = make(map[uint64]uint64) + } + o.state.Redelivered[sseq] = dc - 1 + } + } else { + // For AckNone just update delivered and ackfloor at the same time. + o.state.Delivered.Consumer = dseq + o.state.Delivered.Stream = sseq + o.state.AckFloor.Consumer = dseq + o.state.AckFloor.Stream = sseq + } + + return nil } -func (os *consumerMemStore) State() (*ConsumerState, error) { return nil, nil } +func (o *consumerMemStore) UpdateAcks(dseq, sseq uint64) error { + o.mu.Lock() + defer o.mu.Unlock() + + if o.cfg.AckPolicy == AckNone { + return ErrNoAckPolicy + } + if len(o.state.Pending) == 0 || o.state.Pending[sseq] == nil { + return ErrStoreMsgNotFound + } + + // On restarts the old leader may get a replay from the raft logs that are old. + if dseq <= o.state.AckFloor.Consumer { + return nil + } + + // Check for AckAll here. + if o.cfg.AckPolicy == AckAll { + sgap := sseq - o.state.AckFloor.Stream + o.state.AckFloor.Consumer = dseq + o.state.AckFloor.Stream = sseq + for seq := sseq; seq > sseq-sgap; seq-- { + delete(o.state.Pending, seq) + if len(o.state.Redelivered) > 0 { + delete(o.state.Redelivered, seq) + } + } + return nil + } + + // AckExplicit + + // First delete from our pending state. + if p, ok := o.state.Pending[sseq]; ok { + delete(o.state.Pending, sseq) + dseq = p.Sequence // Use the original. + } + // Now remove from redelivered. 
+	if len(o.state.Redelivered) > 0 {
+		delete(o.state.Redelivered, sseq)
+	}
+
+	if len(o.state.Pending) == 0 {
+		o.state.AckFloor.Consumer = o.state.Delivered.Consumer
+		o.state.AckFloor.Stream = o.state.Delivered.Stream
+	} else if dseq == o.state.AckFloor.Consumer+1 {
+		first := o.state.AckFloor.Consumer == 0
+		o.state.AckFloor.Consumer = dseq
+		o.state.AckFloor.Stream = sseq
+
+		if !first && o.state.Delivered.Consumer > dseq {
+			for ss := sseq + 1; ss < o.state.Delivered.Stream; ss++ {
+				if p, ok := o.state.Pending[ss]; ok {
+					if p.Sequence > 0 {
+						o.state.AckFloor.Consumer = p.Sequence - 1
+						o.state.AckFloor.Stream = ss - 1
+					}
+					break
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
+func (o *consumerMemStore) UpdateConfig(cfg *ConsumerConfig) error {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+
+	// This is mostly unchecked here. We are assuming the upper layers have done sanity checking.
+	o.cfg = *cfg
+	return nil
+}
+
+func (o *consumerMemStore) Stop() error {
+	o.mu.Lock()
+	o.closed = true
+	ms := o.ms
+	o.mu.Unlock()
+	ms.decConsumers()
+	return nil
+}
+
+func (o *consumerMemStore) Delete() error {
+	return o.Stop()
+}
+
+func (o *consumerMemStore) StreamDelete() error {
+	return o.Stop()
+}
+
+func (o *consumerMemStore) State() (*ConsumerState, error) {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+
+	if o.closed {
+		return nil, ErrStoreClosed
+	}
+
+	state := &ConsumerState{}
+
+	state.Delivered = o.state.Delivered
+	state.AckFloor = o.state.AckFloor
+	if len(o.state.Pending) > 0 {
+		state.Pending = o.copyPending()
+	}
+	if len(o.state.Redelivered) > 0 {
+		state.Redelivered = o.copyRedelivered()
+	}
+	return state, nil
+}
+
+// EncodedState returns the encoded state for this consumer store.
+func (o *consumerMemStore) EncodedState() ([]byte, error) { + o.mu.Lock() + defer o.mu.Unlock() + + if o.closed { + return nil, ErrStoreClosed + } + + return encodeConsumerState(&o.state), nil +} + +func (o *consumerMemStore) copyPending() map[uint64]*Pending { + pending := make(map[uint64]*Pending, len(o.state.Pending)) + for seq, p := range o.state.Pending { + pending[seq] = &Pending{p.Sequence, p.Timestamp} + } + return pending +} + +func (o *consumerMemStore) copyRedelivered() map[uint64]uint64 { + redelivered := make(map[uint64]uint64, len(o.state.Redelivered)) + for seq, dc := range o.state.Redelivered { + redelivered[seq] = dc + } + return redelivered +} // Type returns the type of the underlying store. -func (os *consumerMemStore) Type() StorageType { return MemoryStorage } +func (o *consumerMemStore) Type() StorageType { return MemoryStorage } // Templates type templateMemStore struct{} diff --git a/server/store.go b/server/store.go index 5aa91949..10fc10a7 100644 --- a/server/store.go +++ b/server/store.go @@ -14,6 +14,7 @@ package server import ( + "encoding/binary" "encoding/json" "errors" "fmt" @@ -170,6 +171,7 @@ type ConsumerStore interface { UpdateConfig(cfg *ConsumerConfig) error Update(*ConsumerState) error State() (*ConsumerState, error) + EncodedState() ([]byte, error) Type() StorageType Stop() error Delete() error @@ -196,6 +198,68 @@ type ConsumerState struct { Redelivered map[uint64]uint64 `json:"redelivered,omitempty"` } +func encodeConsumerState(state *ConsumerState) []byte { + var hdr [seqsHdrSize]byte + var buf []byte + + maxSize := seqsHdrSize + if lp := len(state.Pending); lp > 0 { + maxSize += lp*(3*binary.MaxVarintLen64) + binary.MaxVarintLen64 + } + if lr := len(state.Redelivered); lr > 0 { + maxSize += lr*(2*binary.MaxVarintLen64) + binary.MaxVarintLen64 + } + if maxSize == seqsHdrSize { + buf = hdr[:seqsHdrSize] + } else { + buf = make([]byte, maxSize) + } + + // Write header + buf[0] = magic + buf[1] = 2 + + n := hdrLen + n += 
binary.PutUvarint(buf[n:], state.AckFloor.Consumer) + n += binary.PutUvarint(buf[n:], state.AckFloor.Stream) + n += binary.PutUvarint(buf[n:], state.Delivered.Consumer) + n += binary.PutUvarint(buf[n:], state.Delivered.Stream) + n += binary.PutUvarint(buf[n:], uint64(len(state.Pending))) + + asflr := state.AckFloor.Stream + adflr := state.AckFloor.Consumer + + // These are optional, but always write len. This is to avoid a truncate inline. + if len(state.Pending) > 0 { + // To save space we will use now rounded to seconds to be base timestamp. + mints := time.Now().Round(time.Second).Unix() + // Write minimum timestamp we found from above. + n += binary.PutVarint(buf[n:], mints) + + for k, v := range state.Pending { + n += binary.PutUvarint(buf[n:], k-asflr) + n += binary.PutUvarint(buf[n:], v.Sequence-adflr) + // Downsample to seconds to save on space. + // Subsecond resolution not needed for recovery etc. + ts := v.Timestamp / 1_000_000_000 + n += binary.PutVarint(buf[n:], mints-ts) + } + } + + // We always write the redelivered len. + n += binary.PutUvarint(buf[n:], uint64(len(state.Redelivered))) + + // We expect these to be small. + if len(state.Redelivered) > 0 { + for k, v := range state.Redelivered { + n += binary.PutUvarint(buf[n:], k-asflr) + n += binary.PutUvarint(buf[n:], v) + } + } + + return buf[:n] +} + // Represents a pending message for explicit ack or ack all. // Sequence is the original consumer sequence. type Pending struct {