mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 11:04:42 -07:00
Merge pull request #3338 from nats-io/fix_qunsub_leak
[FIXED] Memory leak when unsubscribing the last queue subscription
This commit is contained in:
@@ -1608,7 +1608,7 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
|
||||
if ls, ok := lqws[key]; ok && ls == n {
|
||||
acc.mu.Unlock()
|
||||
return
|
||||
} else {
|
||||
} else if n > 0 {
|
||||
lqws[key] = n
|
||||
}
|
||||
acc.mu.Unlock()
|
||||
|
||||
@@ -1526,3 +1526,67 @@ func TestSubjectRenameViaJetStreamAck(t *testing.T) {
|
||||
t.Fatalf("Expected error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClusterQueueGroupWeightTrackingLeak(t *testing.T) {
|
||||
o := DefaultOptions()
|
||||
o.ServerName = "A"
|
||||
s := RunServer(o)
|
||||
defer s.Shutdown()
|
||||
|
||||
o2 := DefaultOptions()
|
||||
o2.ServerName = "B"
|
||||
o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o.Cluster.Port))
|
||||
s2 := RunServer(o2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
nc := natsConnect(t, s.ClientURL())
|
||||
defer nc.Close()
|
||||
|
||||
// Create a queue subscription
|
||||
sub := natsQueueSubSync(t, nc, "foo", "bar")
|
||||
|
||||
// Check on s0 that we have the proper queue weight info
|
||||
acc := s.GlobalAccount()
|
||||
|
||||
check := func(present bool, expected int32) {
|
||||
t.Helper()
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
acc.mu.RLock()
|
||||
v, ok := acc.lqws["foo bar"]
|
||||
acc.mu.RUnlock()
|
||||
if present {
|
||||
if !ok {
|
||||
return fmt.Errorf("the key is not present")
|
||||
}
|
||||
if v != expected {
|
||||
return fmt.Errorf("lqws doest not contain expected value of %v: %v", expected, v)
|
||||
}
|
||||
} else if ok {
|
||||
return fmt.Errorf("the key is present with value %v and should not be", v)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
check(true, 1)
|
||||
|
||||
// Now unsub, and it should be removed, not just be 0
|
||||
sub.Unsubscribe()
|
||||
check(false, 0)
|
||||
|
||||
// Still make sure that the subject interest is gone from both servers.
|
||||
checkSubGone := func(s *Server) {
|
||||
t.Helper()
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
acc := s.GlobalAccount()
|
||||
acc.mu.RLock()
|
||||
res := acc.sl.Match("foo")
|
||||
acc.mu.RUnlock()
|
||||
if res != nil && len(res.qsubs) > 0 {
|
||||
return fmt.Errorf("Found queue sub on foo for server %v", s)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
checkSubGone(s)
|
||||
checkSubGone(s2)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user