mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 02:07:59 -07:00
Keep SequencePair vs SequenceInfo
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -37,8 +37,8 @@ type ConsumerInfo struct {
|
||||
Name string `json:"name"`
|
||||
Created time.Time `json:"created"`
|
||||
Config *ConsumerConfig `json:"config,omitempty"`
|
||||
Delivered SequenceInfo `json:"delivered"`
|
||||
AckFloor SequenceInfo `json:"ack_floor"`
|
||||
Delivered SequencePair `json:"delivered"`
|
||||
AckFloor SequencePair `json:"ack_floor"`
|
||||
NumAckPending int `json:"num_ack_pending"`
|
||||
NumRedelivered int `json:"num_redelivered"`
|
||||
NumWaiting int `json:"num_waiting"`
|
||||
@@ -71,13 +71,6 @@ type ConsumerConfig struct {
|
||||
Direct bool `json:"direct,omitempty"`
|
||||
}
|
||||
|
||||
// SequenceInfo has both the consumer and the stream sequence and last activity.
|
||||
type SequenceInfo struct {
|
||||
Consumer uint64 `json:"consumer_seq"`
|
||||
Stream uint64 `json:"stream_seq"`
|
||||
Last *time.Time `json:"last_active,omitempty"`
|
||||
}
|
||||
|
||||
type CreateConsumerRequest struct {
|
||||
Stream string `json:"stream_name"`
|
||||
Config ConsumerConfig `json:"config"`
|
||||
@@ -1501,11 +1494,11 @@ func (o *consumer) info() *ConsumerInfo {
|
||||
Name: o.name,
|
||||
Created: o.created,
|
||||
Config: &cfg,
|
||||
Delivered: SequenceInfo{
|
||||
Delivered: SequencePair{
|
||||
Consumer: o.dseq - 1,
|
||||
Stream: o.sseq - 1,
|
||||
},
|
||||
AckFloor: SequenceInfo{
|
||||
AckFloor: SequencePair{
|
||||
Consumer: o.adflr,
|
||||
Stream: o.asflr,
|
||||
},
|
||||
|
||||
@@ -2484,7 +2484,7 @@ func TestFileStoreConsumerDeliveredUpdates(t *testing.T) {
|
||||
if state == nil {
|
||||
t.Fatalf("No state available")
|
||||
}
|
||||
expected := SequencePair{dseq, sseq}
|
||||
expected := SequencePair{dseq, sseq, nil}
|
||||
if state.Delivered != expected {
|
||||
t.Fatalf("Unexpected state, wanted %+v, got %+v", expected, state.Delivered)
|
||||
}
|
||||
@@ -2545,7 +2545,7 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) {
|
||||
if state == nil {
|
||||
t.Fatalf("No state available")
|
||||
}
|
||||
expected := SequencePair{dseq, sseq}
|
||||
expected := SequencePair{dseq, sseq, nil}
|
||||
if state.Delivered != expected {
|
||||
t.Fatalf("Unexpected delivered state, wanted %+v, got %+v", expected, state.Delivered)
|
||||
}
|
||||
@@ -2585,7 +2585,7 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) {
|
||||
if len(state.Pending) != pending {
|
||||
t.Fatalf("Expected %d pending, got %d pending", pending, len(state.Pending))
|
||||
}
|
||||
eflr := SequencePair{dflr, sflr}
|
||||
eflr := SequencePair{dflr, sflr, nil}
|
||||
if state.AckFloor != eflr {
|
||||
t.Fatalf("Unexpected ack floor state, wanted %+v, got %+v", eflr, state.AckFloor)
|
||||
}
|
||||
|
||||
@@ -162,8 +162,9 @@ type ConsumerStore interface {
|
||||
|
||||
// SequencePair has both the consumer and the stream sequence. They point to same message.
|
||||
type SequencePair struct {
|
||||
Consumer uint64 `json:"consumer_seq"`
|
||||
Stream uint64 `json:"stream_seq"`
|
||||
Consumer uint64 `json:"consumer_seq"`
|
||||
Stream uint64 `json:"stream_seq"`
|
||||
Last *time.Time `json:"last_active,omitempty"`
|
||||
}
|
||||
|
||||
// ConsumerState represents a stored state for a consumer.
|
||||
|
||||
Reference in New Issue
Block a user