Merge pull request #1628 from nats-io/ack_explicit_msg_removal

AckExplicit removes message for "offline" durable
This commit is contained in:
Ivan Kozlovic
2020-10-02 17:41:11 -06:00
committed by GitHub
4 changed files with 234 additions and 63 deletions

View File

@@ -664,28 +664,13 @@ func (o *Consumer) updateDeliverSubject(newDeliver string) {
o.mu.Lock()
defer o.mu.Unlock()
mset := o.mset
if mset == nil || o.isPullMode() {
if o.closed || o.isPullMode() {
return
}
oldDeliver := o.config.DeliverSubject
o.dsubj = newDeliver
o.config.DeliverSubject = newDeliver
// FIXME(dlc) - check partitions, we may need offset.
o.dseq = o.adflr
o.sseq = o.asflr
// If we never received an ack, set to 1.
if o.dseq == 0 {
o.dseq = 1
}
if o.sseq == 0 {
o.sseq = 1
}
o.acc.sl.ClearNotification(o.dsubj, o.inch)
o.dsubj, o.config.DeliverSubject = newDeliver, newDeliver
// When we register new one it will deliver to update state loop.
o.acc.sl.ClearNotification(oldDeliver, o.inch)
o.acc.sl.RegisterNotification(newDeliver, o.inch)
}
@@ -977,14 +962,16 @@ func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) {
o.sampleAck(sseq, dseq, dcount)
}
delete(o.pending, sseq)
// Consumers sequence numbers can skip during redlivery since
// they always increment. So if we do not have any pending treat
// as all scenario below. Otherwise check that we filled in a gap.
if len(o.pending) == 0 {
o.adflr, o.asflr = o.dseq-1, o.sseq-1
} else if dseq == o.adflr+1 {
o.adflr, o.asflr = dseq, sseq
}
}
// Consumers sequence numbers can skip during redlivery since
// they always increment. So if we do not have any pending treat
// as all scenario below. Otherwise check that we filled in a gap.
// TODO(dlc) - check this.
if len(o.pending) == 0 || dseq == o.adflr+1 {
o.adflr, o.asflr = dseq, sseq
}
// We do these regardless.
delete(o.rdc, sseq)
o.removeFromRedeliverQueue(sseq)
case AckAll:
@@ -1024,19 +1011,24 @@ func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) {
}
// Check if we need an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *Consumer) needAck(sseq uint64) bool {
var na bool
var needAck bool
o.mu.Lock()
switch o.config.AckPolicy {
case AckNone, AckAll:
na = sseq > o.asflr
needAck = sseq > o.asflr
case AckExplicit:
if sseq > o.asflr && len(o.pending) > 0 {
_, na = o.pending[sseq]
if sseq > o.asflr {
// Generally this means we need an ack, but just double check pending acks.
needAck = true
if len(o.pending) > 0 && sseq < o.sseq {
_, needAck = o.pending[sseq]
}
}
}
o.mu.Unlock()
return na
return needAck
}
// Default is 1 if msg is nil.
@@ -1364,26 +1356,27 @@ func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount u
sendq := o.mset.sendq
ap := o.config.AckPolicy
// This needs to be unlocked since the other side may need this lock on failed delivery.
// This needs to be unlocked since the other side may need this lock on a failed delivery.
o.mu.Unlock()
// Send message.
sendq <- pmsg
// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && mset.config.Retention == InterestPolicy {
if ap == AckNone && mset.config.Retention == InterestPolicy && !mset.checkInterest(seq, o) {
// FIXME(dlc) - we have mset lock here, but should we??
if !mset.checkInterest(seq, o) {
mset.store.RemoveMsg(seq)
}
mset.store.RemoveMsg(seq)
}
o.mu.Lock()
if ap == AckNone {
if ap == AckExplicit || ap == AckAll {
o.trackPending(seq)
} else if ap == AckNone {
o.adflr = o.dseq
o.asflr = seq
} else if ap == AckExplicit || ap == AckAll {
o.trackPending(seq)
}
o.dseq++
o.updateStore()
}
@@ -1763,7 +1756,10 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
// Sort just to keep pending sparse array state small.
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
for _, seq := range seqs {
if !mset.checkInterest(seq, o) {
mset.mu.Lock()
hasNoInterest := !mset.checkInterest(seq, o)
mset.mu.Unlock()
if hasNoInterest {
mset.store.RemoveMsg(seq)
}
}

View File

@@ -713,13 +713,14 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) {
return false, nil
}
sm, _ := mb.fetchMsg(seq)
// We have the message here, so we can delete it.
if sm != nil {
// We might have the message here, so we can delete it.
found := sm != nil
if found {
if err := fs.deleteMsgFromBlock(mb, seq, sm, secure); err != nil {
return false, err
}
}
return sm != nil, nil
return found, nil
}
// Loop on requests to write out our index file. This is used when calling
@@ -798,11 +799,13 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored
}
}
if seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) {
// See if the sequence numbers is still relevant. Check first and cache first.
if seq < mb.first.seq || seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) {
mb.mu.Unlock()
fs.mu.Unlock()
return nil
}
// Now check dmap if it is there.
if mb.dmap != nil {
if _, ok := mb.dmap[seq]; ok {
@@ -843,6 +846,7 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored
mb.dmap[seq] = struct{}{}
shouldWriteIndex = true
}
if secure {
fs.eraseMsg(mb, sm)
}

View File

@@ -1120,14 +1120,12 @@ func (mset *Stream) partitionUnique(partition string) bool {
// Lock should be held.
func (mset *Stream) checkInterest(seq uint64, obs *Consumer) bool {
var needAck bool
for _, o := range mset.consumers {
if o != obs && o.needAck(seq) {
needAck = true
break
return true
}
}
return needAck
return false
}
// ackMsg is called into from an observable when we have a WorkQueue or Interest retention policy.

View File

@@ -872,7 +872,7 @@ func TestJetStreamAddStreamSameConfigOK(t *testing.T) {
func sendStreamMsg(t *testing.T, nc *nats.Conn, subject, msg string) {
t.Helper()
resp, _ := nc.Request(subject, []byte(msg), 100*time.Millisecond)
resp, _ := nc.Request(subject, []byte(msg), 500*time.Millisecond)
if resp == nil {
t.Fatalf("No response for %q, possible timeout?", msg)
}
@@ -2928,7 +2928,7 @@ func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) {
checkSubPending := func(numExpected int) {
t.Helper()
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != numExpected {
if nmsgs, _, _ := sub.Pending(); nmsgs != numExpected {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected)
}
return nil
@@ -3934,7 +3934,11 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) {
dname := "d22"
subj1 := nats.NewInbox()
o, err := mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj1, AckPolicy: server.AckExplicit})
o, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: dname,
DeliverSubject: subj1,
AckPolicy: server.AckExplicit,
AckWait: 50 * time.Millisecond})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -3967,7 +3971,7 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if seq := o.SeqFromReply(m.Reply); seq != uint64(seqno) {
if seq := o.StreamSeqFromReply(m.Reply); seq != uint64(seqno) {
t.Fatalf("Expected sequence of %d , got %d", seqno, seq)
}
m.Respond(nil)
@@ -3995,13 +3999,17 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) {
defer sub.Unsubscribe()
nc.Flush()
o, err = mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj2, AckPolicy: server.AckExplicit})
o, err = mset.AddConsumer(&server.ConsumerConfig{
Durable: dname,
DeliverSubject: subj2,
AckPolicy: server.AckExplicit,
AckWait: 50 * time.Millisecond})
if err != nil {
t.Fatalf("Unexpected error trying to add a new durable consumer: %v", err)
}
// We should get the remaining messages here.
for i := toSend / 2; i <= toSend; i++ {
for i := toSend/2 + 1; i <= toSend; i++ {
m := getMsg(i)
m.Respond(nil)
}
@@ -4038,7 +4046,11 @@ func TestJetStreamDurableConsumerReconnectWithOnlyPending(t *testing.T) {
dname := "d22"
subj1 := nats.NewInbox()
o, err := mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj1, AckPolicy: server.AckExplicit})
o, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: dname,
DeliverSubject: subj1,
AckPolicy: server.AckExplicit,
AckWait: 25 * time.Millisecond})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -4076,7 +4088,11 @@ func TestJetStreamDurableConsumerReconnectWithOnlyPending(t *testing.T) {
// Now we should be able to replace the delivery subject.
subj2 := nats.NewInbox()
o, err = mset.AddConsumer(&server.ConsumerConfig{Durable: dname, DeliverSubject: subj2, AckPolicy: server.AckExplicit})
o, err = mset.AddConsumer(&server.ConsumerConfig{
Durable: dname,
DeliverSubject: subj2,
AckPolicy: server.AckExplicit,
AckWait: 25 * time.Millisecond})
if err != nil {
t.Fatalf("Unexpected error trying to add a new durable consumer: %v", err)
}
@@ -4084,15 +4100,18 @@ func TestJetStreamDurableConsumerReconnectWithOnlyPending(t *testing.T) {
defer sub.Unsubscribe()
nc.Flush()
// We should get msg "1" and "2" delivered.
// We should get msg "1" and "2" delivered. They will be reversed.
for i := 0; i < 2; i++ {
msg, err := sub.NextMsg(250 * time.Millisecond)
msg, err := sub.NextMsg(500 * time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
expected := fmt.Sprintf("%d", i+1)
if string(msg.Data) != expected {
t.Fatalf("Expected message %q, got %q", expected, msg.Data)
sseq, _, dc, _ := o.ReplyInfo(msg.Reply)
if sseq == 1 && dc == 1 {
t.Fatalf("Expected a redelivery count greater then 1 for sseq 1, got %d", dc)
}
if sseq != 1 && sseq != 2 {
t.Fatalf("Expected stream sequence of 1 or 2 but got %d", sseq)
}
}
})
@@ -5031,11 +5050,12 @@ func TestJetStreamInterestRetentionStream(t *testing.T) {
// we should have 1, 2, 3 acks now.
checkNumMsgs(totalMsgs - 3)
// Now ack last ackall message. This should clear all of them.
for i := 4; i <= totalMsgs; i++ {
nm, _, _ := sub2.Pending()
// Now ack last ackAll message. This should clear all of them.
for i := 1; i <= nm; i++ {
if m, err := sub2.NextMsg(time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if i == totalMsgs {
} else if i == nm {
m.Respond(nil)
}
}
@@ -7842,3 +7862,156 @@ func TestJetStreamPubSubPerf(t *testing.T) {
fmt.Printf("time is %v\n", tt)
fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
}
func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
cases := []struct {
name string
mconfig *server.StreamConfig
}{
{"MemoryStore", &server.StreamConfig{
Name: "MY_STREAM",
Storage: server.MemoryStorage,
Subjects: []string{"foo.*"},
Retention: server.InterestPolicy,
}},
{"FileStore", &server.StreamConfig{
Name: "MY_STREAM",
Storage: server.FileStorage,
Subjects: []string{"foo.*"},
Retention: server.InterestPolicy,
}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mset, err := s.GlobalAccount().AddStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc1 := clientConnectToServer(t, s)
defer nc1.Close()
nc2 := clientConnectToServer(t, s)
defer nc2.Close()
// Create two durable consumers on the same subject
sub1, _ := nc1.SubscribeSync(nats.NewInbox())
defer sub1.Unsubscribe()
nc1.Flush()
o1, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur1",
DeliverSubject: sub1.Subject,
FilterSubject: "foo.bar",
AckPolicy: server.AckExplicit,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o1.Delete()
sub2, _ := nc2.SubscribeSync(nats.NewInbox())
defer sub2.Unsubscribe()
o2, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur2",
DeliverSubject: sub2.Subject,
FilterSubject: "foo.bar",
AckPolicy: server.AckExplicit,
AckWait: 25 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o2.Delete()
// Send 2 messages
toSend := 2
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", i+1))
}
state := mset.State()
if state.Msgs != uint64(toSend) {
t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs)
}
// Receive the messages and ack them.
subs := []*nats.Subscription{sub1, sub2}
for _, sub := range subs {
for i := 0; i < toSend; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error acking message: %v", err)
}
m.Respond(nil)
}
}
// To make sure acks are processed for checking state after sending new ones.
nc1.Flush()
nc2.Flush()
// Now close the 2nd subscription...
sub2.Unsubscribe()
// Send 2 more new messages
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", 2+i+1))
}
state = mset.State()
if state.Msgs != uint64(toSend) {
t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs)
}
// first subscription should get it and will ack it.
for i := 0; i < toSend; i++ {
m, err := sub1.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error acking message: %v", err)
}
m.Respond(nil)
}
// For acks from m.Respond above
nc1.Flush()
// Now recreate the subscription for the 2nd JS consumer
sub2, _ = nc2.SubscribeSync(nats.NewInbox())
defer sub2.Unsubscribe()
o2, err = mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur2",
DeliverSubject: sub2.Subject,
FilterSubject: "foo.bar",
AckPolicy: server.AckExplicit,
AckWait: 25 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o2.Delete()
// Those messages should be redelivered to the 2nd consumer
for i := 1; i <= toSend; i++ {
m, err := sub2.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving message %d: %v", i, err)
}
m.Respond(nil)
sseq := o2.StreamSeqFromReply(m.Reply)
// Depending on timing from above we could receive stream sequences out of order but
// we know we want 3 & 4.
if sseq != 3 && sseq != 4 {
t.Fatalf("Expected stream sequence of 3 or 4 but got %d", sseq)
}
}
})
}
}