Merge pull request #1079 from nats-io/leaksubs

Fix for #1065 (leaked subscribers from dq subs across routes)
This commit is contained in:
Derek Collison
2019-07-23 08:32:25 -07:00
committed by GitHub
5 changed files with 277 additions and 82 deletions

View File

@@ -50,6 +50,7 @@ type Account struct {
nrleafs int32
clients map[*client]*client
rm map[string]int32
lqws map[string]int32
lleafs []*client
imports importMap
exports exportMap

View File

@@ -954,13 +954,13 @@ func (c *client) flushOutbound() bool {
}
if sce {
atomic.AddInt64(&srv.slowConsumers, 1)
c.clearConnection(SlowConsumerWriteDeadline)
c.Noticef("Slow Consumer Detected: WriteDeadline of %v exceeded with %d chunks of %d total bytes.",
c.out.wdl, len(cnb), attempted)
c.clearConnection(SlowConsumerWriteDeadline)
}
} else {
c.clearConnection(WriteError)
c.Debugf("Error flushing: %v", err)
c.clearConnection(WriteError)
}
return true
}
@@ -1343,9 +1343,9 @@ func (c *client) queueOutbound(data []byte) bool {
// Check for slow consumer via pending bytes limit.
// ok to return here, client is going away.
if c.out.pb > c.out.mp {
c.clearConnection(SlowConsumerPendingBytes)
atomic.AddInt64(&c.srv.slowConsumers, 1)
c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp)
c.clearConnection(SlowConsumerPendingBytes)
return referenced
}
@@ -2913,7 +2913,8 @@ func (c *client) closeConnection(reason ClosedState) {
// and reference existing one.
var subs []*subscription
if kind == CLIENT || kind == LEAF {
subs = make([]*subscription, 0, len(c.subs))
var _subs [32]*subscription
subs = _subs[:0]
for _, sub := range c.subs {
// Auto-unsubscribe subscriptions must be unsubscribed forcibly.
sub.max = 0

View File

@@ -712,11 +712,8 @@ func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) {
c.in.subs++
args := splitArg(arg)
var (
accountName string
subject []byte
queue []byte
)
var queue []byte
switch len(args) {
case 2:
case 3:
@@ -724,9 +721,7 @@ func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) {
default:
return "", nil, nil, fmt.Errorf("parse error: '%s'", arg)
}
subject = args[1]
accountName = string(args[0])
return accountName, subject, queue, nil
return string(args[0]), args[1], queue, nil
}
// Indicates no more interest in the given account/subject for the remote side.
@@ -944,10 +939,7 @@ func (s *Server) sendSubsToRoute(route *client) {
}
a.mu.RUnlock()
closed = route.sendRouteSubProtos(subs, false, func(sub *subscription) bool {
return route.canImport(string(sub.subject))
})
closed = route.sendRouteSubProtos(subs, false, route.importFilter)
if closed {
route.mu.Unlock()
return
@@ -1305,71 +1297,72 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
return !exists, sendInfo
}
// Import filter check.
func (c *client) importFilter(sub *subscription) bool {
return c.canImport(string(sub.subject))
}
// updateRouteSubscriptionMap will make sure to update the route map for the subscription. Will
// also forward to all routes if needed.
func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, delta int32) {
if acc == nil || sub == nil {
return
}
acc.mu.RLock()
rm := acc.rm
acc.mu.RUnlock()
// This is non-nil when we know we are in cluster mode.
if rm == nil {
return
}
// We only store state on local subs for transmission across all other routes.
if sub.client == nil || (sub.client.kind != CLIENT && sub.client.kind != SYSTEM && sub.client.kind != LEAF) {
return
}
// Create the fast key which will use the subject or 'subject<spc>queue' for queue subscribers.
var (
_rkey [1024]byte
key []byte
update bool
)
if sub.queue != nil {
// Just make the key subject spc group, e.g. 'foo bar'
key = _rkey[:0]
key = append(key, sub.subject...)
key = append(key, byte(' '))
key = append(key, sub.queue...)
// We always update for a queue subscriber since we need to send our relative weight.
update = true
} else {
key = sub.subject
}
// Copy to hold outside acc lock.
var n int32
var ok bool
acc.mu.Lock()
if n, ok = rm[string(key)]; ok {
// This is non-nil when we know we are in cluster mode.
rm, lqws := acc.rm, acc.lqws
if rm == nil {
acc.mu.Unlock()
return
}
// Create the fast key which will use the subject or 'subject<spc>queue' for queue subscribers.
key := keyFromSub(sub)
isq := len(sub.queue) > 0
// Decide whether we need to send an update out to all the routes.
update := isq
// This is where we do update to account. For queues we need to take
// special care that this order of updates is same as what is sent out
// over routes.
if n, ok = rm[key]; ok {
n += delta
if n <= 0 {
delete(rm, string(key))
delete(rm, key)
if isq {
delete(lqws, key)
}
update = true // Update for deleting (N->0)
} else {
rm[string(key)] = n
rm[key] = n
}
} else if delta > 0 {
n = delta
rm[string(key)] = delta
rm[key] = delta
update = true // Adding a new entry for normal sub means update (0->1)
}
acc.mu.Unlock()
if !update {
return
}
// We need to send out this update.
// If we are sending a queue sub, copy and place in the queue weight.
if sub.queue != nil {
// If we are sending a queue sub, make a copy and place in the queue weight.
// FIXME(dlc) - We can be smarter here and avoid copying and acquiring the lock.
if isq {
sub.client.mu.Lock()
nsub := *sub
sub.client.mu.Unlock()
@@ -1377,45 +1370,46 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
sub = &nsub
}
// Note that queue unsubs where entry.n > 0 are still
// subscribes with a smaller weight.
if n > 0 {
s.broadcastSubscribe(sub)
} else {
s.broadcastUnSubscribe(sub)
}
}
// We need to send out this update. Gather routes
var _routes [32]*client
routes := _routes[:0]
// broadcastSubscribe will forward a client subscription
// to all active routes as needed.
func (s *Server) broadcastSubscribe(sub *subscription) {
trace := atomic.LoadInt32(&s.logging.trace) == 1
s.mu.Lock()
subs := []*subscription{sub}
for _, route := range s.routes {
routes = append(routes, route)
}
trace := atomic.LoadInt32(&s.logging.trace) == 1
s.mu.Unlock()
// If we are a queue subscriber we need to make sure our updates are serialized from
// potential multiple connections. We want to make sure that the order above is preserved
// here but not necessarily all updates need to be sent. We need to block and recheck the
// n count with the lock held through sending here. We will suppress duplicate sends of same qw.
if isq {
acc.mu.Lock()
defer acc.mu.Unlock()
n = rm[key]
sub.qw = n
// Check the last sent weight here. If same, then someone
// beat us to it and we can just return here. Otherwise update
if ls, ok := lqws[key]; ok && ls == n {
return
} else {
lqws[key] = n
}
}
// Snapshot into array
subs := []*subscription{sub}
// Deliver to all routes.
for _, route := range routes {
route.mu.Lock()
route.sendRouteSubProtos(subs, trace, func(sub *subscription) bool {
return route.canImport(string(sub.subject))
})
// Note that queue unsubs where n > 0 are still
// subscribes with a smaller weight.
route.sendRouteSubOrUnSubProtos(subs, n > 0, trace, route.importFilter)
route.mu.Unlock()
}
s.mu.Unlock()
}
// broadcastUnSubscribe will forward a client unsubscribe
// action to all active routes.
func (s *Server) broadcastUnSubscribe(sub *subscription) {
trace := atomic.LoadInt32(&s.logging.trace) == 1
s.mu.Lock()
subs := []*subscription{sub}
for _, route := range s.routes {
route.mu.Lock()
route.sendRouteUnSubProtos(subs, trace, func(sub *subscription) bool {
return route.canImport(string(sub.subject))
})
route.mu.Unlock()
}
s.mu.Unlock()
}
func (s *Server) routeAcceptLoop(ch chan struct{}) {

View File

@@ -785,6 +785,7 @@ func (s *Server) registerAccount(acc *Account) {
// TODO(dlc)- Double check that we need this for GWs.
if acc.rm == nil && s.opts != nil && s.shouldTrackSubscriptions() {
acc.rm = make(map[string]int32)
acc.lqws = make(map[string]int32)
}
acc.srv = s
acc.mu.Unlock()

View File

@@ -14,17 +14,21 @@
package test
import (
"context"
"encoding/json"
"fmt"
"net"
"net/url"
"runtime"
"strconv"
"sync"
"testing"
"time"
"github.com/nats-io/nats-server/v2/logger"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/nats-io/nuid"
)
func runNewRouteServer(t *testing.T) (*server.Server, *server.Options) {
@@ -1651,3 +1655,197 @@ func TestLargeClusterMem(t *testing.T) {
s.Shutdown()
}
}
func TestClusterLeaksSubscriptions(t *testing.T) {
srvA, srvB, optsA, optsB := runServers(t)
defer srvA.Shutdown()
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port)
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port)
numResponses := 100
repliers := make([]*nats.Conn, 0, numResponses)
// Create 100 repliers
for i := 0; i < 50; i++ {
nc1, _ := nats.Connect(urlA)
nc2, _ := nats.Connect(urlB)
repliers = append(repliers, nc1, nc2)
nc1.Subscribe("test.reply", func(m *nats.Msg) {
m.Respond([]byte("{\"sender\": 22 }"))
})
nc2.Subscribe("test.reply", func(m *nats.Msg) {
m.Respond([]byte("{\"sender\": 33 }"))
})
nc1.Flush()
nc2.Flush()
}
servers := fmt.Sprintf("%s, %s", urlA, urlB)
req := sizedBytes(8 * 1024)
// Now run a requestor in a loop, creating and tearing down each time to
// simulate running a modified nats-req.
doReq := func() {
msgs := make(chan *nats.Msg, 1)
inbox := nats.NewInbox()
grp := nuid.Next()
// Create 8 queue Subscribers for responses.
for i := 0; i < 8; i++ {
nc, _ := nats.Connect(servers)
nc.ChanQueueSubscribe(inbox, grp, msgs)
nc.Flush()
defer nc.Close()
}
nc, _ := nats.Connect(servers)
nc.PublishRequest("test.reply", inbox, req)
defer nc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
var received int
for {
select {
case <-msgs:
received++
if received >= numResponses {
return
}
case <-ctx.Done():
return
}
}
}
var wg sync.WaitGroup
doRequests := func(n int) {
for i := 0; i < n; i++ {
doReq()
}
wg.Done()
}
concurrent := 10
wg.Add(concurrent)
for i := 0; i < concurrent; i++ {
go doRequests(10)
}
wg.Wait()
// Close responders too, should have zero(0) subs attached to routes.
for _, nc := range repliers {
nc.Close()
}
// Make sure no clients remain. This is to make sure the test is correct and that
// we have closed all the client connections.
checkFor(t, time.Second, 10*time.Millisecond, func() error {
v1, _ := srvA.Varz(nil)
v2, _ := srvB.Varz(nil)
if v1.Connections != 0 || v2.Connections != 0 {
return fmt.Errorf("We have lingering client connections %d:%d", v1.Connections, v2.Connections)
}
return nil
})
loadRoutez := func() (*server.Routez, *server.Routez) {
v1, err := srvA.Routez(&server.RoutezOptions{Subscriptions: true})
if err != nil {
t.Fatalf("Error getting Routez: %v", err)
}
v2, err := srvB.Routez(&server.RoutezOptions{Subscriptions: true})
if err != nil {
t.Fatalf("Error getting Routez: %v", err)
}
return v1, v2
}
checkFor(t, time.Second, 10*time.Millisecond, func() error {
r1, r2 := loadRoutez()
if r1.Routes[0].NumSubs != 0 {
return fmt.Errorf("Leaked %d subs: %+v", r1.Routes[0].NumSubs, r1.Routes[0].Subs)
}
if r2.Routes[0].NumSubs != 0 {
return fmt.Errorf("Leaked %d subs: %+v", r2.Routes[0].NumSubs, r2.Routes[0].Subs)
}
return nil
})
}
// Make sure we have the correct remote state when dealing with queue subscribers
// across many client connections.
func TestQueueSubWeightOrderMultipleConnections(t *testing.T) {
s, opts := runNewRouteServer(t)
defer s.Shutdown()
// Create 100 connections to s
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
clients := make([]*nats.Conn, 0, 100)
for i := 0; i < 100; i++ {
nc, err := nats.Connect(url, nats.NoReconnect())
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer nc.Close()
clients = append(clients, nc)
}
rc := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port)
defer rc.Close()
routeID := "RTEST_NEW:22"
routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID)
info := checkInfoMsg(t, rc)
info.ID = routeID
b, err := json.Marshal(info)
if err != nil {
t.Fatalf("Could not marshal test route info: %v", err)
}
routeSend(fmt.Sprintf("INFO %s\r\n", b))
start := make(chan bool)
for _, nc := range clients {
go func(nc *nats.Conn) {
<-start
// Now create 100 identical queue subscribers on each connection.
for i := 0; i < 100; i++ {
if _, err := nc.QueueSubscribeSync("foo", "bar"); err != nil {
return
}
}
nc.Flush()
}(nc)
}
close(start)
// We did have this where we wanted to get every update, but now with optimizations
// we just want to make sure we always are increasing and that a previous update to
// a lesser queue weight is never delivered for this test.
max_expected := 10000
for qw := 0; qw < max_expected; {
buf := routeExpect(rsubRe)
matches := rsubRe.FindAllSubmatch(buf, -1)
for _, m := range matches {
if len(m) != 5 {
t.Fatalf("Expected a weight for the queue group")
}
nqw, err := strconv.Atoi(string(m[4]))
if err != nil {
t.Fatalf("Got an error converting queue weight: %v", err)
}
// Make sure the new value only increases, ok to skip since we will
// optimize this now, but needs to always be increasing.
if nqw <= qw {
t.Fatalf("Was expecting increasing queue weight after %d, got %d", qw, nqw)
}
qw = nqw
}
}
}