Mirror of https://github.com/gogrlx/nats-server.git
When a consumer reached its max delivered condition, we did not properly synchronize the state, so on a restore or leader switch the ack pending count could jump above max ack pending and block the consumer.
This change propagates a delivered update and updates the store state engines to do the right thing when the condition is reached.

Signed-off-by: Derek Collison <derek@nats.io>
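The core idea of the fix, shown here as a minimal self-contained sketch with simplified types (these are not the server's actual structures, just an illustration of the invariant): delivered updates can be replayed out of order on a restore or leader switch, so the store must only move sequences forward, only raise the redelivery count, and drop a sequence from pending once MaxDeliver is exceeded. Otherwise pending can grow past max ack pending and stall the consumer.

package main

import "fmt"

// Simplified consumer store state, for illustration only.
type state struct {
	delivered   uint64            // highest stream sequence delivered
	pending     map[uint64]bool   // stream sequences awaiting ack
	redelivered map[uint64]uint64 // stream sequence -> redelivery count
}

// updateDelivered mirrors the guarded update pattern the commit applies
// to the file and memory store state engines.
func (s *state) updateDelivered(sseq, dc, maxDeliver uint64) {
	if maxDeliver > 0 && dc > maxDeliver {
		// Max deliveries reached: this sequence must not count as pending anymore.
		delete(s.pending, sseq)
	}
	// Only update if greater than what we already have.
	if s.redelivered[sseq] < dc-1 {
		s.redelivered[sseq] = dc - 1
	}
	// Never move delivered backwards on a replayed update.
	if sseq > s.delivered {
		s.delivered = sseq
	}
}

func main() {
	s := &state{pending: map[uint64]bool{7: true}, redelivered: map[uint64]uint64{}}
	// Second delivery of stream sequence 7 with MaxDeliver = 1: it is removed
	// from pending, so ack pending stays bounded across restores and leader switches.
	s.updateDelivered(7, 2, 1)
	fmt.Println(len(s.pending), s.redelivered[7], s.delivered) // 0 1 7
}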
@@ -3092,7 +3092,10 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
 				o.notifyDeliveryExceeded(seq, dc-1)
 			}
 			// Make sure to remove from pending.
-			delete(o.pending, seq)
+			if p, ok := o.pending[seq]; ok && p != nil {
+				delete(o.pending, seq)
+				o.updateDelivered(p.Sequence, seq, dc, p.Timestamp)
+			}
 			continue
 		}
 		if seq > 0 {
@@ -6946,20 +6946,28 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
 		}
 
 		if dc > 1 {
+			if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc {
+				// Make sure to remove from pending.
+				delete(o.state.Pending, sseq)
+			}
 			if o.state.Redelivered == nil {
 				o.state.Redelivered = make(map[uint64]uint64)
 			}
 			// Only update if greater then what we already have.
-			if o.state.Redelivered[sseq] < dc {
+			if o.state.Redelivered[sseq] < dc-1 {
 				o.state.Redelivered[sseq] = dc - 1
 			}
 		}
 	} else {
 		// For AckNone just update delivered and ackfloor at the same time.
-		o.state.Delivered.Consumer = dseq
-		o.state.Delivered.Stream = sseq
-		o.state.AckFloor.Consumer = dseq
-		o.state.AckFloor.Stream = sseq
+		if dseq > o.state.Delivered.Consumer {
+			o.state.Delivered.Consumer = dseq
+			o.state.AckFloor.Consumer = dseq
+		}
+		if sseq > o.state.Delivered.Stream {
+			o.state.Delivered.Stream = sseq
+			o.state.AckFloor.Stream = sseq
+		}
 	}
 	// Make sure we flush to disk.
 	o.kickFlusher()
@@ -5156,3 +5156,122 @@ func TestJetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch(t *testing
 		}
 	}
 }
+
+func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"*"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	// send 50 msgs
+	for i := 0; i < 50; i++ {
+		_, err := js.Publish("foo", []byte("ok"))
+		require_NoError(t, err)
+	}
+
+	// File based.
+	_, err = js.Subscribe("foo",
+		func(msg *nats.Msg) {},
+		nats.Durable("file"),
+		nats.ManualAck(),
+		nats.MaxDeliver(1),
+		nats.AckWait(time.Second),
+		nats.MaxAckPending(10),
+	)
+	require_NoError(t, err)
+
+	// Let first batch retry and expire.
+	time.Sleep(1200 * time.Millisecond)
+
+	cia, err := js.ConsumerInfo("TEST", "file")
+	require_NoError(t, err)
+
+	// Make sure followers will have exact same state.
+	_, err = nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "file"), nil, time.Second)
+	require_NoError(t, err)
+	c.waitOnConsumerLeader(globalAccountName, "TEST", "file")
+
+	cib, err := js.ConsumerInfo("TEST", "file")
+	require_NoError(t, err)
+
+	// Want to compare sans cluster details which we know will change due to leader change.
+	// Also last activity for delivered can be slightly off so nil out as well.
+	checkConsumerInfo := func(a, b *nats.ConsumerInfo) {
+		t.Helper()
+		a.Cluster, b.Cluster = nil, nil
+		a.Delivered.Last, b.Delivered.Last = nil, nil
+		if !reflect.DeepEqual(a, b) {
+			t.Fatalf("ConsumerInfo do not match\n\t%+v\n\t%+v", a, b)
+		}
+	}
+
+	checkConsumerInfo(cia, cib)
+
+	// Memory based.
+	_, err = js.Subscribe("foo",
+		func(msg *nats.Msg) {},
+		nats.Durable("mem"),
+		nats.ManualAck(),
+		nats.MaxDeliver(1),
+		nats.AckWait(time.Second),
+		nats.MaxAckPending(10),
+		nats.ConsumerMemoryStorage(),
+	)
+	require_NoError(t, err)
+
+	// Let first batch retry and expire.
+	time.Sleep(1200 * time.Millisecond)
+
+	cia, err = js.ConsumerInfo("TEST", "mem")
+	require_NoError(t, err)
+
+	// Make sure followers will have exact same state.
+	_, err = nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "mem"), nil, time.Second)
+	require_NoError(t, err)
+	c.waitOnConsumerLeader(globalAccountName, "TEST", "mem")
+
+	cib, err = js.ConsumerInfo("TEST", "mem")
+	require_NoError(t, err)
+
+	checkConsumerInfo(cia, cib)
+
+	// Now file based but R1 and server restart.
+	_, err = js.Subscribe("foo",
+		func(msg *nats.Msg) {},
+		nats.Durable("r1"),
+		nats.ManualAck(),
+		nats.MaxDeliver(1),
+		nats.AckWait(time.Second),
+		nats.MaxAckPending(10),
+		nats.ConsumerReplicas(1),
+	)
+	require_NoError(t, err)
+
+	// Let first batch retry and expire.
+	time.Sleep(1200 * time.Millisecond)
+
+	cia, err = js.ConsumerInfo("TEST", "r1")
+	require_NoError(t, err)
+
+	cl := c.consumerLeader(globalAccountName, "TEST", "r1")
+	cl.Shutdown()
+	cl.WaitForShutdown()
+	cl = c.restartServer(cl)
+	c.waitOnServerCurrent(cl)
+
+	cib, err = js.ConsumerInfo("TEST", "r1")
+	require_NoError(t, err)
+
+	// Created can skew a small bit due to server restart, this is expected.
+	now := time.Now()
+	cia.Created, cib.Created = now, now
+	checkConsumerInfo(cia, cib)
+}
@@ -1307,17 +1307,28 @@ func (o *consumerMemStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) erro
 		}
 
 		if dc > 1 {
+			if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc {
+				// Make sure to remove from pending.
+				delete(o.state.Pending, sseq)
+			}
 			if o.state.Redelivered == nil {
 				o.state.Redelivered = make(map[uint64]uint64)
 			}
-			o.state.Redelivered[sseq] = dc - 1
+			// Only update if greater then what we already have.
+			if o.state.Redelivered[sseq] < dc-1 {
+				o.state.Redelivered[sseq] = dc - 1
+			}
 		}
 	} else {
 		// For AckNone just update delivered and ackfloor at the same time.
-		o.state.Delivered.Consumer = dseq
-		o.state.Delivered.Stream = sseq
-		o.state.AckFloor.Consumer = dseq
-		o.state.AckFloor.Stream = sseq
+		if dseq > o.state.Delivered.Consumer {
+			o.state.Delivered.Consumer = dseq
+			o.state.AckFloor.Consumer = dseq
+		}
+		if sseq > o.state.Delivered.Stream {
+			o.state.Delivered.Stream = sseq
+			o.state.AckFloor.Stream = sseq
+		}
 	}
 
 	return nil
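As a practical way to observe the bug this commit fixes, a client can compare a consumer's reported ack pending count against its configured limit after a leader switch or restart. A short sketch using the nats.go client, assuming a server reachable at the default URL and the "TEST" stream with the "file" durable from the test above:

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Stream "TEST" and durable "file" match the names used in the test above.
	ci, err := js.ConsumerInfo("TEST", "file")
	if err != nil {
		log.Fatal(err)
	}
	// Before this fix, a restore or leader switch could report more ack pending
	// messages than the configured max, which blocks further deliveries.
	if ci.NumAckPending > ci.Config.MaxAckPending {
		log.Fatalf("ack pending %d exceeds max ack pending %d",
			ci.NumAckPending, ci.Config.MaxAckPending)
	}
	log.Printf("ok: ack pending %d within max %d", ci.NumAckPending, ci.Config.MaxAckPending)
}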