From 9d7123213a25c4082aab471d00aaf6005f0c2bba Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 14 Aug 2021 12:01:29 -0700 Subject: [PATCH] Keep SequencePair vs SequenceInfo Signed-off-by: Derek Collison --- server/consumer.go | 15 ++++----------- server/filestore_test.go | 6 +++--- server/store.go | 5 +++-- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index ec772f86..5f4fde15 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -37,8 +37,8 @@ type ConsumerInfo struct { Name string `json:"name"` Created time.Time `json:"created"` Config *ConsumerConfig `json:"config,omitempty"` - Delivered SequenceInfo `json:"delivered"` - AckFloor SequenceInfo `json:"ack_floor"` + Delivered SequencePair `json:"delivered"` + AckFloor SequencePair `json:"ack_floor"` NumAckPending int `json:"num_ack_pending"` NumRedelivered int `json:"num_redelivered"` NumWaiting int `json:"num_waiting"` @@ -71,13 +71,6 @@ type ConsumerConfig struct { Direct bool `json:"direct,omitempty"` } -// SequenceInfo has both the consumer and the stream sequence and last activity. -type SequenceInfo struct { - Consumer uint64 `json:"consumer_seq"` - Stream uint64 `json:"stream_seq"` - Last *time.Time `json:"last_active,omitempty"` -} - type CreateConsumerRequest struct { Stream string `json:"stream_name"` Config ConsumerConfig `json:"config"` @@ -1501,11 +1494,11 @@ func (o *consumer) info() *ConsumerInfo { Name: o.name, Created: o.created, Config: &cfg, - Delivered: SequenceInfo{ + Delivered: SequencePair{ Consumer: o.dseq - 1, Stream: o.sseq - 1, }, - AckFloor: SequenceInfo{ + AckFloor: SequencePair{ Consumer: o.adflr, Stream: o.asflr, }, diff --git a/server/filestore_test.go b/server/filestore_test.go index 79d2760a..a715d469 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -2484,7 +2484,7 @@ func TestFileStoreConsumerDeliveredUpdates(t *testing.T) { if state == nil { t.Fatalf("No state available") } - expected := SequencePair{dseq, sseq} + expected := SequencePair{dseq, sseq, nil} if state.Delivered != expected { t.Fatalf("Unexpected state, wanted %+v, got %+v", expected, state.Delivered) } @@ -2545,7 +2545,7 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) { if state == nil { t.Fatalf("No state available") } - expected := SequencePair{dseq, sseq} + expected := SequencePair{dseq, sseq, nil} if state.Delivered != expected { t.Fatalf("Unexpected delivered state, wanted %+v, got %+v", expected, state.Delivered) } @@ -2585,7 +2585,7 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) { if len(state.Pending) != pending { t.Fatalf("Expected %d pending, got %d pending", pending, len(state.Pending)) } - eflr := SequencePair{dflr, sflr} + eflr := SequencePair{dflr, sflr, nil} if state.AckFloor != eflr { t.Fatalf("Unexpected ack floor state, wanted %+v, got %+v", eflr, state.AckFloor) } diff --git a/server/store.go b/server/store.go index 5da8274a..50eaa904 100644 --- a/server/store.go +++ b/server/store.go @@ -162,8 +162,9 @@ type ConsumerStore interface { // SequencePair has both the consumer and the stream sequence. They point to same message. type SequencePair struct { - Consumer uint64 `json:"consumer_seq"` - Stream uint64 `json:"stream_seq"` + Consumer uint64 `json:"consumer_seq"` + Stream uint64 `json:"stream_seq"` + Last *time.Time `json:"last_active,omitempty"` } // ConsumerState represents a stored state for a consumer.