From 944dd248c47e9247f9d15f61603c1ec297bb0971 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 13 Aug 2021 11:08:42 -0700 Subject: [PATCH] Fix for tests Signed-off-by: Derek Collison --- .travis.yml | 4 +++ server/events.go | 2 +- server/events_test.go | 48 +++++++++++++++++++------------- server/jetstream_cluster_test.go | 5 ++-- server/leafnode_test.go | 34 +++++++++++----------- server/monitor_test.go | 4 +-- server/routes_test.go | 4 +-- test/gateway_test.go | 2 +- test/leafnode_test.go | 30 ++++++++++---------- 9 files changed, 74 insertions(+), 59 deletions(-) diff --git a/.travis.yml b/.travis.yml index 208aa62f..209910cf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,10 @@ +os: linux dist: focal +vm: + size: 2x-large + language: go go: - 1.16.x diff --git a/server/events.go b/server/events.go index 25fa80db..c80d9a60 100644 --- a/server/events.go +++ b/server/events.go @@ -801,7 +801,7 @@ func (s *Server) initEventTracking() { if acc, err := extractAccount(subject); err != nil { return nil, err } else { - if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil { + if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ { // Make sure the accounts match. if ci.Account != acc { // Do not leak too much here. diff --git a/server/events_test.go b/server/events_test.go index abdd0a7b..f0b9df99 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1256,8 +1256,8 @@ func TestAccountReqMonitoring(t *testing.T) { // query SUBSZ for account if resp, err := ncSys.Request(subsz, nil, time.Second); err != nil { t.Fatalf("Error on request: %v", err) - } else if !strings.Contains(string(resp.Data), `"num_subscriptions":1,`) { - t.Fatalf("unexpected subs count (expected 1): %v", string(resp.Data)) + } else if !strings.Contains(string(resp.Data), `"num_subscriptions":3,`) { + t.Fatalf("unexpected subs count (expected 3): %v", string(resp.Data)) } // create a subscription if sub, err := nc.Subscribe("foo", func(msg *nats.Msg) {}); err != nil { @@ -1269,8 +1269,8 @@ func TestAccountReqMonitoring(t *testing.T) { // query SUBSZ for account if resp, err := ncSys.Request(subsz, nil, time.Second); err != nil { t.Fatalf("Error on request: %v", err) - } else if !strings.Contains(string(resp.Data), `"num_subscriptions":2,`) { - t.Fatalf("unexpected subs count (expected 2): %v", string(resp.Data)) + } else if !strings.Contains(string(resp.Data), `"num_subscriptions":4,`) { + t.Fatalf("unexpected subs count (expected 4): %v", string(resp.Data)) } else if !strings.Contains(string(resp.Data), `"subject":"foo"`) { t.Fatalf("expected subscription foo: %v", string(resp.Data)) } @@ -1362,15 +1362,15 @@ func TestAccountReqInfo(t *testing.T) { t.Fatalf("Unmarshalling failed: %v", err) } else if len(info.Exports) != 1 { t.Fatalf("Unexpected value: %v", info.Exports) - } else if len(info.Imports) != 0 { - t.Fatalf("Unexpected value: %v", info.Imports) + } else if len(info.Imports) != 2 { + t.Fatalf("Unexpected value: %+v", info.Imports) } else if info.Exports[0].Subject != "req.*" { t.Fatalf("Unexpected value: %v", info.Exports) } else if info.Exports[0].Type != jwt.Service { t.Fatalf("Unexpected value: %v", info.Exports) } else if info.Exports[0].ResponseType != jwt.ResponseTypeSingleton { t.Fatalf("Unexpected value: %v", info.Exports) - } else if info.SubCnt != 0 { + } else if info.SubCnt != 2 { t.Fatalf("Unexpected value: %v", info.SubCnt) } else { checkCommon(&info, &srv, pub1, ajwt1) @@ -1383,16 +1383,26 @@ func TestAccountReqInfo(t *testing.T) { t.Fatalf("Unmarshalling failed: %v", err) } else if len(info.Exports) != 0 { t.Fatalf("Unexpected value: %v", info.Exports) - } else if len(info.Imports) != 1 { - t.Fatalf("Unexpected value: %v", info.Imports) - } else if info.Imports[0].Subject != "req.1" { - t.Fatalf("Unexpected value: %v", info.Exports) - } else if info.Imports[0].Type != jwt.Service { - t.Fatalf("Unexpected value: %v", info.Exports) - } else if info.Imports[0].Account != pub1 { - t.Fatalf("Unexpected value: %v", info.Exports) - } else if info.SubCnt != 1 { - t.Fatalf("Unexpected value: %v", info.SubCnt) + } else if len(info.Imports) != 3 { + t.Fatalf("Unexpected value: %+v", info.Imports) + } + // Here we need to find our import + var si *ExtImport + for _, im := range info.Imports { + if im.Subject == "req.1" { + si = &im + break + } + } + if si == nil { + t.Fatalf("Could not find our import") + } + if si.Type != jwt.Service { + t.Fatalf("Unexpected value: %+v", si) + } else if si.Account != pub1 { + t.Fatalf("Unexpected value: %+v", si) + } else if info.SubCnt != 3 { + t.Fatalf("Unexpected value: %+v", si) } else { checkCommon(&info, &srv, pub2, ajwt2) } @@ -1449,7 +1459,7 @@ func TestAccountClaimsUpdatesWithServiceImports(t *testing.T) { } nc.Flush() - if startSubs != s.NumSubscriptions() { + if startSubs < s.NumSubscriptions() { t.Fatalf("Subscriptions leaked: %d vs %d", startSubs, s.NumSubscriptions()) } } @@ -1594,7 +1604,7 @@ func TestSystemAccountWithGateways(t *testing.T) { // If this tests fails with wrong number after 10 seconds we may have // added a new inititial subscription for the eventing system. - checkExpectedSubs(t, 37, sa) + checkExpectedSubs(t, 39, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 314e119e..4816877c 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3261,8 +3261,8 @@ func TestJetStreamClusterExtendedAccountInfo(t *testing.T) { if ai.Streams != 3 || ai.Consumers != 3 { t.Fatalf("AccountInfo not correct: %+v", ai) } - if ai.API.Total < 8 { - t.Fatalf("Expected at least 8 total API calls, got %d", ai.API.Total) + if ai.API.Total < 7 { + t.Fatalf("Expected at least 7 total API calls, got %d", ai.API.Total) } // Now do a failure to make sure we track API errors. @@ -8930,4 +8930,5 @@ func (c *cluster) stableTotalSubs() (total int) { return fmt.Errorf("Still stabilizing") }) return nsubs + } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 72c325e3..d3121449 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -1285,8 +1285,8 @@ func TestLeafNodePermissions(t *testing.T) { // Create a sub on ">" on LN1 subAll := natsSubSync(t, nc1, ">") - // this should be registered in LN2 (there is 1 sub for LN1 $LDS subject) - checkSubs(ln2.globalAccount(), 2) + // this should be registered in LN2 (there is 1 sub for LN1 $LDS subject) + SYS IMPORTS + checkSubs(ln2.globalAccount(), 6) // Check deny export clause from messages published from LN2 for _, test := range []struct { @@ -1312,8 +1312,8 @@ func TestLeafNodePermissions(t *testing.T) { } subAll.Unsubscribe() - // Goes down to 1 (the $LDS one) - checkSubs(ln2.globalAccount(), 1) + // Goes down by 1. + checkSubs(ln2.globalAccount(), 5) // We used to make sure we would not do subscriptions however that // was incorrect. We need to check publishes, not the subscriptions. @@ -1338,7 +1338,7 @@ func TestLeafNodePermissions(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { sub := natsSubSync(t, nc2, test.subSubject) - checkSubs(ln2.globalAccount(), 2) + checkSubs(ln2.globalAccount(), 6) if !test.ok { nc1.Publish(test.pubSubject, []byte("msg")) @@ -1346,12 +1346,12 @@ func TestLeafNodePermissions(t *testing.T) { t.Fatalf("Did not expect to get the message") } } else { - checkSubs(ln1.globalAccount(), 2) + checkSubs(ln1.globalAccount(), 6) nc1.Publish(test.pubSubject, []byte("msg")) natsNexMsg(t, sub, time.Second) } sub.Unsubscribe() - checkSubs(ln1.globalAccount(), 1) + checkSubs(ln1.globalAccount(), 5) }) } @@ -1472,8 +1472,8 @@ func TestLeafNodeExportPermissionsNotForSpecialSubs(t *testing.T) { // The deny is totally restrictive, but make sure that we still accept the $LDS, $GR and _GR_ go from LN1. checkFor(t, time.Second, 15*time.Millisecond, func() error { // We should have registered the 3 subs from the accepting leafnode. - if n := ln2.globalAccount().TotalSubs(); n != 3 { - return fmt.Errorf("Expected %d subs, got %v", 3, n) + if n := ln2.globalAccount().TotalSubs(); n != 7 { + return fmt.Errorf("Expected %d subs, got %v", 7, n) } return nil }) @@ -3269,8 +3269,8 @@ func TestLeafNodeRouteSubWithOrigin(t *testing.T) { r1.Shutdown() checkFor(t, time.Second, 15*time.Millisecond, func() error { acc := l2.GlobalAccount() - if n := acc.TotalSubs(); n != 0 { - return fmt.Errorf("Account %q should have 0 sub, got %v", acc.GetName(), n) + if n := acc.TotalSubs(); n != 2 { + return fmt.Errorf("Account %q should have 2 subs, got %v", acc.GetName(), n) } return nil }) @@ -3720,9 +3720,9 @@ func TestLeafNodeUniqueServerNameCrossJSDomain(t *testing.T) { listen: -1 server_name: %s jetstream { - max_mem_store: 256MB, - max_file_store: 2GB, - store_dir: "%s", + max_mem_store: 256MB, + max_file_store: 2GB, + store_dir: "%s", domain: hub } accounts { @@ -3737,9 +3737,9 @@ func TestLeafNodeUniqueServerNameCrossJSDomain(t *testing.T) { listen: -1 server_name: %s jetstream { - max_mem_store: 256MB, - max_file_store: 2GB, - store_dir: "%s", + max_mem_store: 256MB, + max_file_store: 2GB, + store_dir: "%s", domain: %s } accounts { diff --git a/server/monitor_test.go b/server/monitor_test.go index b4de3d8d..cd163c01 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -3784,8 +3784,8 @@ func TestMonitorLeafz(t *testing.T) { t.Fatalf("RTT not tracked?") } // LDS should be only one. - if ln.NumSubs != 1 || len(ln.Subs) != 1 { - t.Fatalf("Expected 1 sub, got %v (%v)", ln.NumSubs, ln.Subs) + if ln.NumSubs != 3 || len(ln.Subs) != 3 { + t.Fatalf("Expected 3 subs, got %v (%v)", ln.NumSubs, ln.Subs) } } } diff --git a/server/routes_test.go b/server/routes_test.go index be08a9bc..4c1771a2 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1095,8 +1095,8 @@ func TestRouteNoCrashOnAddingSubToRoute(t *testing.T) { // Make sure all subs are registered in s. checkFor(t, time.Second, 15*time.Millisecond, func() error { - if s.globalAccount().TotalSubs() != int(numRoutes) { - return fmt.Errorf("Not all %v routed subs were registered", numRoutes) + if ts := s.globalAccount().TotalSubs() - 2; ts != int(numRoutes) { + return fmt.Errorf("Not all %d routed subs were registered: %d", numRoutes, ts) } return nil }) diff --git a/test/gateway_test.go b/test/gateway_test.go index efef6065..96948720 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -511,7 +511,7 @@ func TestGatewayNoAccUnsubAfterQSub(t *testing.T) { // Simulate a client connecting to A and publishing a message // so we get an A- from B since there is no interest. gASend("RMSG $G foo 2\r\nok\r\n") - gAExpect(aunsubRe) + gAExpect(runsubRe) // Now create client on B and create queue sub. client := createClientConn(t, ob.Host, ob.Port) diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 7207d131..03c088cf 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -440,7 +440,7 @@ func TestLeafNodeAndRoutes(t *testing.T) { lc := createLeafConn(t, optsA.LeafNode.Host, optsA.LeafNode.Port) defer lc.Close() - leafSend, leafExpect := setupLeaf(t, lc, 1) + leafSend, leafExpect := setupLeaf(t, lc, 3) leafSend("PING\r\n") leafExpect(pongRe) @@ -834,7 +834,7 @@ func TestLeafNodeGatewaySendsSystemEvent(t *testing.T) { defer lc.Close() // This is for our global responses since we are setting up GWs above. - leafSend, leafExpect := setupLeaf(t, lc, 3) + leafSend, leafExpect := setupLeaf(t, lc, 5) leafSend("PING\r\n") leafExpect(pongRe) @@ -878,14 +878,14 @@ func TestLeafNodeGatewayInterestPropagation(t *testing.T) { buf := leafExpect(infoRe) buf = infoRe.ReplaceAll(buf, []byte(nil)) foundFoo := false - for count := 0; count != 5; { + for count := 0; count != 7; { // skip first time if we still have data (buf from above may already have some left) if count != 0 || len(buf) == 0 { buf = append(buf, leafExpect(anyRe)...) } count += len(lsubRe.FindAllSubmatch(buf, -1)) - if count > 5 { - t.Fatalf("Expected %v matches, got %v (buf=%s)", 4, count, buf) + if count > 7 { + t.Fatalf("Expected %v matches, got %v (buf=%s)", 7, count, buf) } if strings.Contains(string(buf), "foo") { foundFoo = true @@ -937,7 +937,7 @@ func TestLeafNodeWithRouteAndGateway(t *testing.T) { defer lc.Close() // This is for our global responses since we are setting up GWs above. - leafSend, leafExpect := setupLeaf(t, lc, 3) + leafSend, leafExpect := setupLeaf(t, lc, 5) leafSend("PING\r\n") leafExpect(pongRe) @@ -996,7 +996,7 @@ func TestLeafNodeWithGatewaysAndStaggeredStart(t *testing.T) { lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() - leafSend, leafExpect := setupLeaf(t, lc, 3) + leafSend, leafExpect := setupLeaf(t, lc, 5) leafSend("PING\r\n") leafExpect(pongRe) @@ -1036,7 +1036,7 @@ func TestLeafNodeWithGatewaysServerRestart(t *testing.T) { lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() - leafSend, leafExpect := setupLeaf(t, lc, 3) + leafSend, leafExpect := setupLeaf(t, lc, 5) leafSend("PING\r\n") leafExpect(pongRe) @@ -1070,7 +1070,7 @@ func TestLeafNodeWithGatewaysServerRestart(t *testing.T) { lc = createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() - _, leafExpect = setupLeaf(t, lc, 3) + _, leafExpect = setupLeaf(t, lc, 5) // Now wait on GW solicit to fire time.Sleep(500 * time.Millisecond) @@ -1174,7 +1174,7 @@ func TestLeafNodeBasicAuth(t *testing.T) { leafExpect(infoRe) leafExpect(lsubRe) leafSend("PING\r\n") - leafExpect(pongRe) + expectResult(t, lc, pongRe) checkLeafNodeConnected(t, s) } @@ -1576,7 +1576,7 @@ func TestLeafNodeMultipleAccounts(t *testing.T) { // Wait for the subs to propagate. LDS + foo.test checkFor(t, 2*time.Second, 10*time.Millisecond, func() error { - if subs := s.NumSubscriptions(); subs < 2 { + if subs := s.NumSubscriptions(); subs < 4 { return fmt.Errorf("Number of subs is %d", subs) } return nil @@ -1760,7 +1760,7 @@ func TestLeafNodeExportsImports(t *testing.T) { // Wait for all subs to propagate. checkFor(t, time.Second, 10*time.Millisecond, func() error { - if subs := s.NumSubscriptions(); subs < 3 { + if subs := s.NumSubscriptions(); subs < 5 { return fmt.Errorf("Number of subs is %d", subs) } return nil @@ -1924,8 +1924,8 @@ func TestLeafNodeExportImportComplexSetup(t *testing.T) { // Wait for the sub to propagate to s2. LDS + subject above. checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - if acc1.RoutedSubs() != 2 { - return fmt.Errorf("Still no routed subscription") + if acc1.RoutedSubs() != 4 { + return fmt.Errorf("Still no routed subscription: %d", acc1.RoutedSubs()) } return nil }) @@ -2510,7 +2510,7 @@ func TestLeafNodeSwitchGatewayToInterestModeOnly(t *testing.T) { defer lc.Close() // This is for our global responses since we are setting up GWs above. - leafSend, leafExpect := setupLeaf(t, lc, 3) + leafSend, leafExpect := setupLeaf(t, lc, 5) leafSend("PING\r\n") leafExpect(pongRe) }