Merge pull request #2404 from nats-io/js-consumer-deadlock

[FIXED] Fix for #2403
This commit is contained in:
Derek Collison
2021-08-03 16:18:27 -07:00
committed by GitHub
3 changed files with 66 additions and 5 deletions

View File

@@ -12434,6 +12434,57 @@ func TestJetStreamPurgeEffectsConsumerDelivery(t *testing.T) {
}
}
// TestJetStreamExpireCausesDeadlock is the regression test for issue #2403.
//
// It creates a memory-backed, interest-retention stream capped at 10 messages,
// attaches a sync subscriber, and then floods the stream with async publishes
// from two separate connections. The test passes if the first connection's
// async publishes complete within 5 seconds and the stream still answers a
// StreamInfo request afterwards.
func TestJetStreamExpireCausesDeadlock(t *testing.T) {
	s := RunBasicJetStreamServer()
	defer s.Shutdown()

	// Clean up the JetStream store directory when one is configured.
	jsCfg := s.JetStreamConfig()
	if jsCfg != nil {
		defer removeDir(t, jsCfg.StoreDir)
	}

	// Primary client, used for API requests and half of the publishes.
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	streamCfg := &nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo.*"},
		Storage:   nats.MemoryStorage,
		MaxMsgs:   10,
		Retention: nats.InterestPolicy,
	}
	if _, err := js.AddStream(streamCfg); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	sub, err := js.SubscribeSync("foo.bar")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer sub.Unsubscribe()

	// A second connection publishes concurrently so that a write lock request
	// can get wedged between a holder of the RLock and its attempt to take the
	// RLock again deeper in the stack.
	nc2, js2 := jsClientConnect(t, s)
	defer nc2.Close()

	const toSend = 1000
	for sent := 0; sent < toSend; sent++ {
		js.PublishAsync("foo.bar", []byte("HELLO"))
		js2.PublishAsync("foo.bar", []byte("HELLO"))
	}

	// Only the first context's completion signal is awaited.
	done := js.PublishAsyncComplete()
	timeout := time.After(5 * time.Second)
	select {
	case <-done:
	case <-timeout:
		t.Fatalf("Did not receive completion signal")
	}

	// A deadlocked stream would not be able to answer a stream info request.
	_, err = js.StreamInfo("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
}
///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////

View File

@@ -1998,7 +1998,7 @@ func (s *Server) setInfoHostPort() error {
// When this function is called, opts.Port is set to the actual listen
// port (if option was originally set to RANDOM), even during a config
// reload. So use of s.opts.Port is safe.
if s.opts.ClientAdvertise != "" {
if s.opts.ClientAdvertise != _EMPTY_ {
h, p, err := parseHostPort(s.opts.ClientAdvertise, s.opts.Port)
if err != nil {
return err

View File

@@ -2341,11 +2341,21 @@ func (mset *stream) storeUpdates(md, bd int64, seq uint64, subj string) {
// If we have a single negative update then we will process our consumers for stream pending.
// Purge and Store handled separately inside individual calls.
if md == -1 && seq > 0 {
// We need to pull these out here and release the lock, even an RLock. RLocks are allowed to
// be reentrant, however once anyone signals interest in a write lock any subsequent RLocks
// will block. decStreamPending can try to re-acquire the RLock for this stream.
var _cl [8]*consumer
cl := _cl[:0]
mset.mu.RLock()
for _, o := range mset.consumers {
o.decStreamPending(seq, subj)
cl = append(cl, o)
}
mset.mu.RUnlock()
for _, o := range cl {
o.decStreamPending(seq, subj)
}
}
if mset.jsa != nil {
@@ -2363,7 +2373,7 @@ func (mset *stream) numMsgIds() int {
// checkMsgId will process and check for duplicates.
// Lock should be held.
func (mset *stream) checkMsgId(id string) *ddentry {
if id == "" || mset.ddmap == nil {
if id == _EMPTY_ || mset.ddmap == nil {
return nil
}
return mset.ddmap[id]
@@ -3194,8 +3204,8 @@ func (mset *stream) getPublicConsumers() []*consumer {
// numConsumers reports the number of consumers for this stream, i.e. the
// current length of mset.consumers, read while holding the stream lock.
//
// NOTE(review): this excerpt looks like a diff rendering whose +/- markers were
// stripped: the Lock/Unlock pair is presumably the removed code and the
// RLock/RUnlock pair its replacement. Read literally, acquiring Lock and then
// RLock on the same mutex from one goroutine would block forever (assuming
// mset.mu is a sync.RWMutex) — confirm against the actual file.
func (mset *stream) numConsumers() int {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.mu.RLock()
defer mset.mu.RUnlock()
// Only a read of the consumers map is needed here.
return len(mset.consumers)
}