mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Gateways: some optimizations
Check sublist only when required. Send the subs list in place instead of go routine (gateways have different outbound/inbound connections so they don't suffer same issue than routes) Bump the default array size when collecting gateway connections Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -157,6 +157,8 @@ type outsie struct {
|
||||
// Contains queue subscriptions when in optimistic mode,
|
||||
// and all subs when pk is > 0.
|
||||
sl *Sublist
|
||||
// Number of queue subs
|
||||
qsubs int
|
||||
}
|
||||
|
||||
// Inbound subject interest entry.
|
||||
@@ -896,14 +898,8 @@ func (c *client) processGatewayInfo(info *Info) {
|
||||
// Now that it is registered, we can remove from temp map.
|
||||
s.removeFromTempClients(cid)
|
||||
|
||||
// Send our QSubs, since this may take some time, execute
|
||||
// in a separate go-routine so that if there is incoming
|
||||
// data from the otherside, we don't cause a slow consumer
|
||||
// error.
|
||||
s.startGoRoutine(func() {
|
||||
s.sendQueueSubsToGateway(c)
|
||||
s.grWG.Done()
|
||||
})
|
||||
// Send our QSubs.
|
||||
s.sendQueueSubsToGateway(c)
|
||||
|
||||
// Initiate outbound connection. This function will behave correctly if
|
||||
// we have already one.
|
||||
@@ -1112,7 +1108,7 @@ func (s *Server) sendGatewayConfigsToRoute(route *client) {
|
||||
return
|
||||
}
|
||||
// Collect gateway configs for which we have an outbound connection.
|
||||
gwCfgsa := [4]*gatewayCfg{}
|
||||
gwCfgsa := [16]*gatewayCfg{}
|
||||
gwCfgs := gwCfgsa[:0]
|
||||
for _, c := range gw.out {
|
||||
c.mu.Lock()
|
||||
@@ -1550,8 +1546,6 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
|
||||
ei, _ := c.gw.outsim.Load(accName)
|
||||
if ei != nil {
|
||||
e = ei.(*outsie)
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
// If there is an entry, for plain sub we need
|
||||
// to know if we should store the sub
|
||||
useSl = queue != nil || e.mode != modeOptimistic
|
||||
@@ -1564,6 +1558,8 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
|
||||
e = &outsie{ni: make(map[string]struct{}), sl: NewSublist()}
|
||||
newe = true
|
||||
}
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
// This is when a sub or queue sub is supposed to be in
|
||||
// the sublist. Look for it and remove.
|
||||
if useSl {
|
||||
@@ -1577,6 +1573,7 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
|
||||
if e.sl.Remove(sub) == nil {
|
||||
delete(c.subs, string(key))
|
||||
if queue != nil {
|
||||
e.qsubs--
|
||||
atomic.AddInt64(&c.srv.gateway.totalQSubs, -1)
|
||||
}
|
||||
// If last, we can remove the whole entry only
|
||||
@@ -1637,8 +1634,6 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
// getting many RS- from the remote..
|
||||
if ei != nil {
|
||||
e = ei.(*outsie)
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
useSl = queue != nil || e.mode != modeOptimistic
|
||||
} else if queue == nil {
|
||||
return nil
|
||||
@@ -1647,6 +1642,8 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
newe = true
|
||||
useSl = true
|
||||
}
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
if useSl {
|
||||
var key []byte
|
||||
// We store remote subs by account/subject[/queue].
|
||||
@@ -1680,12 +1677,13 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
// If no error inserting in sublist...
|
||||
if e.sl.Insert(sub) == nil {
|
||||
c.subs[string(key)] = sub
|
||||
if queue != nil {
|
||||
e.qsubs++
|
||||
atomic.AddInt64(&c.srv.gateway.totalQSubs, 1)
|
||||
}
|
||||
if newe {
|
||||
c.gw.outsim.Store(string(accName), e)
|
||||
}
|
||||
if queue != nil {
|
||||
atomic.AddInt64(&c.srv.gateway.totalQSubs, 1)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
subj := string(subject)
|
||||
@@ -1725,26 +1723,24 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) {
|
||||
if accountInMap {
|
||||
// If in map, check for subs interest with sublist.
|
||||
e := ei.(*outsie)
|
||||
r = e.sl.Match(subj)
|
||||
// If there is plain subs returned, we don't have to
|
||||
// check if we should use the no-interest map because
|
||||
// it means that we are in modeInterestOnly.
|
||||
// Only if there is nothing returned for r.psubs that
|
||||
// we need to check.
|
||||
if len(r.psubs) > 0 {
|
||||
psi = true
|
||||
} else {
|
||||
e.RLock()
|
||||
// We may be in transition to modeInterestOnly
|
||||
// but until e.ni is nil, use it to know if we
|
||||
// should suppress interest or not.
|
||||
if e.ni != nil {
|
||||
if _, inMap := e.ni[subj]; !inMap {
|
||||
psi = true
|
||||
}
|
||||
e.RLock()
|
||||
// We may be in transition to modeInterestOnly
|
||||
// but until e.ni is nil, use it to know if we
|
||||
// should suppress interest or not.
|
||||
if e.ni != nil {
|
||||
if _, inMap := e.ni[subj]; !inMap {
|
||||
psi = true
|
||||
}
|
||||
e.RUnlock()
|
||||
}
|
||||
// If we are in modeInterestOnly (e.ni will be nil)
|
||||
// or if we have queue subs, we also need to check sl.Match.
|
||||
if e.ni == nil || e.qsubs > 0 {
|
||||
r = e.sl.Match(subj)
|
||||
if len(r.psubs) > 0 {
|
||||
psi = true
|
||||
}
|
||||
}
|
||||
e.RUnlock()
|
||||
}
|
||||
return psi, r
|
||||
}
|
||||
@@ -1757,7 +1753,7 @@ func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription
|
||||
if sub.queue != nil {
|
||||
return
|
||||
}
|
||||
gwsa := [4]*client{}
|
||||
gwsa := [16]*client{}
|
||||
gws := gwsa[:0]
|
||||
s.getInboundGatewayConnections(&gws)
|
||||
if len(gws) == 0 {
|
||||
@@ -1856,7 +1852,7 @@ func (s *Server) sendQueueSubOrUnsubToGateways(accName string, qsub *subscriptio
|
||||
return
|
||||
}
|
||||
|
||||
gwsa := [4]*client{}
|
||||
gwsa := [16]*client{}
|
||||
gws := gwsa[:0]
|
||||
s.getInboundGatewayConnections(&gws)
|
||||
if len(gws) == 0 {
|
||||
@@ -1966,7 +1962,7 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
|
||||
// subject, etc..
|
||||
// <Invoked from any client connection's readLoop>
|
||||
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) {
|
||||
gwsa := [4]*client{}
|
||||
gwsa := [16]*client{}
|
||||
gws := gwsa[:0]
|
||||
// This is in fast path, so avoid calling function when possible.
|
||||
// Get the outbound connections in place instead of calling
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -916,3 +917,212 @@ func Benchmark____FanIn_64kx100x1(b *testing.B) {
|
||||
func Benchmark___FanIn_128kx100x1(b *testing.B) {
|
||||
doFanIn(b, 1, 100, 1, sub, sizedString(65536*2))
|
||||
}
|
||||
|
||||
func gatewaysBench(b *testing.B, optimisticMode bool, payload string, numPublishers int, subInterest bool) {
|
||||
b.Helper()
|
||||
if b.N < numPublishers {
|
||||
return
|
||||
}
|
||||
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
sb := RunServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
|
||||
gwbURL, err := url.Parse(fmt.Sprintf("nats://%s:%d", ob.Gateway.Host, ob.Gateway.Port))
|
||||
if err != nil {
|
||||
b.Fatalf("Error parsing url: %v", err)
|
||||
}
|
||||
oa := testDefaultOptionsForGateway("A")
|
||||
oa.Gateway.Gateways = []*server.RemoteGatewayOpts{
|
||||
&server.RemoteGatewayOpts{
|
||||
Name: "B",
|
||||
URLs: []*url.URL{gwbURL},
|
||||
},
|
||||
}
|
||||
sa := RunServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
sub := createClientConn(b, ob.Host, ob.Port)
|
||||
defer sub.Close()
|
||||
doDefaultConnect(b, sub)
|
||||
sendProto(b, sub, "SUB end.test 1\r\n")
|
||||
if subInterest {
|
||||
sendProto(b, sub, "SUB foo 2\r\n")
|
||||
}
|
||||
flushConnection(b, sub)
|
||||
|
||||
// If not optimisticMode, make B switch GW connection
|
||||
// to interest mode only
|
||||
if !optimisticMode {
|
||||
pub := createClientConn(b, oa.Host, oa.Port)
|
||||
doDefaultConnect(b, pub)
|
||||
// has to be more that defaultGatewayMaxRUnsubBeforeSwitch
|
||||
for i := 0; i < 2000; i++ {
|
||||
sendProto(b, pub, fmt.Sprintf("PUB reject.me.%d 2\r\nok\r\n", i+1))
|
||||
}
|
||||
flushConnection(b, pub)
|
||||
pub.Close()
|
||||
}
|
||||
|
||||
ch := make(chan bool)
|
||||
var msgOp string
|
||||
var expected int
|
||||
if subInterest {
|
||||
msgOp = fmt.Sprintf("MSG foo 2 %d\r\n%s\r\n", len(payload), payload)
|
||||
expected = len(msgOp) * b.N
|
||||
}
|
||||
// Last message sent to end.test
|
||||
lastMsg := "MSG end.test 1 2\r\nok\r\n"
|
||||
expected += len(lastMsg) * numPublishers
|
||||
go drainConnection(b, sub, ch, expected)
|
||||
|
||||
sendOp := []byte(fmt.Sprintf("PUB foo %d\r\n%s\r\n", len(payload), payload))
|
||||
startCh := make(chan bool)
|
||||
l := b.N / numPublishers
|
||||
|
||||
pubLoop := func(c net.Conn, ch chan bool) {
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
|
||||
// Signal we are ready
|
||||
close(ch)
|
||||
|
||||
// Wait to start up actual sends.
|
||||
<-startCh
|
||||
|
||||
for i := 0; i < l; i++ {
|
||||
if _, err := bw.Write(sendOp); err != nil {
|
||||
b.Errorf("Received error on PUB write: %v\n", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if _, err := bw.Write([]byte("PUB end.test 2\r\nok\r\n")); err != nil {
|
||||
b.Errorf("Received error on PUB write: %v\n", err)
|
||||
return
|
||||
}
|
||||
if err := bw.Flush(); err != nil {
|
||||
b.Errorf("Received error on FLUSH write: %v\n", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Publish Connections SPINUP
|
||||
for i := 0; i < numPublishers; i++ {
|
||||
c := createClientConn(b, oa.Host, oa.Port)
|
||||
doDefaultConnect(b, c)
|
||||
flushConnection(b, c)
|
||||
ch := make(chan bool)
|
||||
|
||||
go pubLoop(c, ch)
|
||||
<-ch
|
||||
}
|
||||
|
||||
b.SetBytes(int64(len(sendOp) + len(msgOp)))
|
||||
b.ResetTimer()
|
||||
|
||||
// Closing this will start all publishers at once (roughly)
|
||||
close(startCh)
|
||||
|
||||
// Wait for end of test
|
||||
<-ch
|
||||
|
||||
b.StopTimer()
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_1kx01x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(1024), 1, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_2kx01x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(2048), 1, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_4kx01x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(4096), 1, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_1kx10x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(1024), 10, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_2kx10x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(2048), 10, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_4kx10x0(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(4096), 10, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_1kx01x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(1024), 1, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_2kx01x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(2048), 1, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_4kx01x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(4096), 1, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_1kx10x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(1024), 10, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_2kx10x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(2048), 10, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways___Optimistic_4kx10x1(b *testing.B) {
|
||||
gatewaysBench(b, true, sizedString(4096), 10, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_1kx01x0(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(1024), 1, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_2kx01x0(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(2048), 1, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_4kx01x0(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(4096), 1, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_1kx10x0(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(1024), 10, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_2kx10x0(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(2048), 10, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_4kx10x0(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(4096), 10, false)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_1kx01x1(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(1024), 1, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_2kx01x1(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(2048), 1, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_4kx01x1(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(4096), 1, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_1kx10x1(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(1024), 10, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_2kx10x1(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(2048), 10, true)
|
||||
}
|
||||
|
||||
func Benchmark_Gateways_InterestOnly_4kx10x1(b *testing.B) {
|
||||
gatewaysBench(b, false, sizedString(4096), 10, true)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user