Merge pull request #2406 from nats-io/consumer-panic

Various bug fixes and improvements to clustered filestore consumer stores.
This commit is contained in:
Derek Collison
2021-08-04 08:36:24 -07:00
committed by GitHub
3 changed files with 161 additions and 23 deletions

View File

@@ -2759,7 +2759,8 @@ func (o *consumer) hasNoLocalInterest() bool {
// This is when the underlying stream has been purged.
// sseq is the new first seq for the stream after purge.
func (o *consumer) purge(sseq uint64) {
if sseq == 0 {
// Do not update our state unless we know we are the leader.
if sseq == 0 || !o.isLeader() {
return
}

View File

@@ -4607,6 +4607,11 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
return ErrNoAckPolicy
}
// On restarts the old leader may get a replay from the raft logs that are old.
if dseq <= o.state.Delivered.Consumer {
return nil
}
// See if we expect an ack for this.
if o.cfg.AckPolicy != AckNone {
// Need to create pending records here.
@@ -4617,22 +4622,27 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
// Check for an update to a message already delivered.
if sseq <= o.state.Delivered.Stream {
if p = o.state.Pending[sseq]; p != nil {
p.Timestamp = ts
p.Sequence, p.Timestamp = dseq, ts
}
}
// Add to pending if needed.
if p == nil {
// Move delivered if this is new.
o.state.Delivered.Consumer = dseq
o.state.Delivered.Stream = sseq
p = &Pending{dseq, ts}
o.state.Pending[sseq] = &Pending{dseq, ts}
}
// Update delivered as needed.
if dseq > o.state.Delivered.Consumer {
o.state.Delivered.Consumer = dseq
}
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq
}
if dc > 1 {
if o.state.Redelivered == nil {
o.state.Redelivered = make(map[uint64]uint64)
}
o.state.Redelivered[sseq] = dc - 1
}
o.state.Pending[sseq] = &Pending{dseq, ts}
} else {
// For AckNone just update delivered and ackfloor at the same time.
o.state.Delivered.Consumer = dseq
@@ -4654,32 +4664,51 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
if o.cfg.AckPolicy == AckNone {
return ErrNoAckPolicy
}
if len(o.state.Pending) == 0 {
if len(o.state.Pending) == 0 || o.state.Pending[sseq] == nil {
return ErrStoreMsgNotFound
}
p := o.state.Pending[sseq]
if p == nil {
return ErrStoreMsgNotFound
// On restarts the old leader may get a replay from the raft logs that are old.
if dseq <= o.state.AckFloor.Consumer {
return nil
}
// Delete from our state.
delete(o.state.Pending, sseq)
// Check for AckAll here.
if o.cfg.AckPolicy == AckAll {
sgap := sseq - o.state.AckFloor.Stream
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
for seq := sseq; seq > sseq-sgap; seq-- {
delete(o.state.Pending, seq)
if len(o.state.Redelivered) > 0 {
delete(o.state.Redelivered, seq)
}
}
o.kickFlusher()
return nil
}
// AckExplicit
// First delete from our pending state.
if p, ok := o.state.Pending[sseq]; ok {
delete(o.state.Pending, sseq)
dseq = p.Sequence // Use the original.
}
// Now remove from redelivered.
if len(o.state.Redelivered) > 0 {
delete(o.state.Redelivered, sseq)
if len(o.state.Redelivered) == 0 {
o.state.Redelivered = nil
}
}
if len(o.state.Pending) == 0 {
o.state.Pending = nil
o.state.AckFloor.Consumer = o.state.Delivered.Consumer
o.state.AckFloor.Stream = o.state.Delivered.Stream
} else if o.state.AckFloor.Consumer == dseq-1 {
notFirst := o.state.AckFloor.Consumer != 0
} else if dseq == o.state.AckFloor.Consumer+1 {
first := o.state.AckFloor.Consumer == 0
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
// Close the gap if needed.
if notFirst && o.state.Delivered.Consumer > dseq {
if !first && o.state.Delivered.Consumer > dseq {
for ss := sseq + 1; ss < o.state.Delivered.Stream; ss++ {
if p, ok := o.state.Pending[ss]; ok {
if p.Sequence > 0 {
@@ -4691,6 +4720,7 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
}
}
}
o.kickFlusher()
return nil
}
@@ -4800,6 +4830,13 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
// Replace our state.
o.mu.Lock()
// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer {
o.mu.Unlock()
return fmt.Errorf("old update ignored")
}
o.state.Delivered = state.Delivered
o.state.AckFloor = state.AckFloor
o.state.Pending = pending
@@ -5059,8 +5096,12 @@ func decodeConsumerState(buf []byte) (*ConsumerState, error) {
if version == 1 {
// Adjust back. Version 1 also stored delivered as next to be delivered,
// so adjust that back down here.
state.Delivered.Consumer += state.AckFloor.Consumer - 1
state.Delivered.Stream += state.AckFloor.Stream - 1
if state.AckFloor.Consumer > 1 {
state.Delivered.Consumer += state.AckFloor.Consumer - 1
}
if state.AckFloor.Stream > 1 {
state.Delivered.Stream += state.AckFloor.Stream - 1
}
}
// We have additional stuff.

View File

@@ -7763,6 +7763,90 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) {
}
}
// TestJetStreamPanicDecodingConsumerState exercises a full-cluster stop and
// same-port restart of a 3-replica work-queue stream with a pull consumer,
// verifying the consumer state can be decoded and consumption resumes after
// the restart (previously this path could panic when decoding state).
func TestJetStreamPanicDecodingConsumerState(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "JSC", 3)
	defer c.shutdown()

	// Signal channel fired by the client once it reconnects post-restart.
	reconnected := make(chan struct{}, 1)
	nc, js := jsClientConnect(t, c.randomServer(),
		nats.ReconnectWait(50*time.Millisecond),
		nats.MaxReconnects(-1),
		nats.ReconnectHandler(func(_ *nats.Conn) {
			reconnected <- struct{}{}
		}),
	)
	defer nc.Close()

	if _, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"ORDERS.*"},
		Storage:   nats.FileStorage,
		Replicas:  3,
		Retention: nats.WorkQueuePolicy,
		Discard:   nats.DiscardNew,
		MaxMsgs:   -1,
		MaxAge:    time.Hour * 24 * 365,
	}); err != nil {
		t.Fatalf("Error creating stream: %v", err)
	}

	sub, err := js.PullSubscribe("ORDERS.created", "durable", nats.MaxAckPending(1000))
	if err != nil {
		t.Fatalf("Error creating pull subscriber: %v", err)
	}

	// Helper that publishes a single payload to the given subject.
	publish := func(subject string) {
		t.Helper()
		if _, err := js.Publish(subject, []byte("msg")); err != nil {
			t.Fatalf("Error on publish: %v", err)
		}
	}

	// Interleave filtered and unfiltered subjects so the consumer sequence
	// diverges from the stream sequence.
	for n := 0; n < 100; n++ {
		publish("ORDERS.something")
		publish("ORDERS.created")
	}

	// Drain and ack all 100 filtered messages.
	received := 0
	for received != 100 {
		batch, err := sub.Fetch(100-received, nats.MaxWait(2*time.Second))
		if err != nil {
			t.Fatalf("Failed to fetch message: %v", err)
		}
		for _, msg := range batch {
			msg.Ack()
			received++
		}
	}

	// Bounce every server on the same ports and wait for leadership to settle.
	c.stopAll()
	c.restartAllSamePorts()
	c.waitOnStreamLeader("$G", "TEST")
	c.waitOnConsumerLeader("$G", "TEST", "durable")

	select {
	case <-reconnected:
	case <-time.After(2 * time.Second):
		t.Fatal("Did not reconnect")
	}

	// Publishing and consuming must still work against the restored state.
	for n := 0; n < 100; n++ {
		publish("ORDERS.something")
		publish("ORDERS.created")
	}

	received = 0
	for received != 100 {
		batch, err := sub.Fetch(100-received, nats.MaxWait(2*time.Second))
		if err != nil {
			t.Fatalf("Error on fetch: %v", err)
		}
		for _, msg := range batch {
			msg.Ack()
			received++
		}
	}
}
// Support functions
// Used to setup superclusters for tests.
@@ -8739,6 +8823,18 @@ func (c *cluster) restartAll() {
c.waitOnClusterReady()
}
// restartAllSamePorts restarts every stopped server in the cluster with its
// original options — and therefore the same listen ports — then blocks until
// the cluster reports ready. Servers that are still running are left alone.
func (c *cluster) restartAllSamePorts() {
	c.t.Helper()
	for i, srv := range c.servers {
		if srv.Running() {
			continue
		}
		c.servers[i] = RunServer(c.opts[i])
	}
	c.waitOnClusterReady()
}
func (c *cluster) totalSubs() (total int) {
c.t.Helper()
for _, s := range c.servers {