Various fixes for this PR.

- Fix for updating delivery subject and adjusting next delivery sequences.
- When acking explicitly but out of order, need to make sure we set floor correctly.
- Only update ack floors on an ack if the message is present.
- Fix for needAck for explicitAck out of order consumers detecting if message has been acked.
- Fix for race not locking stream when checking interest during stop.
- Fix for filestore determining if a message block still has a message. Added check to first sequence as well as cache.
- Some additions to the original test.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-10-01 21:24:40 -07:00
parent eff27e26be
commit a75be04b0a
4 changed files with 75 additions and 42 deletions

View File

@@ -673,8 +673,8 @@ func (o *Consumer) updateDeliverSubject(newDeliver string) {
o.dsubj = newDeliver
o.config.DeliverSubject = newDeliver
// FIXME(dlc) - check partitions, we may need offset.
o.dseq = o.adflr
o.sseq = o.asflr
o.dseq = o.adflr + 1
o.sseq = o.asflr + 1
// If we never received an ack, set to 1.
if o.dseq == 0 {
@@ -977,14 +977,16 @@ func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) {
o.sampleAck(sseq, dseq, dcount)
}
delete(o.pending, sseq)
// Consumers sequence numbers can skip during redelivery since
// they always increment. So if we do not have any pending treat
// as all scenario below. Otherwise check that we filled in a gap.
if len(o.pending) == 0 {
o.adflr, o.asflr = o.dseq-1, o.sseq-1
} else if dseq == o.adflr+1 {
o.adflr, o.asflr = dseq, sseq
}
}
// Consumers sequence numbers can skip during redelivery since
// they always increment. So if we do not have any pending treat
// as all scenario below. Otherwise check that we filled in a gap.
// TODO(dlc) - check this.
if len(o.pending) == 0 || dseq == o.adflr+1 {
o.adflr, o.asflr = dseq, sseq
}
// We do these regardless.
delete(o.rdc, sseq)
o.removeFromRedeliverQueue(sseq)
case AckAll:
@@ -1024,19 +1026,24 @@ func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) {
}
// Check if we need an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *Consumer) needAck(sseq uint64) bool {
var na bool
var needAck bool
o.mu.Lock()
switch o.config.AckPolicy {
case AckNone, AckAll:
na = sseq > o.asflr
needAck = sseq > o.asflr
case AckExplicit:
if sseq > o.asflr && len(o.pending) > 0 {
_, na = o.pending[sseq]
if sseq > o.asflr {
// Generally this means we need an ack, but just double check pending acks.
needAck = true
if len(o.pending) > 0 {
_, needAck = o.pending[sseq]
}
}
}
o.mu.Unlock()
return na
return needAck
}
// Default is 1 if msg is nil.
@@ -1364,26 +1371,27 @@ func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount u
sendq := o.mset.sendq
ap := o.config.AckPolicy
// This needs to be unlocked since the other side may need this lock on failed delivery.
// This needs to be unlocked since the other side may need this lock on a failed delivery.
o.mu.Unlock()
// Send message.
sendq <- pmsg
// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && mset.config.Retention == InterestPolicy {
if ap == AckNone && mset.config.Retention == InterestPolicy && !mset.checkInterest(seq, o) {
// FIXME(dlc) - we have mset lock here, but should we??
if !mset.checkInterest(seq, o) {
mset.store.RemoveMsg(seq)
}
mset.store.RemoveMsg(seq)
}
o.mu.Lock()
if ap == AckNone {
if ap == AckExplicit || ap == AckAll {
o.trackPending(seq)
} else if ap == AckNone {
o.adflr = o.dseq
o.asflr = seq
} else if ap == AckExplicit || ap == AckAll {
o.trackPending(seq)
}
o.dseq++
o.updateStore()
}
@@ -1763,7 +1771,10 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
// Sort just to keep pending sparse array state small.
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
for _, seq := range seqs {
if !mset.checkInterest(seq, o) {
mset.mu.Lock()
hasNoInterest := !mset.checkInterest(seq, o)
mset.mu.Unlock()
if hasNoInterest {
mset.store.RemoveMsg(seq)
}
}

View File

@@ -713,13 +713,14 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) {
return false, nil
}
sm, _ := mb.fetchMsg(seq)
// We have the message here, so we can delete it.
if sm != nil {
// We might have the message here, so we can delete it.
found := sm != nil
if found {
if err := fs.deleteMsgFromBlock(mb, seq, sm, secure); err != nil {
return false, err
}
}
return sm != nil, nil
return found, nil
}
// Loop on requests to write out our index file. This is used when calling
@@ -798,11 +799,13 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored
}
}
if seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) {
// See if the sequence number is still relevant. Check the block's first sequence as well as the cache.
if seq < mb.first.seq || seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) {
mb.mu.Unlock()
fs.mu.Unlock()
return nil
}
// Now check dmap if it is there.
if mb.dmap != nil {
if _, ok := mb.dmap[seq]; ok {
@@ -843,6 +846,7 @@ func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStored
mb.dmap[seq] = struct{}{}
shouldWriteIndex = true
}
if secure {
fs.eraseMsg(mb, sm)
}

View File

@@ -1120,14 +1120,12 @@ func (mset *Stream) partitionUnique(partition string) bool {
// Lock should be held.
func (mset *Stream) checkInterest(seq uint64, obs *Consumer) bool {
var needAck bool
for _, o := range mset.consumers {
if o != obs && o.needAck(seq) {
needAck = true
break
return true
}
}
return needAck
return false
}
// ackMsg is called into from an observable when we have a WorkQueue or Interest retention policy.

View File

@@ -872,7 +872,7 @@ func TestJetStreamAddStreamSameConfigOK(t *testing.T) {
func sendStreamMsg(t *testing.T, nc *nats.Conn, subject, msg string) {
t.Helper()
resp, _ := nc.Request(subject, []byte(msg), 100*time.Millisecond)
resp, _ := nc.Request(subject, []byte(msg), 500*time.Millisecond)
if resp == nil {
t.Fatalf("No response for %q, possible timeout?", msg)
}
@@ -2928,7 +2928,7 @@ func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) {
checkSubPending := func(numExpected int) {
t.Helper()
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != numExpected {
if nmsgs, _, _ := sub.Pending(); nmsgs != numExpected {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected)
}
return nil
@@ -4001,7 +4001,7 @@ func TestJetStreamDurableConsumerReconnect(t *testing.T) {
}
// We should get the remaining messages here.
for i := toSend / 2; i <= toSend; i++ {
for i := toSend/2 + 1; i <= toSend; i++ {
m := getMsg(i)
m.Respond(nil)
}
@@ -5031,11 +5031,12 @@ func TestJetStreamInterestRetentionStream(t *testing.T) {
// we should have 1, 2, 3 acks now.
checkNumMsgs(totalMsgs - 3)
// Now ack last ackall message. This should clear all of them.
for i := 4; i <= totalMsgs; i++ {
nm, _, _ := sub2.Pending()
// Now ack last ackAll message. This should clear all of them.
for i := 1; i <= nm; i++ {
if m, err := sub2.NextMsg(time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if i == totalMsgs {
} else if i == nm {
m.Respond(nil)
}
}
@@ -7913,7 +7914,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
}
defer o2.Delete()
// Send 1 message
// Send 2 messages
toSend := 2
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", i+1))
@@ -7923,7 +7924,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs)
}
// Receive this first message and ack it.
// Receive the messages and ack them.
subs := []*nats.Subscription{sub1, sub2}
for _, sub := range subs {
for i := 0; i < toSend; i++ {
@@ -7934,15 +7935,26 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
m.Respond(nil)
}
}
nc1.Flush()
nc2.Flush()
// Now close the 2nd subscription...
sub2.Unsubscribe()
nc2.Flush()
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if o2.Active() {
return fmt.Errorf("Consumer still active")
}
return nil
})
// Send new messages
// Send 2 more new messages
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", 2+i+1))
}
state = mset.State()
if state.Msgs != uint64(toSend) {
t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs)
}
// first subscription should get it and will ack it.
for i := 0; i < toSend; i++ {
@@ -7952,11 +7964,12 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
}
m.Respond(nil)
}
// For acks from m.Respond above
nc1.Flush()
// Now recreate the subscription for the 2nd JS consumer
sub2, _ = nc2.SubscribeSync(nats.NewInbox())
defer sub2.Unsubscribe()
nc2.Flush()
o2, err = mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur2",
@@ -7969,6 +7982,13 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
}
defer o2.Delete()
checkFor(t, time.Second, 100*time.Millisecond, func() error {
if nmsgs, _, _ := sub2.Pending(); nmsgs != toSend {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend)
}
return nil
})
// Those messages should be redelivered to the 2nd consumer
for i := 0; i < toSend; i++ {
m, err := sub2.NextMsg(time.Second)