diff --git a/server/consumer.go b/server/consumer.go
index 402aff3e..e956a9b6 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -2759,7 +2759,8 @@ func (o *consumer) hasNoLocalInterest() bool {
 // This is when the underlying stream has been purged.
 // sseq is the new first seq for the stream after purge.
 func (o *consumer) purge(sseq uint64) {
-	if sseq == 0 {
+	// Do not update our state unless we know we are the leader.
+	if sseq == 0 || !o.isLeader() {
 		return
 	}
 
diff --git a/server/filestore.go b/server/filestore.go
index 1696d944..30f04d87 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -4607,6 +4607,11 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
 		return ErrNoAckPolicy
 	}
 
+	// On restarts the old leader may get a replay from the raft logs that are old.
+	if dseq <= o.state.Delivered.Consumer {
+		return nil
+	}
+
 	// See if we expect an ack for this.
 	if o.cfg.AckPolicy != AckNone {
 		// Need to create pending records here.
@@ -4617,22 +4622,27 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
 		// Check for an update to a message already delivered.
 		if sseq <= o.state.Delivered.Stream {
 			if p = o.state.Pending[sseq]; p != nil {
-				p.Timestamp = ts
+				p.Sequence, p.Timestamp = dseq, ts
 			}
 		}
+		// Add to pending if needed.
 		if p == nil {
-			// Move delivered if this is new.
-			o.state.Delivered.Consumer = dseq
-			o.state.Delivered.Stream = sseq
-			p = &Pending{dseq, ts}
+			o.state.Pending[sseq] = &Pending{dseq, ts}
 		}
+		// Update delivered as needed.
+		if dseq > o.state.Delivered.Consumer {
+			o.state.Delivered.Consumer = dseq
+		}
+		if sseq > o.state.Delivered.Stream {
+			o.state.Delivered.Stream = sseq
+		}
+
 		if dc > 1 {
 			if o.state.Redelivered == nil {
 				o.state.Redelivered = make(map[uint64]uint64)
 			}
 			o.state.Redelivered[sseq] = dc - 1
 		}
-		o.state.Pending[sseq] = &Pending{dseq, ts}
 	} else {
 		// For AckNone just update delivered and ackfloor at the same time.
 		o.state.Delivered.Consumer = dseq
@@ -4654,32 +4664,51 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
 	if o.cfg.AckPolicy == AckNone {
 		return ErrNoAckPolicy
 	}
-	if len(o.state.Pending) == 0 {
+	if len(o.state.Pending) == 0 || o.state.Pending[sseq] == nil {
 		return ErrStoreMsgNotFound
 	}
-	p := o.state.Pending[sseq]
-	if p == nil {
-		return ErrStoreMsgNotFound
+
+	// On restarts the old leader may get a replay from the raft logs that are old.
+	if dseq <= o.state.AckFloor.Consumer {
+		return nil
 	}
-	// Delete from our state.
-	delete(o.state.Pending, sseq)
+
+	// Check for AckAll here.
+	if o.cfg.AckPolicy == AckAll {
+		sgap := sseq - o.state.AckFloor.Stream
+		o.state.AckFloor.Consumer = dseq
+		o.state.AckFloor.Stream = sseq
+		for seq := sseq; seq > sseq-sgap; seq-- {
+			delete(o.state.Pending, seq)
+			if len(o.state.Redelivered) > 0 {
+				delete(o.state.Redelivered, seq)
+			}
+		}
+		o.kickFlusher()
+		return nil
+	}
+
+	// AckExplicit
+
+	// First delete from our pending state.
+	if p, ok := o.state.Pending[sseq]; ok {
+		delete(o.state.Pending, sseq)
+		dseq = p.Sequence // Use the original.
+	}
+	// Now remove from redelivered.
 	if len(o.state.Redelivered) > 0 {
 		delete(o.state.Redelivered, sseq)
-		if len(o.state.Redelivered) == 0 {
-			o.state.Redelivered = nil
-		}
 	}
 
 	if len(o.state.Pending) == 0 {
-		o.state.Pending = nil
 		o.state.AckFloor.Consumer = o.state.Delivered.Consumer
 		o.state.AckFloor.Stream = o.state.Delivered.Stream
-	} else if o.state.AckFloor.Consumer == dseq-1 {
-		notFirst := o.state.AckFloor.Consumer != 0
+	} else if dseq == o.state.AckFloor.Consumer+1 {
+		first := o.state.AckFloor.Consumer == 0
 		o.state.AckFloor.Consumer = dseq
 		o.state.AckFloor.Stream = sseq
-		// Close the gap if needed.
-		if notFirst && o.state.Delivered.Consumer > dseq {
+
+		if !first && o.state.Delivered.Consumer > dseq {
 			for ss := sseq + 1; ss < o.state.Delivered.Stream; ss++ {
 				if p, ok := o.state.Pending[ss]; ok {
 					if p.Sequence > 0 {
@@ -4691,6 +4720,7 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
 			}
 		}
 	}
+
 	o.kickFlusher()
 	return nil
 }
@@ -4800,6 +4830,13 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
 
 	// Replace our state.
 	o.mu.Lock()
+
+	// Check to see if this is an outdated update.
+	if state.Delivered.Consumer < o.state.Delivered.Consumer {
+		o.mu.Unlock()
+		return fmt.Errorf("old update ignored")
+	}
+
 	o.state.Delivered = state.Delivered
 	o.state.AckFloor = state.AckFloor
 	o.state.Pending = pending
@@ -5059,8 +5096,12 @@ func decodeConsumerState(buf []byte) (*ConsumerState, error) {
 	if version == 1 {
 		// Adjust back. Version 1 also stored delivered as next to be delivered,
 		// so adjust that back down here.
-		state.Delivered.Consumer += state.AckFloor.Consumer - 1
-		state.Delivered.Stream += state.AckFloor.Stream - 1
+		if state.AckFloor.Consumer > 1 {
+			state.Delivered.Consumer += state.AckFloor.Consumer - 1
+		}
+		if state.AckFloor.Stream > 1 {
+			state.Delivered.Stream += state.AckFloor.Stream - 1
+		}
 	}
 
 	// We have additional stuff.
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index 07f6db37..186cdbd8 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -7763,6 +7763,90 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) {
 	}
 }
 
+func TestJetStreamPanicDecodingConsumerState(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "JSC", 3)
+	defer c.shutdown()
+
+	rch := make(chan struct{}, 1)
+	nc, js := jsClientConnect(t, c.randomServer(),
+		nats.ReconnectWait(50*time.Millisecond),
+		nats.MaxReconnects(-1),
+		nats.ReconnectHandler(func(_ *nats.Conn) {
+			rch <- struct{}{}
+		}),
+	)
+	defer nc.Close()
+
+	if _, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"ORDERS.*"},
+		Storage:   nats.FileStorage,
+		Replicas:  3,
+		Retention: nats.WorkQueuePolicy,
+		Discard:   nats.DiscardNew,
+		MaxMsgs:   -1,
+		MaxAge:    time.Hour * 24 * 365,
+	}); err != nil {
+		t.Fatalf("Error creating stream: %v", err)
+	}
+
+	sub, err := js.PullSubscribe("ORDERS.created", "durable", nats.MaxAckPending(1000))
+
+	if err != nil {
+		t.Fatalf("Error creating pull subscriber: %v", err)
+	}
+
+	sendMsg := func(subject string) {
+		t.Helper()
+		if _, err := js.Publish(subject, []byte("msg")); err != nil {
+			t.Fatalf("Error on publish: %v", err)
+		}
+	}
+
+	for i := 0; i < 100; i++ {
+		sendMsg("ORDERS.something")
+		sendMsg("ORDERS.created")
+	}
+
+	for total := 0; total != 100; {
+		msgs, err := sub.Fetch(100-total, nats.MaxWait(2*time.Second))
+		if err != nil {
+			t.Fatalf("Failed to fetch message: %v", err)
+		}
+		for _, m := range msgs {
+			m.Ack()
+			total++
+		}
+	}
+
+	c.stopAll()
+	c.restartAllSamePorts()
+	c.waitOnStreamLeader("$G", "TEST")
+	c.waitOnConsumerLeader("$G", "TEST", "durable")
+
+	select {
+	case <-rch:
+	case <-time.After(2 * time.Second):
+		t.Fatal("Did not reconnect")
+	}
+
+	for i := 0; i < 100; i++ {
+		sendMsg("ORDERS.something")
+		sendMsg("ORDERS.created")
+	}
+
+	for total := 0; total != 100; {
+		msgs, err := sub.Fetch(100-total, nats.MaxWait(2*time.Second))
+		if err != nil {
+			t.Fatalf("Error on fetch: %v", err)
+		}
+		for _, m := range msgs {
+			m.Ack()
+			total++
+		}
+	}
+}
+
 // Support functions
 
 // Used to setup superclusters for tests.
@@ -8739,6 +8823,18 @@ func (c *cluster) restartAll() {
 	c.waitOnClusterReady()
 }
 
+func (c *cluster) restartAllSamePorts() {
+	c.t.Helper()
+	for i, s := range c.servers {
+		if !s.Running() {
+			opts := c.opts[i]
+			s := RunServer(opts)
+			c.servers[i] = s
+		}
+	}
+	c.waitOnClusterReady()
+}
+
 func (c *cluster) totalSubs() (total int) {
 	c.t.Helper()
 	for _, s := range c.servers {