Merge pull request #1694 from nats-io/consumer_update

Redeliver pending
This commit is contained in:
Derek Collison
2020-11-05 15:01:22 -08:00
committed by GitHub
3 changed files with 212 additions and 8 deletions

View File

@@ -678,6 +678,29 @@ func (o *Consumer) Config() ConsumerConfig {
return o.config
}
// Force expiration of all pending.
// Lock should be held.
func (o *Consumer) forceExpirePending() {
now := time.Now().UnixNano()
var expired []uint64
for seq := range o.pending {
if !o.onRedeliverQueue(seq) {
expired = append(expired, seq)
}
}
if len(expired) > 0 {
sort.Slice(expired, func(i, j int) bool { return expired[i] < expired[j] })
o.rdq = append(o.rdq, expired...)
// Now we should update the timestamp here since we are redelivering.
// We will use an incrementing time to preserve order for any other redelivery.
off := now - o.pending[expired[0]]
for _, seq := range expired {
o.pending[seq] += off
}
o.ptmr.Reset(o.ackWait(0))
}
}
// This is a config change for the delivery subject for a
// push based consumer.
func (o *Consumer) updateDeliverSubject(newDeliver string) {
@@ -685,10 +708,15 @@ func (o *Consumer) updateDeliverSubject(newDeliver string) {
o.mu.Lock()
defer o.mu.Unlock()
if o.closed || o.isPullMode() {
if o.closed || o.isPullMode() || o.config.DeliverSubject == newDeliver {
return
}
// Force redeliver of all pending on change of delivery subject.
if len(o.pending) > 0 {
o.forceExpirePending()
}
o.acc.sl.ClearNotification(o.dsubj, o.inch)
o.dsubj, o.config.DeliverSubject = newDeliver, newDeliver
// When we register new one it will deliver to update state loop.

View File

@@ -2170,6 +2170,15 @@ func (mb *msgBlock) writeIndexInfo() error {
return err
}
// Make sure the header is correct.
func checkHeader(hdr []byte) error {
if hdr == nil || len(hdr) < 2 || hdr[0] != magic || hdr[1] != version {
return fmt.Errorf("corrupt state file")
}
return nil
}
// readIndexInfo will read in the index information for the message block.
func (mb *msgBlock) readIndexInfo() error {
buf, err := ioutil.ReadFile(mb.ifn)
if err != nil {
@@ -2920,13 +2929,6 @@ func (o *consumerFileStore) ensureStateFileOpen() error {
return nil
}
func checkHeader(hdr []byte) error {
if hdr == nil || len(hdr) < 2 || hdr[0] != magic || hdr[1] != version {
return fmt.Errorf("corrupt state file")
}
return nil
}
// State retrieves the state from the state file.
// This is not expected to be called in high performance code, only on startup.
func (o *consumerFileStore) State() (*ConsumerState, error) {

View File

@@ -8942,3 +8942,177 @@ func TestJetStreamStoredMsgsDontDisappearAfterCacheExpiration(t *testing.T) {
getMsgSeq(2)
getMsgSeq(3)
}
func TestJetStreamConsumerUpdateRedelivery(t *testing.T) {
cases := []struct {
name string
mconfig *server.StreamConfig
}{
{"MemoryStore", &server.StreamConfig{
Name: "MY_STREAM",
Storage: server.MemoryStorage,
Subjects: []string{"foo.>"},
Retention: server.InterestPolicy,
}},
{"FileStore", &server.StreamConfig{
Name: "MY_STREAM",
Storage: server.FileStorage,
Subjects: []string{"foo.>"},
Retention: server.InterestPolicy,
}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mset, err := s.GlobalAccount().AddStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
// Create a durable consumer.
sub, _ := nc.SubscribeSync(nats.NewInbox())
defer sub.Unsubscribe()
o, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur22",
DeliverSubject: sub.Subject,
FilterSubject: "foo.bar",
AckPolicy: server.AckExplicit,
AckWait: 25 * time.Millisecond,
MaxDeliver: 3,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o.Delete()
// Send 20 messages
toSend := 20
for i := 1; i <= toSend; i++ {
sendStreamMsg(t, nc, "foo.bar", fmt.Sprintf("msg-%v", i))
}
state := mset.State()
if state.Msgs != uint64(toSend) {
t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs)
}
// Receive the messages and ack only every 4th
for i := 0; i < toSend; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
seq, _, _, _, _ := o.ReplyInfo(m.Reply)
// 4, 8, 12, 16, 20
if seq%4 == 0 {
m.Respond(nil)
}
}
// Now close the sub and open a new one and update the consumer.
sub.Unsubscribe()
// Wait for it to become inactive
checkFor(t, 200*time.Millisecond, 10*time.Millisecond, func() error {
if o.Active() {
return fmt.Errorf("Consumer still active")
}
return nil
})
// Send 20 more messages.
for i := toSend; i < toSend*2; i++ {
sendStreamMsg(t, nc, "foo.bar", fmt.Sprintf("msg-%v", i))
}
// Create new subscription.
sub, _ = nc.SubscribeSync(nats.NewInbox())
defer sub.Unsubscribe()
nc.Flush()
o, err = mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur22",
DeliverSubject: sub.Subject,
FilterSubject: "foo.bar",
AckPolicy: server.AckExplicit,
AckWait: 25 * time.Millisecond,
MaxDeliver: 3,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o.Delete()
expect := toSend + toSend - 5 // mod 4 acks
checkFor(t, time.Second, 5*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != expect {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, expect)
}
return nil
})
for i, eseq := 0, uint64(1); i < expect; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
// Skip the ones we ack'd from above. We should not get them back here.
if eseq <= uint64(toSend) && eseq%4 == 0 {
eseq++
}
seq, _, dc, _, _ := o.ReplyInfo(m.Reply)
if seq != eseq {
t.Fatalf("Expected stream sequence of %d, got %d", eseq, seq)
}
if seq <= uint64(toSend) && dc != 2 {
t.Fatalf("Expected delivery count of 2 for sequence of %d, got %d", seq, dc)
}
if seq > uint64(toSend) && dc != 1 {
t.Fatalf("Expected delivery count of 1 for sequence of %d, got %d", seq, dc)
}
if seq > uint64(toSend) {
m.Respond(nil) // Ack
}
eseq++
}
// We should get the second half back since we did not ack those from above.
expect = toSend - 5
checkFor(t, time.Second, 5*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != expect {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, expect)
}
return nil
})
for i, eseq := 0, uint64(1); i < expect; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
// Skip the ones we ack'd from above. We should not get them back here.
if eseq <= uint64(toSend) && eseq%4 == 0 {
eseq++
}
seq, _, dc, _, _ := o.ReplyInfo(m.Reply)
if seq != eseq {
t.Fatalf("Expected stream sequence of %d, got %d", eseq, seq)
}
if dc != 3 {
t.Fatalf("Expected delivery count of 3 for sequence of %d, got %d", seq, dc)
}
eseq++
}
})
}
}