ConsumerFileStore could encode an empty state or update an empty state on startup.

We needed to make sure at the lowest level that the state was read from disk and not depend on upper layer consumer.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-26 15:48:10 -07:00
parent 7f06d6f5a7
commit 9999f63853
2 changed files with 57 additions and 8 deletions

View File

@@ -6536,6 +6536,10 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
o.qch = make(chan struct{})
go o.flushLoop(o.fch, o.qch)
// Make sure to load in our state from disk if needed.
o.loadState()
// Assign to filestore.
fs.AddConsumer(o)
return o, nil
@@ -6841,18 +6845,16 @@ const seqsHdrSize = 6*binary.MaxVarintLen64 + hdrLen
func (o *consumerFileStore) EncodedState() ([]byte, error) {
o.mu.Lock()
defer o.mu.Unlock()
if o.closed {
return nil, ErrStoreClosed
}
return encodeConsumerState(&o.state), nil
return o.encodeState()
}
func (o *consumerFileStore) encodeState() ([]byte, error) {
if o.closed {
return nil, ErrStoreClosed
// Grab reference to state, but make sure we load in if needed, so do not reference o.state directly.
state, err := o.stateWithCopyLocked(false)
if err != nil {
return nil, err
}
return encodeConsumerState(&o.state), nil
return encodeConsumerState(state), nil
}
func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error {
@@ -7084,7 +7086,11 @@ func (o *consumerFileStore) BorrowState() (*ConsumerState, error) {
func (o *consumerFileStore) stateWithCopy(doCopy bool) (*ConsumerState, error) {
o.mu.Lock()
defer o.mu.Unlock()
return o.stateWithCopyLocked(doCopy)
}
// Lock should be held.
func (o *consumerFileStore) stateWithCopyLocked(doCopy bool) (*ConsumerState, error) {
if o.closed {
return nil, ErrStoreClosed
}
@@ -7163,6 +7169,14 @@ func (o *consumerFileStore) stateWithCopy(doCopy bool) (*ConsumerState, error) {
return state, nil
}
// Lock should be held. Called at startup.
func (o *consumerFileStore) loadState() {
if _, err := os.Stat(o.ifn); err == nil {
// This will load our state in from disk.
o.stateWithCopyLocked(false)
}
}
// Decode consumer state.
func decodeConsumerState(buf []byte) (*ConsumerState, error) {
version, err := checkConsumerHeader(buf)

View File

@@ -5373,3 +5373,38 @@ func TestFileStoreSubjectsTotals(t *testing.T) {
t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st))
}
}
func TestFileStoreConsumerStoreEncodeAfterRestart(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()
o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
require_NoError(t, err)
state := &ConsumerState{}
state.Delivered.Consumer = 22
state.Delivered.Stream = 22
state.AckFloor.Consumer = 11
state.AckFloor.Stream = 11
err = o.Update(state)
require_NoError(t, err)
fs.Stop()
fs, err = newFileStore(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()
o, err = fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
require_NoError(t, err)
if o.(*consumerFileStore).state.Delivered != state.Delivered {
t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state)
}
if o.(*consumerFileStore).state.AckFloor != state.AckFloor {
t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state)
}
})
}