diff --git a/server/consumer.go b/server/consumer.go index ca753d72..1bc7c0a4 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -33,16 +33,16 @@ import ( ) type ConsumerInfo struct { - Stream string `json:"stream_name"` - Name string `json:"name"` - Created time.Time `json:"created"` - Config ConsumerConfig `json:"config"` - Delivered SequencePair `json:"delivered"` - AckFloor SequencePair `json:"ack_floor"` - NumPending int `json:"num_pending"` - NumRedelivered int `json:"num_redelivered"` - NumWaiting int `json:"num_waiting"` - NumStreamPending uint64 `json:"num_stream_pending"` + Stream string `json:"stream_name"` + Name string `json:"name"` + Created time.Time `json:"created"` + Config ConsumerConfig `json:"config"` + Delivered SequencePair `json:"delivered"` + AckFloor SequencePair `json:"ack_floor"` + NumAckPending int `json:"num_ack_pending"` + NumRedelivered int `json:"num_redelivered"` + NumWaiting int `json:"num_waiting"` + NumPending uint64 `json:"num_pending"` } type ConsumerConfig struct { @@ -903,9 +903,9 @@ func (o *Consumer) Info() *ConsumerInfo { ConsumerSeq: o.adflr, StreamSeq: o.asflr, }, - NumPending: len(o.pending), - NumRedelivered: len(o.rdc), - NumStreamPending: o.spending, + NumAckPending: len(o.pending), + NumRedelivered: len(o.rdc), + NumPending: o.spending, } // If we are a pull mode consumer, report on number of waiting requests. if o.isPullMode() { diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 24ffbfcb..ee70a253 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -2386,8 +2386,8 @@ func TestJetStreamAckReplyStreamPending(t *testing.T) { t.Fatalf("Expected ack reply pending of %d, got %d", ep, pending) } // Now check consumer info. - if info := o.Info(); int(info.NumStreamPending) != ep { - t.Fatalf("Expected consumer info pending of %d, got %d", ep, info.NumStreamPending) + if info := o.Info(); int(info.NumPending) != ep { + t.Fatalf("Expected consumer info pending of %d, got %d", ep, info.NumPending) } } @@ -2438,8 +2438,8 @@ func TestJetStreamAckReplyStreamPending(t *testing.T) { for i := 0; i < toSend; i++ { sendStreamMsg(t, nc, "foo.33", "Hello World!") } - if info := o.Info(); info.NumStreamPending != 0 { - t.Fatalf("Expected no pending, got %d", info.NumStreamPending) + if info := o.Info(); info.NumPending != 0 { + t.Fatalf("Expected no pending, got %d", info.NumPending) } // Now send one message that will match us. sendStreamMsg(t, nc, "foo.22", "Hello World!") @@ -2519,8 +2519,8 @@ func TestJetStreamAckReplyStreamPendingWithAcks(t *testing.T) { } defer o.Delete() - if info := o.Info(); int(info.NumStreamPending) != toSend { - t.Fatalf("Expected consumer info pending of %d, got %d", toSend, info.NumStreamPending) + if info := o.Info(); int(info.NumPending) != toSend { + t.Fatalf("Expected consumer info pending of %d, got %d", toSend, info.NumPending) } sub, _ := nc.SubscribeSync(dsubj) @@ -2533,10 +2533,10 @@ func TestJetStreamAckReplyStreamPendingWithAcks(t *testing.T) { }) // Should be zero. - if info := o.Info(); int(info.NumStreamPending) != 0 { - t.Fatalf("Expected consumer info pending of %d, got %d", 0, info.NumStreamPending) - } else if info.NumPending != toSend { - t.Fatalf("Expected %d to be pending acks, got %d", toSend, info.NumPending) + if info := o.Info(); int(info.NumPending) != 0 { + t.Fatalf("Expected consumer info pending of %d, got %d", 0, info.NumPending) + } else if info.NumAckPending != toSend { + t.Fatalf("Expected %d to be pending acks, got %d", toSend, info.NumAckPending) } }) } @@ -5341,8 +5341,8 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) { if state.AckFloor.ConsumerSeq != 50 { t.Fatalf("Expected ack floor of 50, got %d", state.AckFloor.ConsumerSeq) } - if state.NumPending != 25 { - t.Fatalf("Expected len(pending) to be 25, got %d", state.NumPending) + if state.NumAckPending != 25 { + t.Fatalf("Expected len(pending) to be 25, got %d", state.NumAckPending) } // Now do purge. mset.Purge() @@ -5353,8 +5353,8 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) { // Pending should be cleared, and stream sequences should have been set // to the total messages before purge + 1. state = o.Info() - if state.NumPending != 0 { - t.Fatalf("Expected no pending, got %d", state.NumPending) + if state.NumAckPending != 0 { + t.Fatalf("Expected no pending, got %d", state.NumAckPending) } if state.Delivered.StreamSeq != 100 { t.Fatalf("Expected to have setseq now at next seq of 100, got %d", state.Delivered.StreamSeq) @@ -5441,8 +5441,8 @@ func TestJetStreamStreamPurgeWithConsumerAndRedelivery(t *testing.T) { // Pending should be cleared, and stream sequences should have been set // to the total messages before purge + 1. state := o.Info() - if state.NumPending != 0 { - t.Fatalf("Expected no pending, got %d", state.NumPending) + if state.NumAckPending != 0 { + t.Fatalf("Expected no pending, got %d", state.NumAckPending) } if state.Delivered.StreamSeq != 100 { t.Fatalf("Expected to have setseq now at next seq of 100, got %d", state.Delivered.StreamSeq) @@ -7688,7 +7688,7 @@ func TestJetStreamNextMsgNoInterest(t *testing.T) { } nc.Flush() ostate := o.Info() - if ostate.AckFloor.StreamSeq != 11 || ostate.NumPending > 0 { + if ostate.AckFloor.StreamSeq != 11 || ostate.NumAckPending > 0 { t.Fatalf("Inconsistent ack state: %+v", ostate) } })