[FIXED] JetStream: possible deadlock during consumer leadership change

Could show up when the leader changed for a consumer that had redelivered
messages while, for instance, new messages were inbound on the stream.

Resolves #2912

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2022-03-24 15:08:57 -06:00
parent edcddfae58
commit 4739eebfc4
5 changed files with 107 additions and 12 deletions

View File

@@ -3,3 +3,5 @@ Here is the list of some established lock ordering.
In this list, A -> B means that you can have A.Lock() then B.Lock(), not the opposite. In this list, A -> B means that you can have A.Lock() then B.Lock(), not the opposite.
jetStream -> jsAccount -> Server -> client-> Account jetStream -> jsAccount -> Server -> client-> Account
stream -> consumer

View File

@@ -795,12 +795,12 @@ func (o *consumer) setLeader(isLeader bool) {
} }
mset.mu.RLock() mset.mu.RLock()
s, jsa, stream := mset.srv, mset.jsa, mset.cfg.Name s, jsa, stream, lseq := mset.srv, mset.jsa, mset.cfg.Name, mset.lseq
mset.mu.RUnlock() mset.mu.RUnlock()
o.mu.Lock() o.mu.Lock()
// Restore our saved state. During non-leader status we just update our underlying store. // Restore our saved state. During non-leader status we just update our underlying store.
o.readStoredState() o.readStoredState(lseq)
// Do info sub. // Do info sub.
if o.infoSub == nil && jsa != nil { if o.infoSub == nil && jsa != nil {
@@ -1787,10 +1787,10 @@ func (o *consumer) ackWait(next time.Duration) time.Duration {
} }
// Due to bug in calculation of sequences on restoring redelivered let's do quick sanity check. // Due to bug in calculation of sequences on restoring redelivered let's do quick sanity check.
func (o *consumer) checkRedelivered() { func (o *consumer) checkRedelivered(slseq uint64) {
var lseq uint64 var lseq uint64
if mset := o.mset; mset != nil { if mset := o.mset; mset != nil {
lseq = mset.lastSeq() lseq = slseq
} }
var shouldUpdateState bool var shouldUpdateState bool
for sseq := range o.rdc { for sseq := range o.rdc {
@@ -1807,7 +1807,7 @@ func (o *consumer) checkRedelivered() {
// This will restore the state from disk. // This will restore the state from disk.
// Lock should be held. // Lock should be held.
func (o *consumer) readStoredState() error { func (o *consumer) readStoredState(slseq uint64) error {
if o.store == nil { if o.store == nil {
return nil return nil
} }
@@ -1815,7 +1815,7 @@ func (o *consumer) readStoredState() error {
if err == nil && state != nil && (state.Delivered.Consumer != 0 || state.Delivered.Stream != 0) { if err == nil && state != nil && (state.Delivered.Consumer != 0 || state.Delivered.Stream != 0) {
o.applyState(state) o.applyState(state)
if len(o.rdc) > 0 { if len(o.rdc) > 0 {
o.checkRedelivered() o.checkRedelivered(slseq)
} }
} }
return err return err
@@ -3570,14 +3570,14 @@ func (o *consumer) hasNoLocalInterest() bool {
// This is when the underlying stream has been purged. // This is when the underlying stream has been purged.
// sseq is the new first seq for the stream after purge. // sseq is the new first seq for the stream after purge.
// Lock should be held. // Lock should be held.
func (o *consumer) purge(sseq uint64) { func (o *consumer) purge(sseq uint64, slseq uint64) {
// Do not update our state unless we know we are the leader. // Do not update our state unless we know we are the leader.
if !o.isLeader() { if !o.isLeader() {
return return
} }
// Signals all have been purged for this consumer. // Signals all have been purged for this consumer.
if sseq == 0 { if sseq == 0 {
sseq = o.mset.lastSeq() + 1 sseq = slseq + 1
} }
o.mu.Lock() o.mu.Lock()
@@ -3713,7 +3713,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
stop := mset.lastSeq() stop := mset.lastSeq()
o.mu.Lock() o.mu.Lock()
if !o.isLeader() { if !o.isLeader() {
o.readStoredState() o.readStoredState(stop)
} }
start := o.asflr start := o.asflr
o.mu.Unlock() o.mu.Unlock()

View File

@@ -1227,8 +1227,9 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
if !cfg.Created.IsZero() { if !cfg.Created.IsZero() {
obs.setCreatedTime(cfg.Created) obs.setCreatedTime(cfg.Created)
} }
lseq := e.mset.lastSeq()
obs.mu.Lock() obs.mu.Lock()
err = obs.readStoredState() err = obs.readStoredState(lseq)
obs.mu.Unlock() obs.mu.Unlock()
if err != nil { if err != nil {
s.Warnf(" Error restoring consumer state: %v", err) s.Warnf(" Error restoring consumer state: %v", err)

View File

@@ -26,6 +26,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
"runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -11381,6 +11382,95 @@ func TestJetStreamDuplicateMsgIdsOnCatchupAndLeaderTakeover(t *testing.T) {
} }
} }
// TestJetStreamClusterConsumerLeaderChangeDeadlock attempts to reproduce the
// deadlock from issue #2912: one goroutine acquires the stream lock (mset.mu)
// and then each consumer's lock (o.mu) — the same order used when signaling
// consumers about inbound messages — while consumer leadership changes are
// forced concurrently. NOTE(review): presumably the pre-fix setLeader path
// could take these locks in the opposite order when the consumer had
// redelivered messages, producing the deadlock; the watchdog at the end dumps
// all goroutine stacks if the locking goroutine cannot make progress.
func TestJetStreamClusterConsumerLeaderChangeDeadlock(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()
	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()
	// Create a replicated stream and an explicit-ack durable with a short
	// AckWait so an unacked message is redelivered quickly.
	_, err := js.AddStream(&nats.StreamConfig{Name: "test", Subjects: []string{"foo"}, Replicas: 3})
	require_NoError(t, err)
	_, err = js.AddConsumer("test", &nats.ConsumerConfig{
		Durable:        "test",
		DeliverSubject: "bar",
		AckPolicy:      nats.AckExplicitPolicy,
		AckWait:        250 * time.Millisecond,
	})
	require_NoError(t, err)
	// Wait for a consumer leader to be elected and grab the server hosting it.
	c.waitOnConsumerLeader("$G", "test", "test")
	cl := c.consumerLeader("$G", "test", "test")
	// Publish a single message that we will deliberately never ack.
	_, err = js.Publish("foo", []byte("msg"))
	require_NoError(t, err)
	// Subscribe on the delivery subject and receive the message without acking.
	sub := natsSubSync(t, nc, "bar")
	natsNexMsg(t, sub, time.Second)
	// Receive two redeliveries so the sequence is present in the consumer's
	// redelivery map before we start forcing leadership changes.
	natsNexMsg(t, sub, time.Second)
	natsNexMsg(t, sub, time.Second)
	mset, err := cl.GlobalAccount().lookupStream("test")
	require_NoError(t, err)
	require_True(t, mset != nil)
	// There are parts in the code (for instance when signaling to consumers
	// that there are new messages) where we get the mset lock and iterate
	// over the consumers and get consumer lock. We are going to do that
	// in a go routine while we send a consumer step down request from
	// another go routine. We will watch for possible deadlock and if
	// found report it.
	ch := make(chan struct{})
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		for {
			// Stream lock first, then each consumer lock — the message
			// signaling order. The random sleep while holding both locks
			// widens the window for an opposite-order acquisition to collide.
			mset.mu.Lock()
			for _, o := range mset.consumers {
				o.mu.Lock()
				time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
				o.mu.Unlock()
			}
			mset.mu.Unlock()
			select {
			case <-ch:
				return
			default:
			}
		}
	}()
	// Now cause leader changes; each stepdown exercises the leadership-change
	// path on the newly elected leader.
	for i := 0; i < 5; i++ {
		m, err := nc.Request("$JS.API.CONSUMER.LEADER.STEPDOWN.test.test", nil, 2*time.Second)
		// Ignore the request error here; an actual deadlock is detected below.
		if err != nil {
			break
		}
		// If there is a response, check that it reports success.
		var resp JSApiConsumerLeaderStepDownResponse
		err = json.Unmarshal(m.Data, &resp)
		require_NoError(t, err)
		require_True(t, resp.Success)
		c.waitOnConsumerLeader("$G", "test", "test")
	}
	// Signal the locking goroutine to stop. If it does not exit within the
	// timeout it is presumably stuck on a lock, so report the suspected
	// deadlock with a full stack dump of all goroutines.
	close(ch)
	select {
	case <-doneCh:
		// OK!
	case <-time.After(2 * time.Second):
		buf := make([]byte, 1000000)
		n := runtime.Stack(buf, true)
		t.Fatalf("Suspected deadlock, printing current stack. The test suite may timeout and will also dump the stack\n%s\n", buf[:n])
	}
}
// Support functions // Support functions
// Used to setup superclusters for tests. // Used to setup superclusters for tests.

View File

@@ -1147,6 +1147,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
var state StreamState var state StreamState
mset.store.FastState(&state) mset.store.FastState(&state)
fseq := state.FirstSeq fseq := state.FirstSeq
lseq := state.LastSeq
// Check for filtered purge. // Check for filtered purge.
if preq != nil && preq.Subject != _EMPTY_ { if preq != nil && preq.Subject != _EMPTY_ {
@@ -1155,7 +1156,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
} }
for _, o := range obs { for _, o := range obs {
o.purge(fseq) o.purge(fseq, lseq)
} }
return purged, nil return purged, nil
} }
@@ -3811,6 +3812,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
if !fcfg.Created.IsZero() { if !fcfg.Created.IsZero() {
mset.setCreatedTime(fcfg.Created) mset.setCreatedTime(fcfg.Created)
} }
lseq := mset.lastSeq()
// Now do consumers. // Now do consumers.
odir := filepath.Join(ndir, consumerDir) odir := filepath.Join(ndir, consumerDir)
@@ -3854,7 +3856,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
obs.setCreatedTime(cfg.Created) obs.setCreatedTime(cfg.Created)
} }
obs.mu.Lock() obs.mu.Lock()
err = obs.readStoredState() err = obs.readStoredState(lseq)
obs.mu.Unlock() obs.mu.Unlock()
if err != nil { if err != nil {
mset.stop(true, false) mset.stop(true, false)