When Jsz() was asked for consumer details, would report incorrect data if not a consumer leader.

This is due to the way state is maintained for leaders vs followers for consumers.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-26 15:03:15 -07:00
parent aea4a4115d
commit 7f06d6f5a7
2 changed files with 60 additions and 0 deletions

View File

@@ -2314,6 +2314,16 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
}
// If we are replicated and we are not the leader we need to pull certain data from our store.
if rg != nil && rg.node != nil && !o.isLeader() && o.store != nil {
state, _ := o.store.BorrowState()
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
info.NumAckPending = len(state.Pending)
info.NumRedelivered = len(state.Redelivered)
}
// Adjust active based on non-zero etc. Also make UTC here.
if !o.ldt.IsZero() {
ldt := o.ldt.UTC() // This copies as well.

View File

@@ -3713,3 +3713,53 @@ func TestJetStreamClusterChangeClusterAfterStreamCreate(t *testing.T) {
})
require_NoError(t, err)
}
// The consumer info() call does not take into account whether a consumer
// is a leader or not, so results would be very different when asking servers
// that housed consumer followers vs leaders.
func TestJetStreamClusterConsumerInfoForJszForFollowers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Replicas: 3,
})
require_NoError(t, err)
for i := 0; i < 1000; i++ {
sendStreamMsg(t, nc, "foo", "HELLO")
}
sub, err := js.PullSubscribe("foo", "d")
require_NoError(t, err)
fetch, ack := 122, 22
msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second))
require_NoError(t, err)
require_True(t, len(msgs) == fetch)
for _, m := range msgs[:ack] {
m.AckSync()
}
// Let acks propagate.
time.Sleep(100 * time.Millisecond)
for _, s := range c.servers {
jsz, err := s.Jsz(&JSzOptions{Accounts: true, Consumer: true})
require_NoError(t, err)
require_True(t, len(jsz.AccountDetails) == 1)
require_True(t, len(jsz.AccountDetails[0].Streams) == 1)
require_True(t, len(jsz.AccountDetails[0].Streams[0].Consumer) == 1)
consumer := jsz.AccountDetails[0].Streams[0].Consumer[0]
if consumer.Delivered.Consumer != uint64(fetch) || consumer.Delivered.Stream != uint64(fetch) {
t.Fatalf("Incorrect delivered for %v: %+v", s, consumer.Delivered)
}
if consumer.AckFloor.Consumer != uint64(ack) || consumer.AckFloor.Stream != uint64(ack) {
t.Fatalf("Incorrect ackfloor for %v: %+v", s, consumer.AckFloor)
}
}
}