Merge pull request #2561 from nats-io/mirror-deadlock

[FIXED] Deadlock with stream mirrors
This commit is contained in:
Derek Collison
2021-09-22 11:21:32 -07:00
committed by GitHub
4 changed files with 111 additions and 7 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.6.0"
VERSION = "2.6.1"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -2360,7 +2360,7 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && mset.cfg.Retention != LimitsPolicy {
if o.node == nil || o.cfg.Direct {
mset.amch <- seq
mset.ackq.push(seq)
} else {
o.updateAcks(dseq, seq)
}

View File

@@ -3456,3 +3456,51 @@ func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) {
fetchMsgs(t, sub, 100, 5*time.Second)
checkConsumerWith(300, 50, 175)
}
func TestNoRaceJetStreamClusterInterestRetentionDeadlock(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
// This can trigger deadlock with current architecture.
// If stream is !limitsRetention and consumer is DIRECT and ack none we will try to place the msg seq
// onto a chan for the stream to consider removing. All conditions above must hold to trigger.
// We will attempt to trigger here with a stream mirror setup which uses and R=1 DIRECT consumer to replicate msgs.
_, err := js.AddStream(&nats.StreamConfig{Name: "S", Retention: nats.InterestPolicy, Storage: nats.MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Create a mirror which will create the consumer profile to trigger.
_, err = js.AddStream(&nats.StreamConfig{Name: "M", Mirror: &nats.StreamSource{Name: "S"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Queue up alot of messages.
numRequests := 20_000
for i := 0; i < numRequests; i++ {
js.PublishAsync("S", []byte("Q"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("S")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 0 {
return fmt.Errorf("Expected 0 msgs, got state: %+v", si.State)
}
return nil
})
}

View File

@@ -154,7 +154,7 @@ type stream struct {
outq *jsOutQ
msgs *inbound
store StreamStore
amch chan uint64
ackq *ackMsgQueue
lseq uint64
lmsgId string
consumers map[string]*consumer
@@ -353,7 +353,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
// For no-ack consumers when we are interest retention.
if cfg.Retention != LimitsPolicy {
mset.amch = make(chan uint64, 1024)
mset.ackq = &ackMsgQueue{mch: make(chan struct{}, 1)}
}
jsa.streams[cfg.Name] = mset
@@ -3040,14 +3040,66 @@ func (mset *stream) subjects() []string {
return append(mset.cfg.Subjects[:0:0], mset.cfg.Subjects...)
}
// Linked list for async ack of messages.
// When we have a consumer to a stream that is interest based and the
// consumer is R=1 and acknone. This is how mirrors and sources replicate.
type ackMsgQueue struct {
sync.Mutex
mch chan struct{}
seqs []uint64
back []uint64
}
// Push onto the queue.
func (q *ackMsgQueue) push(seq uint64) {
q.Lock()
notify := len(q.seqs) == 0
q.seqs = append(q.seqs, seq)
q.Unlock()
if notify {
select {
case q.mch <- struct{}{}:
default:
}
}
}
// Pop all pending off.
func (q *ackMsgQueue) pop() []uint64 {
q.Lock()
seqs := q.seqs
q.seqs, q.back = q.back, nil
q.Unlock()
return seqs
}
func (q *ackMsgQueue) recycle(seqs []uint64) {
const maxAckQueueReuse = 8 * 1024
if cap(seqs) > maxAckQueueReuse {
return
}
q.Lock()
q.back = seqs[:0]
q.Unlock()
}
func (mset *stream) internalLoop() {
mset.mu.RLock()
s := mset.srv
c := s.createInternalJetStreamClient()
c.registerWithAccount(mset.acc)
defer c.closeConnection(ClientClosed)
outq, qch, mch, amch := mset.outq, mset.qch, mset.msgs.mch, mset.amch
outq, qch, mch := mset.outq, mset.qch, mset.msgs.mch
isClustered := mset.cfg.Replicas > 1
// For the ack msgs queue for interest retention.
var (
amch chan struct{}
ackq *ackMsgQueue
)
if mset.ackq != nil {
ackq, amch = mset.ackq, mset.ackq.mch
}
mset.mu.RUnlock()
// Raw scratch buffer.
@@ -3097,8 +3149,12 @@ func (mset *stream) internalLoop() {
mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0)
}
}
case seq := <-amch:
mset.ackMsg(nil, seq)
case <-amch:
seqs := ackq.pop()
for _, seq := range seqs {
mset.ackMsg(nil, seq)
}
ackq.recycle(seqs)
case <-qch:
return
case <-s.quitCh: