From 7f06d6f5a78f4c7e26a3a3596503d487edd3692a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 26 Apr 2023 15:03:15 -0700 Subject: [PATCH] When Jsz() was asked for consumer details, would report incorrect data if not a consumer leader. This is due to the way state is maintained for leaders vs followers for consumers. Signed-off-by: Derek Collison --- server/consumer.go | 10 ++++++ server/jetstream_cluster_3_test.go | 50 ++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/server/consumer.go b/server/consumer.go index 8364b9c1..7fce8ed0 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2314,6 +2314,16 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { NumPending: o.checkNumPending(), PushBound: o.isPushMode() && o.active, } + + // If we are replicated and we are not the leader we need to pull certain data from our store. + if rg != nil && rg.node != nil && !o.isLeader() && o.store != nil { + state, _ := o.store.BorrowState() + info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream + info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream + info.NumAckPending = len(state.Pending) + info.NumRedelivered = len(state.Redelivered) + } + // Adjust active based on non-zero etc. Also make UTC here. if !o.ldt.IsZero() { ldt := o.ldt.UTC() // This copies as well. diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 33046283..3301ebb0 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3713,3 +3713,53 @@ func TestJetStreamClusterChangeClusterAfterStreamCreate(t *testing.T) { }) require_NoError(t, err) } + +// The consumer info() call does not take into account whether a consumer +// is a leader or not, so results would be very different when asking servers +// that housed consumer followers vs leaders. +func TestJetStreamClusterConsumerInfoForJszForFollowers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "NATS", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + }) + require_NoError(t, err) + + for i := 0; i < 1000; i++ { + sendStreamMsg(t, nc, "foo", "HELLO") + } + + sub, err := js.PullSubscribe("foo", "d") + require_NoError(t, err) + + fetch, ack := 122, 22 + msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second)) + require_NoError(t, err) + require_True(t, len(msgs) == fetch) + for _, m := range msgs[:ack] { + m.AckSync() + } + // Let acks propagate. + time.Sleep(100 * time.Millisecond) + + for _, s := range c.servers { + jsz, err := s.Jsz(&JSzOptions{Accounts: true, Consumer: true}) + require_NoError(t, err) + require_True(t, len(jsz.AccountDetails) == 1) + require_True(t, len(jsz.AccountDetails[0].Streams) == 1) + require_True(t, len(jsz.AccountDetails[0].Streams[0].Consumer) == 1) + consumer := jsz.AccountDetails[0].Streams[0].Consumer[0] + if consumer.Delivered.Consumer != uint64(fetch) || consumer.Delivered.Stream != uint64(fetch) { + t.Fatalf("Incorrect delivered for %v: %+v", s, consumer.Delivered) + } + if consumer.AckFloor.Consumer != uint64(ack) || consumer.AckFloor.Stream != uint64(ack) { + t.Fatalf("Incorrect ackfloor for %v: %+v", s, consumer.AckFloor) + } + } +}