mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Merge pull request #827 from nats-io/gw_send_all_subs
Switch to send-all-subs when number of RS- gets too big
This commit is contained in:
@@ -629,6 +629,9 @@ func (c *client) readLoop() {
|
||||
s := c.srv
|
||||
c.in.rsz = startBufSize
|
||||
defer s.grWG.Done()
|
||||
if c.gw != nil && c.gw.outbound {
|
||||
defer c.gatewayOutboundConnectionReadLoopExited()
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
if nc == nil {
|
||||
@@ -1489,6 +1492,7 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
sid := string(sub.sid)
|
||||
acc := c.acc
|
||||
|
||||
updateGWs := false
|
||||
// Subscribe here.
|
||||
if c.subs[sid] == nil {
|
||||
c.subs[sid] = sub
|
||||
@@ -1496,6 +1500,8 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
err = acc.sl.Insert(sub)
|
||||
if err != nil {
|
||||
delete(c.subs, sid)
|
||||
} else {
|
||||
updateGWs = c.srv.gateway.enabled
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1515,6 +1521,9 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
// If we are routing and this is a local sub, add to the route map for the associated account.
|
||||
if kind == CLIENT || kind == SYSTEM {
|
||||
c.srv.updateRouteSubscriptionMap(acc, sub, 1)
|
||||
if updateGWs {
|
||||
c.srv.gatewayUpdateSubInterest(acc.Name, sub, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1739,6 +1748,7 @@ func (c *client) processUnsub(arg []byte) error {
|
||||
kind := c.kind
|
||||
var acc *Account
|
||||
|
||||
updateGWs := false
|
||||
if sub, ok = c.subs[string(sid)]; ok {
|
||||
acc = c.acc
|
||||
if max > 0 {
|
||||
@@ -1748,6 +1758,7 @@ func (c *client) processUnsub(arg []byte) error {
|
||||
sub.max = 0
|
||||
unsub = true
|
||||
}
|
||||
updateGWs = c.srv.gateway.enabled
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
@@ -1759,6 +1770,9 @@ func (c *client) processUnsub(arg []byte) error {
|
||||
c.unsubscribe(acc, sub, false)
|
||||
if acc != nil && kind == CLIENT || kind == SYSTEM {
|
||||
c.srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
if updateGWs {
|
||||
c.srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2089,7 +2103,8 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
// mode and the remote gateways have queue subs, then we need to
|
||||
// collect the queue groups this message was sent to so that we
|
||||
// exclude them when sending to gateways.
|
||||
if len(r.qsubs) > 0 && c.srv.gateway.enabled && atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
|
||||
if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
|
||||
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
|
||||
qnames = &queues
|
||||
}
|
||||
c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, qnames)
|
||||
@@ -2619,7 +2634,7 @@ func (c *client) closeConnection(reason ClosedState) {
|
||||
// Remove clients subscriptions.
|
||||
if kind == CLIENT {
|
||||
acc.sl.RemoveBatch(subs)
|
||||
} else {
|
||||
} else if kind == ROUTER {
|
||||
go c.removeRemoteSubs()
|
||||
}
|
||||
|
||||
@@ -2653,6 +2668,9 @@ func (c *client) closeConnection(reason ClosedState) {
|
||||
qsubs[key] = &qsub{sub, 1}
|
||||
}
|
||||
}
|
||||
if srv.gateway.enabled {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
|
||||
}
|
||||
}
|
||||
// Process any qsubs here.
|
||||
for _, esub := range qsubs {
|
||||
|
||||
1047
server/gateway.go
1047
server/gateway.go
File diff suppressed because it is too large
Load Diff
@@ -80,10 +80,13 @@ func checkForRegisteredQSubInterest(t *testing.T, s *Server, gwName, acc, subj s
|
||||
checkFor(t, timeout, 15*time.Millisecond, func() error {
|
||||
count := 0
|
||||
c := s.getOutboundGatewayConnection(gwName)
|
||||
qsi, _ := c.gw.qsubsInterest.Load(acc)
|
||||
if qsi != nil {
|
||||
qsl := qsi.(*Sublist)
|
||||
count = int(qsl.Count())
|
||||
ei, _ := c.gw.outsim.Load(acc)
|
||||
if ei != nil {
|
||||
sl := ei.(*outsie).sl
|
||||
r := sl.Match(subj)
|
||||
for _, qsubs := range r.qsubs {
|
||||
count += len(qsubs)
|
||||
}
|
||||
}
|
||||
if count == expected {
|
||||
return nil
|
||||
@@ -92,6 +95,30 @@ func checkForRegisteredQSubInterest(t *testing.T, s *Server, gwName, acc, subj s
|
||||
})
|
||||
}
|
||||
|
||||
func checkForNoInterest(t *testing.T, c *client, account, subject string, expectNoInterest bool, timeout time.Duration) {
|
||||
t.Helper()
|
||||
checkFor(t, timeout, 15*time.Millisecond, func() error {
|
||||
ei, _ := c.gw.outsim.Load(account)
|
||||
if ei == nil {
|
||||
return fmt.Errorf("Did not receive subject no-interest")
|
||||
}
|
||||
e := ei.(*outsie)
|
||||
e.RLock()
|
||||
_, inMap := e.ni[subject]
|
||||
e.RUnlock()
|
||||
if expectNoInterest {
|
||||
if inMap {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Did not receive subject no-interest on %q", subject)
|
||||
}
|
||||
if inMap {
|
||||
return fmt.Errorf("No-interest on subject %q was not cleared", subject)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func waitCh(t *testing.T, ch chan bool, errTxt string) {
|
||||
t.Helper()
|
||||
select {
|
||||
@@ -1165,7 +1192,7 @@ func TestGatewayAccountInterest(t *testing.T) {
|
||||
|
||||
// S2 should have sent a protocol indicating no interest.
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
if _, inMap := gwcb.gw.noInterest.Load("$foo"); !inMap {
|
||||
if _, inMap := gwcb.gw.outsim.Load("$foo"); !inMap {
|
||||
return fmt.Errorf("Did not receive account no interest")
|
||||
}
|
||||
return nil
|
||||
@@ -1181,7 +1208,7 @@ func TestGatewayAccountInterest(t *testing.T) {
|
||||
// Add account to S2, this should clear the no interest for that account.
|
||||
s2.RegisterAccount("$foo")
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
if _, inMap := gwcb.gw.noInterest.Load("$foo"); inMap {
|
||||
if _, inMap := gwcb.gw.outsim.Load("$foo"); inMap {
|
||||
return fmt.Errorf("NoInterest has not been cleared")
|
||||
}
|
||||
return nil
|
||||
@@ -1249,27 +1276,11 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
checkCount(t, gwcb, 1)
|
||||
|
||||
// S2 should have sent a protocol indicating no subject interest.
|
||||
checkForNoInterest := func(t *testing.T, subject string, expectNoInterest bool) {
|
||||
checkNoInterest := func(t *testing.T, subject string, expectedNoInterest bool) {
|
||||
t.Helper()
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
sni, _ := gwcb.gw.noInterest.Load("$foo")
|
||||
if sni == nil {
|
||||
return fmt.Errorf("Did not receive subject no-interest")
|
||||
}
|
||||
_, subjNoInterest := sni.(*sync.Map).Load(subject)
|
||||
if expectNoInterest {
|
||||
if subjNoInterest {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Did not receive subject no-interest on %q", subject)
|
||||
}
|
||||
if subjNoInterest {
|
||||
return fmt.Errorf("No-interest on subject %q was not cleared", subject)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
checkForNoInterest(t, gwcb, "$foo", subject, expectedNoInterest, 2*time.Second)
|
||||
}
|
||||
checkForNoInterest(t, "foo", true)
|
||||
checkNoInterest(t, "foo", true)
|
||||
// Second send should not go through to B
|
||||
natsPub(t, nc, "foo", []byte("hello"))
|
||||
natsFlush(t, nc)
|
||||
@@ -1288,7 +1299,7 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
checkExpectedSubs(t, 0, s1)
|
||||
|
||||
// This should clear the no interest for this subject
|
||||
checkForNoInterest(t, "foo", false)
|
||||
checkNoInterest(t, "foo", false)
|
||||
// Third send should go to B
|
||||
natsPub(t, nc, "foo", []byte("hello"))
|
||||
natsFlush(t, nc)
|
||||
@@ -1307,7 +1318,7 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
natsFlush(t, nc)
|
||||
checkCount(t, gwcb, 3)
|
||||
|
||||
checkForNoInterest(t, "foo", true)
|
||||
checkNoInterest(t, "foo", true)
|
||||
|
||||
// Send one more time and now it should not go to B
|
||||
natsPub(t, nc, "foo", []byte("hello"))
|
||||
@@ -1320,7 +1331,7 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
checkCount(t, gwcb, 4)
|
||||
|
||||
// But now we should have receives an RS- on bar.
|
||||
checkForNoInterest(t, "bar", true)
|
||||
checkNoInterest(t, "bar", true)
|
||||
|
||||
// Check that wildcards are supported. Create a subscription on '*' on B.
|
||||
// This should clear the no-interest on both "foo" and "bar"
|
||||
@@ -1328,8 +1339,8 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
natsFlush(t, ncb)
|
||||
checkExpectedSubs(t, 1, s2)
|
||||
checkExpectedSubs(t, 0, s1)
|
||||
checkForNoInterest(t, "foo", false)
|
||||
checkForNoInterest(t, "bar", false)
|
||||
checkNoInterest(t, "foo", false)
|
||||
checkNoInterest(t, "bar", false)
|
||||
// Publish on message on foo and one on bar and they should go.
|
||||
natsPub(t, nc, "foo", []byte("hello"))
|
||||
natsPub(t, nc, "bar", []byte("hello"))
|
||||
@@ -1351,7 +1362,7 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
natsFlush(t, nc)
|
||||
checkCount(t, gwcb, 1)
|
||||
|
||||
checkForNoInterest(t, "foo", true)
|
||||
checkNoInterest(t, "foo", true)
|
||||
|
||||
natsPub(t, nc, "foo", []byte("hello"))
|
||||
natsFlush(t, nc)
|
||||
@@ -1386,7 +1397,7 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
checkExpectedSubs(t, 1, s2, s2bis)
|
||||
|
||||
// Check that subject no-interest on A was cleared.
|
||||
checkForNoInterest(t, "foo", false)
|
||||
checkNoInterest(t, "foo", false)
|
||||
|
||||
// Now publish. Remember, s1 has outbound gateway to s2, and s2 does not
|
||||
// have a local subscription and has previously sent a no-interest on "foo".
|
||||
@@ -1710,13 +1721,14 @@ func TestGatewayQueueSub(t *testing.T) {
|
||||
|
||||
// gwcB should have the qsubs interest map empty now.
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
gwcB.mu.Lock()
|
||||
_, exists := gwcB.gw.qsubsInterest.Load(globalAccountName)
|
||||
gwcB.mu.Unlock()
|
||||
if exists {
|
||||
return fmt.Errorf("Qsub interest for account should have been removed")
|
||||
ei, _ := gwcB.gw.outsim.Load(globalAccountName)
|
||||
if ei != nil {
|
||||
sl := ei.(*outsie).sl
|
||||
if sl.Count() == 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return fmt.Errorf("Qsub interest for account should have been removed")
|
||||
})
|
||||
|
||||
// Reset counters
|
||||
@@ -1730,17 +1742,34 @@ func TestGatewayQueueSub(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGatewayTotalQSubs(t *testing.T) {
|
||||
o2 := testDefaultOptionsForGateway("B")
|
||||
s2 := runGatewayServer(o2)
|
||||
defer s2.Shutdown()
|
||||
ob1 := testDefaultOptionsForGateway("B")
|
||||
sb1 := runGatewayServer(ob1)
|
||||
defer sb1.Shutdown()
|
||||
|
||||
s2Url := fmt.Sprintf("nats://127.0.0.1:%d", o2.Port)
|
||||
subnc := natsConnect(t, s2Url)
|
||||
defer subnc.Close()
|
||||
ob2 := testDefaultOptionsForGateway("B")
|
||||
ob2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", sb1.ClusterAddr().Port))
|
||||
sb2 := runGatewayServer(ob2)
|
||||
defer sb2.Shutdown()
|
||||
|
||||
o1 := testGatewayOptionsFromToWithServers(t, "A", "B", s2)
|
||||
s1 := runGatewayServer(o1)
|
||||
defer s1.Shutdown()
|
||||
checkClusterFormed(t, sb1, sb2)
|
||||
|
||||
sb1URL := fmt.Sprintf("nats://%s:%d", ob1.Host, ob1.Port)
|
||||
ncb1 := natsConnect(t, sb1URL, nats.ReconnectWait(50*time.Millisecond))
|
||||
defer ncb1.Close()
|
||||
|
||||
sb2URL := fmt.Sprintf("nats://%s:%d", ob2.Host, ob2.Port)
|
||||
ncb2 := natsConnect(t, sb2URL, nats.ReconnectWait(50*time.Millisecond))
|
||||
defer ncb2.Close()
|
||||
|
||||
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb1)
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sb1, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sb2, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sa, 2, 2*time.Second)
|
||||
waitForInboundGateways(t, sb1, 1, 2*time.Second)
|
||||
|
||||
checkTotalQSubs := func(t *testing.T, s *Server, expected int) {
|
||||
t.Helper()
|
||||
@@ -1752,18 +1781,61 @@ func TestGatewayTotalQSubs(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
qsub1 := natsQueueSub(t, subnc, "foo", "bar", func(_ *nats.Msg) {})
|
||||
checkTotalQSubs(t, s1, 1)
|
||||
qsub2 := natsQueueSub(t, subnc, "foo", "baz", func(_ *nats.Msg) {})
|
||||
checkTotalQSubs(t, s1, 2)
|
||||
qsub3 := natsQueueSub(t, subnc, "bar", "bar", func(_ *nats.Msg) {})
|
||||
checkTotalQSubs(t, s1, 3)
|
||||
natsUnsub(t, qsub1)
|
||||
checkTotalQSubs(t, s1, 2)
|
||||
cb := func(_ *nats.Msg) {}
|
||||
|
||||
natsQueueSub(t, ncb1, "foo", "bar", cb)
|
||||
checkTotalQSubs(t, sa, 1)
|
||||
qsub2 := natsQueueSub(t, ncb1, "foo", "baz", cb)
|
||||
checkTotalQSubs(t, sa, 2)
|
||||
qsub3 := natsQueueSub(t, ncb1, "foo", "baz", cb)
|
||||
checkTotalQSubs(t, sa, 2)
|
||||
|
||||
// Shutdown sb1, there should be a failover from clients
|
||||
// to sb2. sb2 will then send the queue subs to sa.
|
||||
sb1.Shutdown()
|
||||
|
||||
checkClientsCount(t, sb2, 2)
|
||||
checkExpectedSubs(t, 3, sb2)
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sb2, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sa, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sb2, 1, 2*time.Second)
|
||||
|
||||
// When sb1 is shutdown, the total qsubs on sa should fall
|
||||
// down to 0, but will be updated as soon as sa and sb2
|
||||
// connect to each other. So instead we will verify by
|
||||
// making sure that the count is 2 instead of 4 if there
|
||||
// was a bug.
|
||||
// (note that there are 2 qsubs on same group, so only
|
||||
// 1 RS+ would have been sent for that group)
|
||||
checkTotalQSubs(t, sa, 2)
|
||||
|
||||
// Restart sb1
|
||||
sb1 = runGatewayServer(ob1)
|
||||
defer sb1.Shutdown()
|
||||
|
||||
checkClusterFormed(t, sb1, sb2)
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sb1, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sb2, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sa, 2, 2*time.Second)
|
||||
waitForInboundGateways(t, sb1, 0, 2*time.Second)
|
||||
waitForInboundGateways(t, sb2, 1, 2*time.Second)
|
||||
|
||||
// Now start unsubscribing. Start with one of the duplicate
|
||||
// and check that count stays same.
|
||||
natsUnsub(t, qsub3)
|
||||
checkTotalQSubs(t, s1, 1)
|
||||
checkTotalQSubs(t, sa, 2)
|
||||
// Now the other, which would cause an RS-
|
||||
natsUnsub(t, qsub2)
|
||||
checkTotalQSubs(t, s1, 0)
|
||||
checkTotalQSubs(t, sa, 1)
|
||||
// Now test that if connections are closed, things are updated
|
||||
// properly.
|
||||
ncb1.Close()
|
||||
ncb2.Close()
|
||||
checkTotalQSubs(t, sa, 0)
|
||||
}
|
||||
|
||||
func TestGatewaySendQSubsOnGatewayConnect(t *testing.T) {
|
||||
@@ -1868,10 +1940,16 @@ func TestGatewaySendRemoteQSubs(t *testing.T) {
|
||||
// Wait for the no interest to be received by A
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
gw := sa.getOutboundGatewayConnection("B").gw
|
||||
if ni, _ := gw.noInterest.Load(globalAccountName); ni == nil {
|
||||
return fmt.Errorf("No-interest still not registered")
|
||||
ei, _ := gw.outsim.Load(globalAccountName)
|
||||
if ei != nil {
|
||||
e := ei.(*outsie)
|
||||
e.RLock()
|
||||
defer e.RUnlock()
|
||||
if _, inMap := e.ni["foo"]; inMap {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return fmt.Errorf("No-interest still not registered")
|
||||
})
|
||||
|
||||
// Unsubscribe 1 qsub
|
||||
@@ -1892,13 +1970,17 @@ func TestGatewaySendRemoteQSubs(t *testing.T) {
|
||||
// No more subs now on both sb1 and sb2
|
||||
checkExpectedSubs(t, 0, sb1, sb2)
|
||||
|
||||
// Server sb1 should not have qsub in its rqs map
|
||||
// Server sb1 should not have qsub in its sub interest map
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
sb1.gateway.RLock()
|
||||
entry := sb1.gateway.rqs[fmt.Sprintf("%s foo bar", globalAccountName)]
|
||||
sb1.gateway.RUnlock()
|
||||
var entry *sitally
|
||||
sb1.gateway.pasi.Lock()
|
||||
asim := sb1.gateway.pasi.m[globalAccountName]
|
||||
if asim != nil {
|
||||
entry = asim["foo bar"]
|
||||
}
|
||||
sb1.gateway.pasi.Unlock()
|
||||
if entry != nil {
|
||||
return fmt.Errorf("Map rqs should not have an entry, got %#v", entry)
|
||||
return fmt.Errorf("Map should not have an entry, got %#v", entry)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
@@ -1906,10 +1988,14 @@ func TestGatewaySendRemoteQSubs(t *testing.T) {
|
||||
// Let's wait for A to receive the unsubscribe
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
gw := sa.getOutboundGatewayConnection("B").gw
|
||||
if sli, _ := gw.qsubsInterest.Load(globalAccountName); sli != nil {
|
||||
return fmt.Errorf("Interest still present")
|
||||
ei, _ := gw.outsim.Load(globalAccountName)
|
||||
if ei != nil {
|
||||
sl := ei.(*outsie).sl
|
||||
if sl.Count() == 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return fmt.Errorf("Interest still present")
|
||||
})
|
||||
|
||||
// Now send a message, it won't be sent because A received an RS-
|
||||
@@ -1937,10 +2023,10 @@ func TestGatewaySendRemoteQSubs(t *testing.T) {
|
||||
// Check qsubs interest should be empty
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
gw := sa.getOutboundGatewayConnection("B").gw
|
||||
if sli, _ := gw.qsubsInterest.Load(globalAccountName); sli != nil {
|
||||
return fmt.Errorf("Interest still present")
|
||||
if ei, _ := gw.outsim.Load(globalAccountName); ei == nil {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
return fmt.Errorf("Interest still present")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2093,12 +2179,7 @@ func TestGatewayComplexSetup(t *testing.T) {
|
||||
natsFlush(t, ncsb2)
|
||||
checkExpectedSubs(t, 1, sb2)
|
||||
|
||||
// TODO(ik): When server sa1 that has inbound from sb1 gets a local qsub on foo - bar,
|
||||
// it sends it, but when sa1 gets a remote qsub on foo - bar, it send it too.
|
||||
// We lack the optimization to suppress the remote sending RS+ since we already have
|
||||
// sent for local. So expected count here is 2. If we later optimize, use "1" here
|
||||
// instead.
|
||||
checkForRegisteredQSubInterest(t, sb1, "A", globalAccountName, "foo", 2, time.Second)
|
||||
checkForRegisteredQSubInterest(t, sb1, "A", globalAccountName, "foo", 1, time.Second)
|
||||
|
||||
// Publish all messages. The queue sub on cluster B should receive all
|
||||
// messages.
|
||||
@@ -2487,8 +2568,11 @@ func TestGatewaySendQSubsBufSize(t *testing.T) {
|
||||
|
||||
// Make sure we have the 4 we expected
|
||||
c := s1.getOutboundGatewayConnection("B")
|
||||
qsi, _ := c.gw.qsubsInterest.Load(globalAccountName)
|
||||
sl := qsi.(*Sublist)
|
||||
ei, _ := c.gw.outsim.Load(globalAccountName)
|
||||
if ei == nil {
|
||||
t.Fatalf("No interest found")
|
||||
}
|
||||
sl := ei.(*outsie).sl
|
||||
r := sl.Match("foo")
|
||||
if len(r.qsubs) != 4 {
|
||||
t.Fatalf("Expected 4 groups, got %v", len(r.qsubs))
|
||||
@@ -2577,6 +2661,273 @@ func TestGatewayRaceBetweenPubAndSub(t *testing.T) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestGatewaySendAllSubs(t *testing.T) {
|
||||
gatewayMaxRUnsubBeforeSwitch = 100
|
||||
defer func() { gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch }()
|
||||
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
oc := testGatewayOptionsFromToWithServers(t, "C", "B", sb)
|
||||
sc := runGatewayServer(oc)
|
||||
defer sc.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa, 2, time.Second)
|
||||
waitForOutboundGateways(t, sb, 2, time.Second)
|
||||
waitForOutboundGateways(t, sc, 2, time.Second)
|
||||
waitForInboundGateways(t, sa, 2, time.Second)
|
||||
waitForInboundGateways(t, sb, 2, time.Second)
|
||||
waitForInboundGateways(t, sc, 2, time.Second)
|
||||
|
||||
// On B, create a sub that will reply to requests
|
||||
bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
|
||||
ncB := natsConnect(t, bURL)
|
||||
defer ncB.Close()
|
||||
natsSub(t, ncB, "foo", func(m *nats.Msg) {
|
||||
ncB.Publish(m.Reply, m.Data)
|
||||
})
|
||||
natsFlush(t, ncB)
|
||||
checkExpectedSubs(t, 1, sb)
|
||||
|
||||
// On C, have some delayed activity while it receives
|
||||
// unwanted messages and switches to sendAllSubs.
|
||||
cURL := fmt.Sprintf("nats://%s:%d", oc.Host, oc.Port)
|
||||
ncC := natsConnect(t, cURL)
|
||||
defer ncC.Close()
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
done := make(chan bool)
|
||||
consCount := 0
|
||||
accsCount := 0
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; ; i++ {
|
||||
// Create subs and qsubs on same subject
|
||||
natsSub(t, ncC, fmt.Sprintf("foo.%d", i+1), func(_ *nats.Msg) {})
|
||||
natsQueueSub(t, ncC, fmt.Sprintf("foo.%d", i+1), fmt.Sprintf("bar.%d", i+1), func(_ *nats.Msg) {})
|
||||
// Create psubs and qsubs on unique subjects
|
||||
natsSub(t, ncC, fmt.Sprintf("foox.%d", i+1), func(_ *nats.Msg) {})
|
||||
natsQueueSub(t, ncC, fmt.Sprintf("fooy.%d", i+1), fmt.Sprintf("bar.%d", i+1), func(_ *nats.Msg) {})
|
||||
consCount += 4
|
||||
// Register account
|
||||
sc.RegisterAccount(fmt.Sprintf("acc.%d", i+1))
|
||||
accsCount++
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-time.After(15 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
for {
|
||||
for i := 0; i < 10; i++ {
|
||||
natsPub(t, ncB, fmt.Sprintf("foo.%d", i+1), []byte("hello"))
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-time.After(5 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// From A, send a lot of requests.
|
||||
aURL := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port)
|
||||
ncA := natsConnect(t, aURL)
|
||||
defer ncA.Close()
|
||||
// TODO(ik): May need to change that if we change the threshold
|
||||
// for when the switch happens.
|
||||
total := 300
|
||||
for i := 0; i < total; i++ {
|
||||
req := fmt.Sprintf("%d", i)
|
||||
reply, err := ncA.Request("foo", []byte(req), time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error waiting for reply: %v", err)
|
||||
}
|
||||
if string(reply.Data) != req {
|
||||
t.Fatalf("Expected reply %q, got %q", req, reply.Data)
|
||||
}
|
||||
}
|
||||
close(done)
|
||||
|
||||
// Normally, C would receive a message for each req inbox and
|
||||
// would send and RS- on that to B, making both have an unbounded
|
||||
// growth of the no-interest map. But after a certain amount
|
||||
// of RS-, C will send all its sub for the given account and
|
||||
// instruct B to send only if there is explicit interest.
|
||||
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
|
||||
// Check C inbound connection from B
|
||||
c := sc.getInboundGatewayConnection("B")
|
||||
c.mu.Lock()
|
||||
var switchedMode bool
|
||||
e := c.gw.insim[globalAccountName]
|
||||
if e != nil {
|
||||
switchedMode = e.ni == nil && e.mode == modeInterestOnly
|
||||
}
|
||||
c.mu.Unlock()
|
||||
if !switchedMode {
|
||||
return fmt.Errorf("C has still not switched mode")
|
||||
}
|
||||
switchedMode = false
|
||||
// Now check B outbound connection to C
|
||||
c = sb.getOutboundGatewayConnection("C")
|
||||
ei, _ := c.gw.outsim.Load(globalAccountName)
|
||||
if ei != nil {
|
||||
e := ei.(*outsie)
|
||||
e.RLock()
|
||||
switchedMode = e.ni == nil && e.mode == modeInterestOnly
|
||||
e.RUnlock()
|
||||
}
|
||||
if !switchedMode {
|
||||
return fmt.Errorf("C has still not switched mode")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
wg.Wait()
|
||||
|
||||
// Check consCount and accsCount on C
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
sc.gateway.pasi.Lock()
|
||||
scount := len(sc.gateway.pasi.m[globalAccountName])
|
||||
sc.gateway.pasi.Unlock()
|
||||
if scount != consCount {
|
||||
return fmt.Errorf("Expected %v consumers for global account, got %v", consCount, scount)
|
||||
}
|
||||
sc.mu.Lock()
|
||||
acount := len(sc.accounts)
|
||||
sc.mu.Unlock()
|
||||
if acount != accsCount+1 {
|
||||
return fmt.Errorf("Expected %v accounts, got %v", accsCount+1, acount)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
for i := 0; i < total; i++ {
|
||||
req := fmt.Sprintf("%d", i)
|
||||
reply, err := ncA.Request("foo", []byte(req), time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error waiting for reply: %v", err)
|
||||
}
|
||||
if string(reply.Data) != req {
|
||||
t.Fatalf("Expected reply %q, got %q", req, reply.Data)
|
||||
}
|
||||
}
|
||||
|
||||
// Also, after all that, if a sub is created on C, it should
|
||||
// be sent to B (but not A). Check that this is the case.
|
||||
// So first send from A on the subject that we are going to
|
||||
// use for this new sub.
|
||||
natsPub(t, ncA, "newsub", []byte("hello"))
|
||||
natsFlush(t, ncA)
|
||||
aOutboundToC := sa.getOutboundGatewayConnection("C")
|
||||
checkForNoInterest(t, aOutboundToC, globalAccountName, "newsub", true, 2*time.Second)
|
||||
|
||||
newSubSub := natsSub(t, ncC, "newsub", func(_ *nats.Msg) {})
|
||||
natsFlush(t, ncC)
|
||||
checkExpectedSubs(t, consCount+1)
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
c := sb.getOutboundGatewayConnection("C")
|
||||
ei, _ := c.gw.outsim.Load(globalAccountName)
|
||||
if ei != nil {
|
||||
sl := ei.(*outsie).sl
|
||||
r := sl.Match("newsub")
|
||||
if len(r.psubs) == 1 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("Newsub not registered on B")
|
||||
})
|
||||
checkForNoInterest(t, aOutboundToC, globalAccountName, "newsub", false, 2*time.Second)
|
||||
|
||||
natsUnsub(t, newSubSub)
|
||||
natsFlush(t, ncC)
|
||||
checkExpectedSubs(t, consCount)
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
c := sb.getOutboundGatewayConnection("C")
|
||||
ei, _ := c.gw.outsim.Load(globalAccountName)
|
||||
if ei != nil {
|
||||
sl := ei.(*outsie).sl
|
||||
r := sl.Match("newsub")
|
||||
if len(r.psubs) == 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("Newsub still registered on B")
|
||||
})
|
||||
}
|
||||
|
||||
func TestGatewaySendAllSubsBadProtocol(t *testing.T) {
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, time.Second)
|
||||
waitForInboundGateways(t, sa, 1, time.Second)
|
||||
waitForInboundGateways(t, sb, 1, time.Second)
|
||||
|
||||
c := sa.getInboundGatewayConnection("B")
|
||||
// Mock an invalid protocol (account name missing)
|
||||
info := &Info{
|
||||
Gateway: "A",
|
||||
GatewayCmd: gatewayCmdAllSubsStart,
|
||||
}
|
||||
b, _ := json.Marshal(info)
|
||||
c.mu.Lock()
|
||||
c.sendProto([]byte(fmt.Sprintf("INFO %s\r\n", b)), true)
|
||||
c.mu.Unlock()
|
||||
|
||||
orgConn := c
|
||||
checkFor(t, 3*time.Second, 100*time.Millisecond, func() error {
|
||||
curConn := sa.getInboundGatewayConnection("B")
|
||||
if orgConn == curConn {
|
||||
return fmt.Errorf("Not reconnected")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, 2*time.Second)
|
||||
|
||||
// Refresh
|
||||
c = sa.getInboundGatewayConnection("B")
|
||||
// Do correct start
|
||||
info.GatewayCmdPayload = []byte(globalAccountName)
|
||||
b, _ = json.Marshal(info)
|
||||
c.mu.Lock()
|
||||
c.sendProto([]byte(fmt.Sprintf("INFO %s\r\n", b)), true)
|
||||
c.mu.Unlock()
|
||||
// But incorrect end.
|
||||
info.GatewayCmd = gatewayCmdAllSubsComplete
|
||||
info.GatewayCmdPayload = nil
|
||||
b, _ = json.Marshal(info)
|
||||
c.mu.Lock()
|
||||
c.sendProto([]byte(fmt.Sprintf("INFO %s\r\n", b)), true)
|
||||
c.mu.Unlock()
|
||||
|
||||
orgConn = c
|
||||
checkFor(t, 3*time.Second, 100*time.Millisecond, func() error {
|
||||
curConn := sa.getInboundGatewayConnection("B")
|
||||
if orgConn == curConn {
|
||||
return fmt.Errorf("Not reconnected")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
/*
|
||||
func TestGatewayPermissions(t *testing.T) {
|
||||
bo := testDefaultOptionsForGateway("B")
|
||||
|
||||
@@ -395,7 +395,7 @@ func (c *client) parse(buf []byte) error {
|
||||
case ROUTER:
|
||||
err = c.processRemoteSub(arg)
|
||||
case GATEWAY:
|
||||
err = c.processGatewaySubjectSub(arg)
|
||||
err = c.processGatewayRSub(arg)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -487,7 +487,7 @@ func (c *client) parse(buf []byte) error {
|
||||
case ROUTER:
|
||||
err = c.processRemoteUnsub(arg)
|
||||
case GATEWAY:
|
||||
err = c.processGatewaySubjectUnsub(arg)
|
||||
err = c.processGatewayRUnsub(arg)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -657,6 +657,9 @@ func (c *client) removeRemoteSubs() {
|
||||
} else {
|
||||
ase.subs = append(ase.subs, sub)
|
||||
}
|
||||
if srv.gateway.enabled {
|
||||
srv.gatewayUpdateSubInterest(accountName, sub, -1)
|
||||
}
|
||||
}
|
||||
|
||||
// Now remove the subs by batch for each account sublist.
|
||||
@@ -715,7 +718,7 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
sendToGWs := false
|
||||
updateGWs := false
|
||||
// We store local subs by account and subject and optionally queue name.
|
||||
// RS- will have the arg exactly as the key.
|
||||
key := string(arg)
|
||||
@@ -724,13 +727,12 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) {
|
||||
delete(c.subs, key)
|
||||
acc.sl.Remove(sub)
|
||||
c.removeReplySubTimeout(sub)
|
||||
// Send only for queue subs
|
||||
sendToGWs = srv.gateway.enabled && sub.queue != nil
|
||||
updateGWs = srv.gateway.enabled
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
if sendToGWs {
|
||||
srv.sendQueueUnsubToGateways(accountName, sub, true)
|
||||
if updateGWs {
|
||||
srv.gatewayUpdateSubInterest(accountName, sub, -1)
|
||||
}
|
||||
|
||||
if c.opts.Verbose {
|
||||
@@ -809,7 +811,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) {
|
||||
}
|
||||
key := string(sub.sid)
|
||||
osub := c.subs[key]
|
||||
sendToGWs := false
|
||||
updateGWs := false
|
||||
if osub == nil {
|
||||
c.subs[string(key)] = sub
|
||||
// Now place into the account sl.
|
||||
@@ -820,7 +822,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) {
|
||||
c.sendErr("Invalid Subscription")
|
||||
return nil
|
||||
}
|
||||
sendToGWs = srv.gateway.enabled
|
||||
updateGWs = srv.gateway.enabled
|
||||
} else if sub.queue != nil {
|
||||
// For a queue we need to update the weight.
|
||||
atomic.StoreInt32(&osub.qw, sub.qw)
|
||||
@@ -831,18 +833,8 @@ func (c *client) processRemoteSub(argo []byte) (err error) {
|
||||
if c.opts.Verbose {
|
||||
c.sendOK()
|
||||
}
|
||||
if sendToGWs {
|
||||
// For queue subs, we will send an RS+, but if we are here, we
|
||||
// know there is a single qsub per account/subject/queue:
|
||||
// sendToGWs is true only if we did not find that key before.
|
||||
if sub.queue != nil {
|
||||
srv.sendQueueSubToGateways(acc.Name, sub, true)
|
||||
} else {
|
||||
// For a plain sub, this will send an RS+ to gateways only if
|
||||
// we had previously sent an RS-. In other words, we don't send
|
||||
// an RS+ per plain sub.
|
||||
srv.endSubjectNoInterestForGateways(acc.Name, sub)
|
||||
}
|
||||
if updateGWs {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, 1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1260,11 +1252,8 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
|
||||
}
|
||||
|
||||
// We always update for a queue subscriber since we need to send our relative weight.
|
||||
var (
|
||||
entry *rme
|
||||
ok bool
|
||||
added bool
|
||||
)
|
||||
var entry *rme
|
||||
var ok bool
|
||||
|
||||
// Always update if a queue subscriber.
|
||||
update := qi > 0
|
||||
@@ -1283,7 +1272,6 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
|
||||
entry = &rme{qi, delta}
|
||||
rm[string(key)] = entry
|
||||
update = true // Adding for normal sub means update.
|
||||
added = true
|
||||
}
|
||||
if entry != nil {
|
||||
entryN = entry.n
|
||||
@@ -1308,24 +1296,8 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
|
||||
// subscribes with a smaller weight.
|
||||
if entryN > 0 {
|
||||
s.broadcastSubscribe(sub)
|
||||
// Here we want to send RS+ only when going from 0 to 1
|
||||
if s.gateway.enabled && added && entryN == 1 {
|
||||
// Always send for queues
|
||||
if sub.queue != nil {
|
||||
s.sendQueueSubToGateways(acc.Name, sub, false)
|
||||
} else {
|
||||
// If plain sub, send an RS+ only if we had previously
|
||||
// sent an RS-
|
||||
s.endSubjectNoInterestForGateways(acc.Name, sub)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
s.broadcastUnSubscribe(sub)
|
||||
// Last of the queue member of this group, so send to
|
||||
// gateways.
|
||||
if s.gateway.enabled && sub.queue != nil {
|
||||
s.sendQueueUnsubToGateways(acc.Name, sub, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -71,10 +71,11 @@ type Info struct {
|
||||
Export *SubjectPermission `json:"export,omitempty"`
|
||||
|
||||
// Gateways Specific
|
||||
Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO)
|
||||
GatewayURLs []string `json:"gateway_urls,omitempty"` // Gateway URLs in the originating cluster (sent by gateway's INFO)
|
||||
GatewayURL string `json:"gateway_url,omitempty"` // Gateway URL on that server (sent by route's INFO)
|
||||
GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do
|
||||
Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO)
|
||||
GatewayURLs []string `json:"gateway_urls,omitempty"` // Gateway URLs in the originating cluster (sent by gateway's INFO)
|
||||
GatewayURL string `json:"gateway_url,omitempty"` // Gateway URL on that server (sent by route's INFO)
|
||||
GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do
|
||||
GatewayCmdPayload []byte `json:"gateway_cmd_payload,omitempty"` // Command payload when needed
|
||||
}
|
||||
|
||||
// Server is our main struct.
|
||||
|
||||
@@ -249,14 +249,27 @@ func TestGatewaySubjectInterest(t *testing.T) {
|
||||
gAExpect(runsubRe)
|
||||
gASend("RMSG $foo bar 2\r\nok\r\n")
|
||||
gAExpect(runsubRe)
|
||||
// Now have client create sub on "*", this should cause RS+ on foo
|
||||
// and RS+ on bar.
|
||||
// Now have client create sub on "*", this should cause RS+ on *
|
||||
// The remote will have cleared its no-interest on foo and bar
|
||||
// and this receiving side is supposed to be doing the same.
|
||||
clientSend("SUB * 5\r\nPING\r\n")
|
||||
clientExpect(pongRe)
|
||||
expectNumberOfProtos(t, gAExpect, rsubRe, 2)
|
||||
buf := gAExpect(rsubRe)
|
||||
if !bytes.Contains(buf, []byte("$foo *")) {
|
||||
t.Fatalf("Expected RS+ on %q, got %q", "*", buf)
|
||||
}
|
||||
// Check that the remote has cleared by sending from the client
|
||||
// on foo and bar
|
||||
clientSend("PUB foo 2\r\nok\r\n")
|
||||
clientExpect(msgRe)
|
||||
clientSend("PUB bar 2\r\nok\r\n")
|
||||
clientExpect(msgRe)
|
||||
// Check that A can send too and does not receive an RS-
|
||||
gASend("RMSG $foo foo 2\r\nok\r\n")
|
||||
expectNothing(t, gA)
|
||||
clientExpect(msgRe)
|
||||
gASend("RMSG $foo bar 2\r\nok\r\n")
|
||||
expectNothing(t, gA)
|
||||
clientExpect(msgRe)
|
||||
}
|
||||
|
||||
@@ -361,3 +374,29 @@ func TestGatewayQueue(t *testing.T) {
|
||||
}
|
||||
expectNothing(t, gA)
|
||||
}
|
||||
|
||||
func TestGatewaySendAllSubs(t *testing.T) {
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
gA := createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port)
|
||||
defer gA.Close()
|
||||
|
||||
gASend, gAExpect := setupGatewayConn(t, gA, "A", "B")
|
||||
gASend("PING\r\n")
|
||||
gAExpect(pongRe)
|
||||
|
||||
// Bombard B with messages on different subjects.
|
||||
// TODO(ik): Adapt if/when we change the conditions for the
|
||||
// switch.
|
||||
for i := 0; i < 10001; i++ {
|
||||
gASend(fmt.Sprintf("RMSG $G foo.%d 2\r\nok\r\n", i))
|
||||
if i < 1000 {
|
||||
gAExpect(runsubRe)
|
||||
}
|
||||
}
|
||||
// Since B has no sub, we should get 2 INFOs with start/end
|
||||
// commands.
|
||||
expectNumberOfProtos(t, gAExpect, infoRe, 2)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user