Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-04-26 18:42:31 -07:00
4 changed files with 117 additions and 8 deletions

View File

@@ -2395,6 +2395,16 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
}
// If we are replicated and we are not the leader we need to pull certain data from our store.
if rg != nil && rg.node != nil && !o.isLeader() && o.store != nil {
state, _ := o.store.BorrowState()
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
info.NumAckPending = len(state.Pending)
info.NumRedelivered = len(state.Redelivered)
}
// Adjust active based on non-zero etc. Also make UTC here.
if !o.ldt.IsZero() {
ldt := o.ldt.UTC() // This copies as well.

View File

@@ -6796,6 +6796,10 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
o.qch = make(chan struct{})
go o.flushLoop(o.fch, o.qch)
// Make sure to load in our state from disk if needed.
o.loadState()
// Assign to filestore.
fs.AddConsumer(o)
return o, nil
@@ -7101,18 +7105,16 @@ const seqsHdrSize = 6*binary.MaxVarintLen64 + hdrLen
// EncodedState returns the consumer's state serialized for transport.
// It acquires the lock and delegates to encodeState, which performs the
// closed-store check and makes sure state is loaded in from disk if needed.
//
// NOTE(review): the diff residue here contained both the pre-merge body
// (direct encode of o.state after a closed check) and the post-merge
// delegating return, leaving an unreachable statement; this keeps only
// the post-merge version.
func (o *consumerFileStore) EncodedState() ([]byte, error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.encodeState()
}
func (o *consumerFileStore) encodeState() ([]byte, error) {
if o.closed {
return nil, ErrStoreClosed
// Grab reference to state, but make sure we load in if needed, so do not reference o.state directly.
state, err := o.stateWithCopyLocked(false)
if err != nil {
return nil, err
}
return encodeConsumerState(&o.state), nil
return encodeConsumerState(state), nil
}
func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error {
@@ -7344,7 +7346,11 @@ func (o *consumerFileStore) BorrowState() (*ConsumerState, error) {
// stateWithCopy returns the consumer state, copying it when doCopy is
// true. It only takes the lock and delegates to stateWithCopyLocked,
// which holds the actual logic.
func (o *consumerFileStore) stateWithCopy(doCopy bool) (*ConsumerState, error) {
o.mu.Lock()
defer o.mu.Unlock()
return o.stateWithCopyLocked(doCopy)
}
// Lock should be held.
func (o *consumerFileStore) stateWithCopyLocked(doCopy bool) (*ConsumerState, error) {
if o.closed {
return nil, ErrStoreClosed
}
@@ -7423,6 +7429,14 @@ func (o *consumerFileStore) stateWithCopy(doCopy bool) (*ConsumerState, error) {
return state, nil
}
// Lock should be held. Called at startup.
// loadState loads the consumer's state in from disk, if a state file
// exists, as a side effect of stateWithCopyLocked. The returned state
// and any error are deliberately ignored: this is a best-effort load at
// startup, and a missing file simply means there is nothing to restore.
// NOTE(review): assumes o.ifn is the path of the consumer state file on
// disk — confirm against the field's declaration.
func (o *consumerFileStore) loadState() {
if _, err := os.Stat(o.ifn); err == nil {
// This will load our state in from disk.
o.stateWithCopyLocked(false)
}
}
// Decode consumer state.
func decodeConsumerState(buf []byte) (*ConsumerState, error) {
version, err := checkConsumerHeader(buf)

View File

@@ -5457,3 +5457,38 @@ func TestFileStoreNewWriteIndexInfo(t *testing.T) {
defer fs.Stop()
})
}
// TestFileStoreConsumerStoreEncodeAfterRestart verifies that delivered and
// ack-floor sequences written to a consumer store survive a filestore
// restart and are visible on the re-created consumer store.
func TestFileStoreConsumerStoreEncodeAfterRestart(t *testing.T) {
	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
		fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage})
		require_NoError(t, err)
		defer fs.Stop()

		o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
		require_NoError(t, err)

		// Record some non-trivial delivered and ack floor sequences.
		expected := &ConsumerState{}
		expected.Delivered.Consumer, expected.Delivered.Stream = 22, 22
		expected.AckFloor.Consumer, expected.AckFloor.Stream = 11, 11
		require_NoError(t, o.Update(expected))

		// Simulate a restart of the filestore.
		fs.Stop()
		fs, err = newFileStore(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage})
		require_NoError(t, err)
		defer fs.Stop()

		o, err = fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
		require_NoError(t, err)

		cfs := o.(*consumerFileStore)
		if cfs.state.Delivered != expected.Delivered {
			t.Fatalf("Consumer state is wrong %+v vs %+v", cfs.state, expected)
		}
		if cfs.state.AckFloor != expected.AckFloor {
			t.Fatalf("Consumer state is wrong %+v vs %+v", cfs.state, expected)
		}
	})
}

View File

@@ -3713,3 +3713,53 @@ func TestJetStreamClusterChangeClusterAfterStreamCreate(t *testing.T) {
})
require_NoError(t, err)
}
// The consumer info() call does not take into account whether a consumer
// is a leader or not, so results would be very different when asking servers
// that housed consumer followers vs leaders.
func TestJetStreamClusterConsumerInfoForJszForFollowers(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "NATS", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
		Replicas: 3,
	})
	require_NoError(t, err)

	for i := 0; i < 1000; i++ {
		sendStreamMsg(t, nc, "foo", "HELLO")
	}

	sub, err := js.PullSubscribe("foo", "d")
	require_NoError(t, err)

	const fetch, ack = 122, 22
	msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second))
	require_NoError(t, err)
	require_True(t, len(msgs) == fetch)
	// Ack only a prefix of what we fetched so delivered and ack floor differ.
	for _, m := range msgs[:ack] {
		m.AckSync()
	}

	// Let acks propagate.
	time.Sleep(100 * time.Millisecond)

	// Every server, leader or follower, should report the same numbers.
	for _, s := range c.servers {
		jsz, err := s.Jsz(&JSzOptions{Accounts: true, Consumer: true})
		require_NoError(t, err)
		require_True(t, len(jsz.AccountDetails) == 1)
		require_True(t, len(jsz.AccountDetails[0].Streams) == 1)
		require_True(t, len(jsz.AccountDetails[0].Streams[0].Consumer) == 1)
		ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
		if ci.Delivered.Consumer != uint64(fetch) || ci.Delivered.Stream != uint64(fetch) {
			t.Fatalf("Incorrect delivered for %v: %+v", s, ci.Delivered)
		}
		if ci.AckFloor.Consumer != uint64(ack) || ci.AckFloor.Stream != uint64(ack) {
			t.Fatalf("Incorrect ackfloor for %v: %+v", s, ci.AckFloor)
		}
	}
}