Merge pull request #1677 from ripienaar/rename_consumer_pending

rename consumer pending fields for clarity
This commit is contained in:
Derek Collison
2020-10-28 08:39:21 -07:00
committed by GitHub
2 changed files with 30 additions and 30 deletions

View File

@@ -33,16 +33,16 @@ import (
)
type ConsumerInfo struct {
Stream string `json:"stream_name"`
Name string `json:"name"`
Created time.Time `json:"created"`
Config ConsumerConfig `json:"config"`
Delivered SequencePair `json:"delivered"`
AckFloor SequencePair `json:"ack_floor"`
NumPending int `json:"num_pending"`
NumRedelivered int `json:"num_redelivered"`
NumWaiting int `json:"num_waiting"`
NumStreamPending uint64 `json:"num_stream_pending"`
Stream string `json:"stream_name"`
Name string `json:"name"`
Created time.Time `json:"created"`
Config ConsumerConfig `json:"config"`
Delivered SequencePair `json:"delivered"`
AckFloor SequencePair `json:"ack_floor"`
NumAckPending int `json:"num_ack_pending"`
NumRedelivered int `json:"num_redelivered"`
NumWaiting int `json:"num_waiting"`
NumPending uint64 `json:"num_pending"`
}
type ConsumerConfig struct {
@@ -903,9 +903,9 @@ func (o *Consumer) Info() *ConsumerInfo {
ConsumerSeq: o.adflr,
StreamSeq: o.asflr,
},
NumPending: len(o.pending),
NumRedelivered: len(o.rdc),
NumStreamPending: o.spending,
NumAckPending: len(o.pending),
NumRedelivered: len(o.rdc),
NumPending: o.spending,
}
// If we are a pull mode consumer, report on number of waiting requests.
if o.isPullMode() {

View File

@@ -2386,8 +2386,8 @@ func TestJetStreamAckReplyStreamPending(t *testing.T) {
t.Fatalf("Expected ack reply pending of %d, got %d", ep, pending)
}
// Now check consumer info.
if info := o.Info(); int(info.NumStreamPending) != ep {
t.Fatalf("Expected consumer info pending of %d, got %d", ep, info.NumStreamPending)
if info := o.Info(); int(info.NumPending) != ep {
t.Fatalf("Expected consumer info pending of %d, got %d", ep, info.NumPending)
}
}
@@ -2438,8 +2438,8 @@ func TestJetStreamAckReplyStreamPending(t *testing.T) {
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc, "foo.33", "Hello World!")
}
if info := o.Info(); info.NumStreamPending != 0 {
t.Fatalf("Expected no pending, got %d", info.NumStreamPending)
if info := o.Info(); info.NumPending != 0 {
t.Fatalf("Expected no pending, got %d", info.NumPending)
}
// Now send one message that will match us.
sendStreamMsg(t, nc, "foo.22", "Hello World!")
@@ -2519,8 +2519,8 @@ func TestJetStreamAckReplyStreamPendingWithAcks(t *testing.T) {
}
defer o.Delete()
if info := o.Info(); int(info.NumStreamPending) != toSend {
t.Fatalf("Expected consumer info pending of %d, got %d", toSend, info.NumStreamPending)
if info := o.Info(); int(info.NumPending) != toSend {
t.Fatalf("Expected consumer info pending of %d, got %d", toSend, info.NumPending)
}
sub, _ := nc.SubscribeSync(dsubj)
@@ -2533,10 +2533,10 @@ func TestJetStreamAckReplyStreamPendingWithAcks(t *testing.T) {
})
// Should be zero.
if info := o.Info(); int(info.NumStreamPending) != 0 {
t.Fatalf("Expected consumer info pending of %d, got %d", 0, info.NumStreamPending)
} else if info.NumPending != toSend {
t.Fatalf("Expected %d to be pending acks, got %d", toSend, info.NumPending)
if info := o.Info(); int(info.NumPending) != 0 {
t.Fatalf("Expected consumer info pending of %d, got %d", 0, info.NumPending)
} else if info.NumAckPending != toSend {
t.Fatalf("Expected %d to be pending acks, got %d", toSend, info.NumAckPending)
}
})
}
@@ -5341,8 +5341,8 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) {
if state.AckFloor.ConsumerSeq != 50 {
t.Fatalf("Expected ack floor of 50, got %d", state.AckFloor.ConsumerSeq)
}
if state.NumPending != 25 {
t.Fatalf("Expected len(pending) to be 25, got %d", state.NumPending)
if state.NumAckPending != 25 {
t.Fatalf("Expected len(pending) to be 25, got %d", state.NumAckPending)
}
// Now do purge.
mset.Purge()
@@ -5353,8 +5353,8 @@ func TestJetStreamStreamPurgeWithConsumer(t *testing.T) {
// Pending should be cleared, and stream sequences should have been set
// to the total messages before purge + 1.
state = o.Info()
if state.NumPending != 0 {
t.Fatalf("Expected no pending, got %d", state.NumPending)
if state.NumAckPending != 0 {
t.Fatalf("Expected no pending, got %d", state.NumAckPending)
}
if state.Delivered.StreamSeq != 100 {
t.Fatalf("Expected to have setseq now at next seq of 100, got %d", state.Delivered.StreamSeq)
@@ -5441,8 +5441,8 @@ func TestJetStreamStreamPurgeWithConsumerAndRedelivery(t *testing.T) {
// Pending should be cleared, and stream sequences should have been set
// to the total messages before purge + 1.
state := o.Info()
if state.NumPending != 0 {
t.Fatalf("Expected no pending, got %d", state.NumPending)
if state.NumAckPending != 0 {
t.Fatalf("Expected no pending, got %d", state.NumAckPending)
}
if state.Delivered.StreamSeq != 100 {
t.Fatalf("Expected to have setseq now at next seq of 100, got %d", state.Delivered.StreamSeq)
@@ -7688,7 +7688,7 @@ func TestJetStreamNextMsgNoInterest(t *testing.T) {
}
nc.Flush()
ostate := o.Info()
if ostate.AckFloor.StreamSeq != 11 || ostate.NumPending > 0 {
if ostate.AckFloor.StreamSeq != 11 || ostate.NumAckPending > 0 {
t.Fatalf("Inconsistent ack state: %+v", ostate)
}
})