From 4739eebfc4d85a1ccc053c28d25c228910f8300a Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Thu, 24 Mar 2022 15:08:57 -0600
Subject: [PATCH] [FIXED] JetStream: possible deadlock during consumer leadership change

This could show up when the leader changed for a consumer that had
redelivered messages while, for instance, messages were inbound on the
stream.

Resolves #2912

Signed-off-by: Ivan Kozlovic
---
 locksordering.txt                |  2 +
 server/consumer.go               | 18 +++----
 server/jetstream.go              |  3 +-
 server/jetstream_cluster_test.go | 90 ++++++++++++++++++++++++++++++++
 server/stream.go                 |  6 ++-
 5 files changed, 107 insertions(+), 12 deletions(-)

diff --git a/locksordering.txt b/locksordering.txt
index fec4b250..1baf5203 100644
--- a/locksordering.txt
+++ b/locksordering.txt
@@ -3,3 +3,5 @@ Here is the list of some established lock ordering.
 In this list, A -> B means that you can have A.Lock() then B.Lock(), not the opposite.
 
 jetStream -> jsAccount -> Server -> client-> Account
+
+stream -> consumer
diff --git a/server/consumer.go b/server/consumer.go
index 64731e3d..a17dc5e4 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -795,12 +795,12 @@ func (o *consumer) setLeader(isLeader bool) {
 		}
 
 		mset.mu.RLock()
-		s, jsa, stream := mset.srv, mset.jsa, mset.cfg.Name
+		s, jsa, stream, lseq := mset.srv, mset.jsa, mset.cfg.Name, mset.lseq
 		mset.mu.RUnlock()
 
 		o.mu.Lock()
 		// Restore our saved state. During non-leader status we just update our underlying store.
-		o.readStoredState()
+		o.readStoredState(lseq)
 
 		// Do info sub.
 		if o.infoSub == nil && jsa != nil {
@@ -1787,10 +1787,10 @@ func (o *consumer) ackWait(next time.Duration) time.Duration {
 }
 
 // Due to bug in calculation of sequences on restoring redelivered let's do quick sanity check.
-func (o *consumer) checkRedelivered() {
+func (o *consumer) checkRedelivered(slseq uint64) {
 	var lseq uint64
 	if mset := o.mset; mset != nil {
-		lseq = mset.lastSeq()
+		lseq = slseq
 	}
 	var shouldUpdateState bool
 	for sseq := range o.rdc {
@@ -1807,7 +1807,7 @@
 
 // This will restore the state from disk.
 // Lock should be held.
-func (o *consumer) readStoredState() error {
+func (o *consumer) readStoredState(slseq uint64) error {
 	if o.store == nil {
 		return nil
 	}
@@ -1815,7 +1815,7 @@
 	if err == nil && state != nil && (state.Delivered.Consumer != 0 || state.Delivered.Stream != 0) {
 		o.applyState(state)
 		if len(o.rdc) > 0 {
-			o.checkRedelivered()
+			o.checkRedelivered(slseq)
 		}
 	}
 	return err
@@ -3570,14 +3570,14 @@ func (o *consumer) hasNoLocalInterest() bool {
 // This is when the underlying stream has been purged.
 // sseq is the new first seq for the stream after purge.
 // Lock should be held.
-func (o *consumer) purge(sseq uint64) {
+func (o *consumer) purge(sseq uint64, slseq uint64) {
 	// Do not update our state unless we know we are the leader.
 	if !o.isLeader() {
 		return
 	}
 	// Signals all have been purged for this consumer.
 	if sseq == 0 {
-		sseq = o.mset.lastSeq() + 1
+		sseq = slseq + 1
 	}
 
 	o.mu.Lock()
@@ -3713,7 +3713,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
 		stop := mset.lastSeq()
 		o.mu.Lock()
 		if !o.isLeader() {
-			o.readStoredState()
+			o.readStoredState(stop)
 		}
 		start := o.asflr
 		o.mu.Unlock()
diff --git a/server/jetstream.go b/server/jetstream.go
index 6b13ac7a..2a2ce886 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -1227,8 +1227,9 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
 			if !cfg.Created.IsZero() {
 				obs.setCreatedTime(cfg.Created)
 			}
+			lseq := e.mset.lastSeq()
 			obs.mu.Lock()
-			err = obs.readStoredState()
+			err = obs.readStoredState(lseq)
 			obs.mu.Unlock()
 			if err != nil {
 				s.Warnf(" Error restoring consumer state: %v", err)
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index 40e07a49..fc95a21d 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -26,6 +26,7 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -11381,6 +11382,95 @@ func TestJetStreamDuplicateMsgIdsOnCatchupAndLeaderTakeover(t *testing.T) {
 	}
 }
 
+func TestJetStreamClusterConsumerLeaderChangeDeadlock(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Create a stream and durable with ack explicit
+	_, err := js.AddStream(&nats.StreamConfig{Name: "test", Subjects: []string{"foo"}, Replicas: 3})
+	require_NoError(t, err)
+	_, err = js.AddConsumer("test", &nats.ConsumerConfig{
+		Durable:        "test",
+		DeliverSubject: "bar",
+		AckPolicy:      nats.AckExplicitPolicy,
+		AckWait:        250 * time.Millisecond,
+	})
+	require_NoError(t, err)
+
+	// Wait for a leader
+	c.waitOnConsumerLeader("$G", "test", "test")
+	cl := c.consumerLeader("$G", "test", "test")
+
+	// Publish a message
+	_, err = js.Publish("foo", []byte("msg"))
+	require_NoError(t, err)
+
+	// Create nats consumer on "bar" and don't ack it
+	sub := natsSubSync(t, nc, "bar")
+	natsNexMsg(t, sub, time.Second)
+	// Wait for redeliveries, to make sure it is in the redelivery map
+	natsNexMsg(t, sub, time.Second)
+	natsNexMsg(t, sub, time.Second)
+
+	mset, err := cl.GlobalAccount().lookupStream("test")
+	require_NoError(t, err)
+	require_True(t, mset != nil)
+
+	// There are parts in the code (for instance when signaling to consumers
+	// that there are new messages) where we get the mset lock and iterate
+	// over the consumers and get the consumer lock. We are going to do that
+	// in a goroutine while we send a consumer step down request from
+	// another goroutine. We will watch for possible deadlock and if
+	// found report it.
+	ch := make(chan struct{})
+	doneCh := make(chan struct{})
+	go func() {
+		defer close(doneCh)
+		for {
+			mset.mu.Lock()
+			for _, o := range mset.consumers {
+				o.mu.Lock()
+				time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
+				o.mu.Unlock()
+			}
+			mset.mu.Unlock()
+			select {
+			case <-ch:
+				return
+			default:
+			}
+		}
+	}()
+
+	// Now cause a few leader changes
+	for i := 0; i < 5; i++ {
+		m, err := nc.Request("$JS.API.CONSUMER.LEADER.STEPDOWN.test.test", nil, 2*time.Second)
+		// Ignore error here and check for deadlock below
+		if err != nil {
+			break
+		}
+		// If there is a response, check that it indicates success
+		var resp JSApiConsumerLeaderStepDownResponse
+		err = json.Unmarshal(m.Data, &resp)
+		require_NoError(t, err)
+		require_True(t, resp.Success)
+		c.waitOnConsumerLeader("$G", "test", "test")
+	}
+
+	close(ch)
+	select {
+	case <-doneCh:
+		// OK!
+	case <-time.After(2 * time.Second):
+		buf := make([]byte, 1000000)
+		n := runtime.Stack(buf, true)
+		t.Fatalf("Suspected deadlock, printing current stack. The test suite may time out and will also dump the stack\n%s\n", buf[:n])
+	}
+}
+
 // Support functions
 
 // Used to setup superclusters for tests.
diff --git a/server/stream.go b/server/stream.go
index e1c1f50e..82999aea 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -1147,6 +1147,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
 	var state StreamState
 	mset.store.FastState(&state)
 	fseq := state.FirstSeq
+	lseq := state.LastSeq
 
 	// Check for filtered purge.
 	if preq != nil && preq.Subject != _EMPTY_ {
@@ -1155,7 +1156,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
 	}
 
 	for _, o := range obs {
-		o.purge(fseq)
+		o.purge(fseq, lseq)
 	}
 	return purged, nil
 }
@@ -3811,6 +3812,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
 	if !fcfg.Created.IsZero() {
 		mset.setCreatedTime(fcfg.Created)
 	}
+	lseq := mset.lastSeq()
 
 	// Now do consumers.
 	odir := filepath.Join(ndir, consumerDir)
@@ -3854,7 +3856,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
 			obs.setCreatedTime(cfg.Created)
 		}
 		obs.mu.Lock()
-		err = obs.readStoredState()
+		err = obs.readStoredState(lseq)
 		obs.mu.Unlock()
 		if err != nil {
 			mset.stop(true, false)
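
For reference, below is a minimal, self-contained sketch of the lock-ordering hazard this patch addresses and the shape of the fix: snapshot the stream's last sequence while only the stream lock is involved and pass it down as a plain value (the slseq parameter), instead of calling mset.lastSeq() while holding the consumer lock. The types and function names here are simplified, hypothetical stand-ins, not the server's actual ones.

package main

import "sync"

// consumer and stream are simplified stand-ins for the server's types; they
// exist only to illustrate the "stream -> consumer" lock ordering rule.
type consumer struct {
	mu  sync.Mutex
	rdc map[uint64]uint64 // redelivery counts keyed by stream sequence
}

type stream struct {
	mu        sync.RWMutex
	lseq      uint64 // last stream sequence
	consumers []*consumer
}

// Deadlock-prone shape (analogous to the pre-fix code): the consumer holds its
// own lock and then reaches back into the stream for the last sequence, taking
// the stream lock second. A goroutine that already holds the stream lock and
// is waiting on this consumer's lock (e.g. while signaling new messages)
// deadlocks with it.
func (c *consumer) pruneRedeliveredBad(s *stream) {
	c.mu.Lock()
	defer c.mu.Unlock()
	s.mu.RLock() // consumer -> stream: violates the stream -> consumer order
	lseq := s.lseq
	s.mu.RUnlock()
	for sseq := range c.rdc {
		if sseq > lseq {
			delete(c.rdc, sseq)
		}
	}
}

// Fixed shape: the caller snapshots the stream's last sequence up front and
// passes it in as a plain value, so no stream lock is needed here.
func (c *consumer) pruneRedelivered(slseq uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for sseq := range c.rdc {
		if sseq > slseq {
			delete(c.rdc, sseq)
		}
	}
}

func main() {
	s := &stream{lseq: 10}
	c := &consumer{rdc: map[uint64]uint64{5: 1, 42: 2}}
	s.consumers = append(s.consumers, c)

	// Snapshot under the stream lock, release it, then call into the
	// consumer: stream -> consumer ordering is respected throughout.
	s.mu.RLock()
	lseq := s.lseq
	s.mu.RUnlock()
	c.pruneRedelivered(lseq)
}

Passing the sequence as a value is why the patch threads slseq (or stop/lseq) through readStoredState, checkRedelivered and purge: the consumer code paths no longer need to acquire the stream lock at all while holding their own lock.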