Do not deadlock when triggering lots of didNotDeliver notices. Don't hold lock on chan send

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-03-23 12:57:32 -07:00
parent 323b3d0b11
commit ade58bea87
3 changed files with 68 additions and 2 deletions

View File

@@ -2296,6 +2296,61 @@ func TestJetStreamDurableFilteredSubjectConsumerReconnect(t *testing.T) {
}
}
func TestJetStreamConsumerInactiveNoDeadlock(t *testing.T) {
cases := []struct {
name string
mconfig *server.StreamConfig
}{
{"MemoryStore", &server.StreamConfig{Name: "DC", Storage: server.MemoryStorage}},
{"FileStore", &server.StreamConfig{Name: "DC", Storage: server.FileStorage}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.GlobalAccount().AddStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
// Send lots of msgs and have them queued up.
for i := 0; i < 10000; i++ {
nc.Publish("DC", []byte("OK!"))
}
nc.Flush()
if state := mset.State(); state.Msgs != 10000 {
t.Fatalf("Expected %d messages, got %d", 10000, state.Msgs)
}
sub, _ := nc.SubscribeSync(nats.NewInbox())
defer sub.Unsubscribe()
nc.Flush()
o, err := mset.AddConsumer(&server.ConsumerConfig{Delivery: sub.Subject, DeliverAll: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer o.Delete()
for i := 0; i < 10; i++ {
if _, err := sub.NextMsg(time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// Force us to become inactive but we want to make sure we do not lock up
// the internal sendq.
sub.Unsubscribe()
nc.Flush()
})
}
}
func TestJetStreamRedeliverCount(t *testing.T) {
cases := []struct {
name string