mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
[FIXED] More than expected switch to Interest-Only mode for account
When an account is switched to interest-only mode due to no interest, it was not possible to switch that account more than once. But the function switchAccountToInterestMode() that triggers a switch could possibly doing it more than once. This should not cause problems but increased the number of traces in a big super cluster. Also fixed some flappers and a data race. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1013,6 +1013,8 @@ func (c *client) flushOutbound() bool {
|
||||
attempted := c.out.pb
|
||||
apm := c.out.pm
|
||||
|
||||
// Capture this (we change the value in some tests)
|
||||
wdl := c.out.wdl
|
||||
// Do NOT hold lock during actual IO.
|
||||
c.mu.Unlock()
|
||||
|
||||
@@ -1020,7 +1022,7 @@ func (c *client) flushOutbound() bool {
|
||||
now := time.Now()
|
||||
// FIXME(dlc) - writev will do multiple IOs past 1024 on
|
||||
// most platforms, need to account for that with deadline?
|
||||
nc.SetWriteDeadline(now.Add(c.out.wdl))
|
||||
nc.SetWriteDeadline(now.Add(wdl))
|
||||
|
||||
// Actual write to the socket.
|
||||
n, err := nb.WriteTo(nc)
|
||||
|
||||
@@ -2847,9 +2847,11 @@ func getAccountFromGatewayCommand(c *client, info *Info, cmd string) string {
|
||||
// The client's lock is held on entry.
|
||||
// <Invoked from inbound connection's readLoop>
|
||||
func (c *client) gatewaySwitchAccountToSendAllSubs(e *insie, accName string) {
|
||||
// Set this map to nil so that the no-interest is
|
||||
// no longer checked.
|
||||
// Set this map to nil so that the no-interest is no longer checked.
|
||||
e.ni = nil
|
||||
// Switch mode to transitioning to prevent switchAccountToInterestMode
|
||||
// to possibly call this function multiple times.
|
||||
e.mode = Transitioning
|
||||
s := c.srv
|
||||
|
||||
remoteGWName := c.gw.name
|
||||
|
||||
@@ -5491,6 +5491,48 @@ func TestGatewayLogAccountInterestModeSwitch(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayAccountInterestModeSwitchOnlyOncePerAccount(t *testing.T) {
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
logB := &captureGWInterestSwitchLogger{}
|
||||
sb.SetLogger(logB, true, true)
|
||||
|
||||
nc := natsConnect(t, sb.ClientURL())
|
||||
defer nc.Close()
|
||||
natsSubSync(t, nc, "foo")
|
||||
natsQueueSubSync(t, nc, "bar", "baz")
|
||||
|
||||
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sa, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sb, 1, 2*time.Second)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
total := 20
|
||||
wg.Add(total)
|
||||
for i := 0; i < total; i++ {
|
||||
go func() {
|
||||
sb.switchAccountToInterestMode(globalAccountName)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
logB.Lock()
|
||||
nl := len(logB.imss)
|
||||
logB.Unlock()
|
||||
// There should be a trace for switching and when switch is complete
|
||||
if nl != 2 {
|
||||
t.Fatalf("Attempted to switch account too many times, number lines=%v", nl)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewaySingleOutbound(t *testing.T) {
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
|
||||
@@ -438,7 +438,7 @@ func TestQueueDistributionAcrossRoutes(t *testing.T) {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
send := 600
|
||||
send := 10000
|
||||
for i := 0; i < send; i++ {
|
||||
nc2.Publish("foo", nil)
|
||||
}
|
||||
@@ -470,7 +470,7 @@ func TestQueueDistributionAcrossRoutes(t *testing.T) {
|
||||
total, _, _ := qsubs[i].Pending()
|
||||
if total > avg+(avg*3/10) {
|
||||
if i == 8 {
|
||||
t.Fatalf("Qsub in 8th position gets majority of the messages (prior 6 spots) in this test")
|
||||
t.Fatalf("Qsub in 8th position gets majority of the messages (prior 6 spots) in this test")
|
||||
}
|
||||
t.Fatalf("Received too high, %d vs %d", total, avg)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user