Merge pull request #1437 from nats-io/ackall-redelivery

Allow redelivery for AckAll
This commit is contained in:
Derek Collison
2020-05-31 13:58:20 -07:00
committed by GitHub
2 changed files with 110 additions and 10 deletions

View File

@@ -227,7 +227,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
}
// Setup proper default for ack wait if we are in explicit ack mode.
if config.AckPolicy == AckExplicit && config.AckWait == time.Duration(0) {
if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) {
config.AckWait = JsAckWaitDefault
}
// Setup default of -1, meaning no limit for MaxDeliver.
@@ -742,6 +742,18 @@ func (o *Consumer) processTerm(sseq, dseq, dcount uint64) {
o.sendAdvisory(subj, j)
}
// Introduce a small delay in when timer fires to check pending.
// Allows bursts to be treated in same time frame.
const ackWaitDelay = time.Millisecond
// ackWait returns how long to wait to fire the pending timer.
func (o *Consumer) ackWait(next time.Duration) time.Duration {
if next != 0 {
return next + ackWaitDelay
}
return o.config.AckWait + ackWaitDelay
}
// This will restore the state from disk.
func (o *Consumer) readStoredState() error {
if o.store == nil {
@@ -761,7 +773,7 @@ func (o *Consumer) readStoredState() error {
// Setup tracking timer if we have restored pending.
if len(o.pending) > 0 && o.ptmr == nil {
o.mu.Lock()
o.ptmr = time.AfterFunc(o.config.AckWait, o.checkPending)
o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending)
o.mu.Unlock()
}
return err
@@ -1305,10 +1317,11 @@ func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount u
sendq <- pmsg
o.mu.Lock()
if o.config.AckPolicy == AckNone {
ap := o.config.AckPolicy
if ap == AckNone {
o.adflr = o.dseq
o.asflr = seq
} else if o.config.AckPolicy == AckExplicit {
} else if ap == AckExplicit || ap == AckAll {
o.trackPending(seq)
}
o.dseq++
@@ -1322,7 +1335,7 @@ func (o *Consumer) trackPending(seq uint64) {
o.pending = make(map[uint64]int64)
}
if o.ptmr == nil {
o.ptmr = time.AfterFunc(o.config.AckWait, o.checkPending)
o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending)
}
o.pending[seq] = time.Now().UnixNano()
}
@@ -1389,29 +1402,40 @@ func (o *Consumer) checkPending() {
o.mu.Unlock()
return
}
aw := int64(o.config.AckWait)
ttl := int64(o.config.AckWait)
next := int64(o.ackWait(0))
now := time.Now().UnixNano()
shouldSignal := false
// Since we can update timestamps, we have to review all pending.
// We may want to unlock here or warn if list is big.
// we also need to sort after.
// We also need to sort after.
var expired []uint64
for seq, ts := range o.pending {
if now-ts > aw && !o.onRedeliverQueue(seq) {
elapsed := now - ts
if elapsed > ttl && !o.onRedeliverQueue(seq) {
expired = append(expired, seq)
shouldSignal = true
} else if elapsed-ttl < next {
// Update when we should fire next.
next = elapsed - ttl
}
}
if len(expired) > 0 {
sort.Slice(expired, func(i, j int) bool { return expired[i] < expired[j] })
o.rdq = append(o.rdq, expired...)
// Now we should update the timestamp here since we are redelivering.
// We will use an incrementing time to preserve order for any other redelivery.
now := time.Now()
for _, seq := range expired {
now = now.Add(time.Microsecond)
o.pending[seq] = now.UnixNano()
}
}
if len(o.pending) > 0 {
o.ptmr.Reset(o.config.AckWait)
o.ptmr.Reset(o.ackWait(time.Duration(next)))
} else {
o.ptmr.Stop()
o.ptmr = nil

View File

@@ -1844,6 +1844,82 @@ func TestJetStreamWorkQueueRetentionStream(t *testing.T) {
}
}
func TestJetStreamAckAllRedelivery(t *testing.T) {
cases := []struct {
name string
mconfig *server.StreamConfig
}{
{"MemoryStore", &server.StreamConfig{Name: "MY_S22", Storage: server.MemoryStorage}},
{"FileStore", &server.StreamConfig{Name: "MY_S22", Storage: server.FileStorage}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mset, err := s.GlobalAccount().AddStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
// Now load up some messages.
toSend := 100
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc, c.mconfig.Name, "Hello World!")
}
state := mset.State()
if state.Msgs != uint64(toSend) {
t.Fatalf("Expected %d messages, got %d", toSend, state.Msgs)
}
sub, _ := nc.SubscribeSync(nats.NewInbox())
defer sub.Unsubscribe()
nc.Flush()
o, err := mset.AddConsumer(&server.ConsumerConfig{
DeliverSubject: sub.Subject,
AckWait: 20 * time.Millisecond,
AckPolicy: server.AckAll,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o.Delete()
// Wait for messages.
// We will do 5 redeliveries.
for i := 1; i <= 5; i++ {
checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toSend*i {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend*i)
}
return nil
})
}
// Stop redeliveries.
o.Delete()
// Now make sure that they are all redelivered in order for each redelivered batch.
for l := 1; l <= 5; l++ {
for i := 1; i <= toSend; i++ {
m, _ := sub.NextMsg(time.Second)
if seq := o.StreamSeqFromReply(m.Reply); seq != uint64(i) {
t.Fatalf("Expected stream sequence of %d, got %d", i, seq)
}
}
}
})
}
}
func TestJetStreamWorkQueueAckWaitRedelivery(t *testing.T) {
cases := []struct {
name string