diff --git a/server/client_test.go b/server/client_test.go index 2d3f8acb..a233d26c 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -135,6 +135,7 @@ func genAsyncParser(c *client) (func(string), chan bool) { var defaultServerOptions = Options{ Host: "127.0.0.1", + Port: -1, Trace: true, Debug: true, DisableShortFirstPing: true, @@ -199,7 +200,7 @@ func TestAsyncClientWithRunningServer(t *testing.T) { } func TestClientCreateAndInfo(t *testing.T) { - c, l := setUpClientWithResponse() + s, c, _, l := rawSetup(defaultServerOptions) defer c.close() if c.cid != 1 { @@ -221,14 +222,13 @@ func TestClientCreateAndInfo(t *testing.T) { // Sanity checks if info.MaxPayload != MAX_PAYLOAD_SIZE || info.AuthRequired || info.TLSRequired || - info.Port != DEFAULT_PORT { + int(info.Port) != s.opts.Port { t.Fatalf("INFO inconsistent: %+v\n", info) } } func TestClientNoResponderSupport(t *testing.T) { opts := defaultServerOptions - opts.Port = -1 s := New(&opts) c, _, _ := newClientForServer(s) @@ -258,7 +258,6 @@ func TestClientNoResponderSupport(t *testing.T) { func TestServerHeaderSupport(t *testing.T) { opts := defaultServerOptions - opts.Port = -1 s := New(&opts) c, _, l := newClientForServer(s) @@ -295,7 +294,6 @@ func TestServerHeaderSupport(t *testing.T) { // is bi-directional and functions properly. func TestClientHeaderSupport(t *testing.T) { opts := defaultServerOptions - opts.Port = -1 s := New(&opts) c, _, _ := newClientForServer(s) @@ -331,7 +329,6 @@ var hmsgPat = regexp.MustCompile(`HMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n func TestClientHeaderDeliverMsg(t *testing.T) { opts := defaultServerOptions - opts.Port = -1 s := New(&opts) c, cr, _ := newClientForServer(s) @@ -375,7 +372,6 @@ var smsgPat = regexp.MustCompile(`^MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n func TestClientHeaderDeliverStrippedMsg(t *testing.T) { opts := defaultServerOptions - opts.Port = -1 s := New(&opts) c, _, _ := newClientForServer(s) @@ -424,7 +420,6 @@ func TestClientHeaderDeliverStrippedMsg(t *testing.T) { func TestClientHeaderDeliverQueueSubStrippedMsg(t *testing.T) { opts := defaultServerOptions - opts.Port = -1 s := New(&opts) c, _, _ := newClientForServer(s) @@ -2543,6 +2538,7 @@ func TestClientClampMaxSubsErrReport(t *testing.T) { checkLeafNodeConnected(t, s2) nc := natsConnect(t, s2.ClientURL()) + defer nc.Close() natsSubSync(t, nc, "foo") natsSubSync(t, nc, "bar") diff --git a/server/gateway_test.go b/server/gateway_test.go index eb9e2a5c..4d4d6f7e 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -6456,3 +6456,98 @@ func TestGatewayAuthDiscovered(t *testing.T) { waitForInboundGateways(t, srvB, 1, time.Second) waitForOutboundGateways(t, srvB, 1, time.Second) } + +func TestTLSGatewaysCertificateImplicitAllowPass(t *testing.T) { + testTLSGatewaysCertificateImplicitAllow(t, true) +} + +func TestTLSGatewaysCertificateImplicitAllowFail(t *testing.T) { + testTLSGatewaysCertificateImplicitAllow(t, false) +} + +func testTLSGatewaysCertificateImplicitAllow(t *testing.T, pass bool) { + // Base config for the servers + cfg := createFile(t, "cfg") + defer removeFile(t, cfg.Name()) + cfg.WriteString(fmt.Sprintf(` + gateway { + tls { + cert_file = "../test/configs/certs/tlsauth/server.pem" + key_file = "../test/configs/certs/tlsauth/server-key.pem" + ca_file = "../test/configs/certs/tlsauth/ca.pem" + verify_cert_and_check_known_urls = true + insecure = %t + timeout = 1 + } + } + `, !pass)) // set insecure to skip verification on the outgoing end + if err := cfg.Sync(); err != nil { + t.Fatal(err) + } + + optsA := LoadConfig(cfg.Name()) + optsB := LoadConfig(cfg.Name()) + + urlA := "nats://localhost:9995" + urlB := "nats://localhost:9996" + if !pass { + urlA = "nats://127.0.0.1:9995" + urlB = "nats://127.0.0.1:9996" + } + + gwA, err := url.Parse(urlA) + if err != nil { + t.Fatal(err) + } + gwB, err := url.Parse(urlB) + if err != nil { + t.Fatal(err) + } + + optsA.Host = "127.0.0.1" + optsA.Port = -1 + optsA.Gateway.Name = "A" + optsA.Gateway.Port = 9995 + optsA.Gateway.resolver = &localhostResolver{} + + optsB.Host = "127.0.0.1" + optsB.Port = -1 + optsB.Gateway.Name = "B" + optsB.Gateway.Port = 9996 + optsB.Gateway.resolver = &localhostResolver{} + + gateways := make([]*RemoteGatewayOpts, 2) + gateways[0] = &RemoteGatewayOpts{ + Name: optsA.Gateway.Name, + URLs: []*url.URL{gwA}, + } + gateways[1] = &RemoteGatewayOpts{ + Name: optsB.Gateway.Name, + URLs: []*url.URL{gwB}, + } + + optsA.Gateway.Gateways = gateways + optsB.Gateway.Gateways = gateways + + SetGatewaysSolicitDelay(100 * time.Millisecond) + defer ResetGatewaysSolicitDelay() + + srvA := RunServer(optsA) + defer srvA.Shutdown() + + srvB := RunServer(optsB) + defer srvB.Shutdown() + + if pass { + waitForOutboundGateways(t, srvA, 1, 5*time.Second) + waitForOutboundGateways(t, srvB, 1, 5*time.Second) + } else { + time.Sleep(1 * time.Second) // the fail case uses the IP, so a short wait is sufficient + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + if srvA.NumOutboundGateways() != 0 || srvB.NumOutboundGateways() != 0 { + return fmt.Errorf("No outbound gateway connection expected") + } + return nil + }) + } +} diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 4ddcd528..0906ea50 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -6770,6 +6770,7 @@ func TestJetStreamClusterSuperClusterPullConsumerAndHeaders(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer nc.Close() m, err := nc.Request("$JS.API.CONSUMER.MSG.NEXT.S.dlc", nil, 2*time.Second) if err != nil { t.Fatalf("Unexpected error: %v", err) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 3a33192d..992cbf5b 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -3743,6 +3743,7 @@ func TestLeafNodeUniqueServerNameCrossJSDomain(t *testing.T) { } // ensure that an update for every server was received sysNc := natsConnect(t, fmt.Sprintf("nats://admin:s3cr3t!@127.0.0.1:%d", s.opts.Port)) + defer sysNc.Close() sub, err := sysNc.SubscribeSync(fmt.Sprintf(serverStatsSubj, "*")) require_NoError(t, err) for { diff --git a/server/monitor_test.go b/server/monitor_test.go index cd163c01..2f267349 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -206,6 +206,7 @@ func TestVarzSubscriptionsResetProperly(t *testing.T) { opts := DefaultMonitorOptions() opts.JetStream = true s := RunServer(opts) + defer s.Shutdown() // This bug seems to only happen via the http endpoint, not direct calls. // Every time you call it doubles. diff --git a/server/ocsp.go b/server/ocsp.go index 3f7dbcdf..56b664be 100644 --- a/server/ocsp.go +++ b/server/ocsp.go @@ -233,7 +233,15 @@ func (oc *OCSPMonitor) run() { quitCh := s.quitCh s.mu.Unlock() - defer s.grWG.Done() + var doShutdown bool + defer func() { + // Need to decrement before shuting down, otherwise shutdown + // would be stuck waiting on grWG to go down to 0. + s.grWG.Done() + if doShutdown { + s.Shutdown() + } + }() oc.mu.Lock() shutdownOnRevoke := oc.shutdownOnRevoke @@ -254,7 +262,7 @@ func (oc *OCSPMonitor) run() { } else if err == nil && shutdownOnRevoke { // If resp.Status is ocsp.Revoked, ocsp.Unknown, or any other value. s.Errorf("Found OCSP status for %s certificate at '%s': %s", kind, certFile, ocspStatusString(resp.Status)) - s.Shutdown() + doShutdown = true return } @@ -288,7 +296,7 @@ func (oc *OCSPMonitor) run() { default: s.Errorf("Received OCSP status for %s certificate '%s': %s", kind, certFile, ocspStatusString(n)) if shutdownOnRevoke { - s.Shutdown() + doShutdown = true } return } diff --git a/server/routes_test.go b/server/routes_test.go index 2ba766e0..5a37ea33 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1431,3 +1431,80 @@ func TestRouteLockReleasedOnTLSFailure(t *testing.T) { t.Fatal("Seem connection lock was not released") } } + +type localhostResolver struct{} + +func (r *localhostResolver) LookupHost(ctx context.Context, host string) ([]string, error) { + return []string{"127.0.0.1"}, nil +} + +func TestTLSRoutesCertificateImplicitAllowPass(t *testing.T) { + testTLSRoutesCertificateImplicitAllow(t, true) +} + +func TestTLSRoutesCertificateImplicitAllowFail(t *testing.T) { + testTLSRoutesCertificateImplicitAllow(t, false) +} + +func testTLSRoutesCertificateImplicitAllow(t *testing.T, pass bool) { + // Base config for the servers + cfg := createFile(t, "cfg") + defer removeFile(t, cfg.Name()) + cfg.WriteString(fmt.Sprintf(` + cluster { + tls { + cert_file = "../test/configs/certs/tlsauth/server.pem" + key_file = "../test/configs/certs/tlsauth/server-key.pem" + ca_file = "../test/configs/certs/tlsauth/ca.pem" + verify_cert_and_check_known_urls = true + insecure = %t + timeout = 1 + } + } + `, !pass)) // set insecure to skip verification on the outgoing end + if err := cfg.Sync(); err != nil { + t.Fatal(err) + } + + optsA := LoadConfig(cfg.Name()) + optsB := LoadConfig(cfg.Name()) + + routeURLs := "nats://localhost:9935, nats://localhost:9936" + if !pass { + routeURLs = "nats://127.0.0.1:9935, nats://127.0.0.1:9936" + } + optsA.Host = "127.0.0.1" + optsA.Port = 9335 + optsA.Cluster.Name = "xyz" + optsA.Cluster.Host = optsA.Host + optsA.Cluster.Port = 9935 + optsA.Cluster.resolver = &localhostResolver{} + optsA.Routes = RoutesFromStr(routeURLs) + optsA.NoSystemAccount = true + srvA := RunServer(optsA) + defer srvA.Shutdown() + + optsB.Host = "127.0.0.1" + optsB.Port = 9336 + optsB.Cluster.Name = "xyz" + optsB.Cluster.Host = optsB.Host + optsB.Cluster.Port = 9936 + optsB.Cluster.resolver = &localhostResolver{} + optsB.Routes = RoutesFromStr(routeURLs) + optsB.NoSystemAccount = true + srvB := RunServer(optsB) + defer srvB.Shutdown() + + if pass { + checkNumRoutes(t, srvA, 1) + checkNumRoutes(t, srvB, 1) + } else { + time.Sleep(1 * time.Second) // the fail case uses the IP, so a short wait is sufficient + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + if srvA.NumRoutes() != 0 || srvB.NumRoutes() != 0 { + return fmt.Errorf("No route connection expected") + } + return nil + }) + } +} diff --git a/test/norace_test.go b/test/norace_test.go index 24330ed2..1e077bb4 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -393,7 +393,7 @@ func TestNoRaceQueueSubWeightOrderMultipleConnections(t *testing.T) { <-start // Now create 100 identical queue subscribers on each connection. for i := 0; i < 100; i++ { - if _, err := nc.QueueSubscribeSync("foo", "bar"); err != nil { + if _, err := nc.QueueSubscribe("foo", "bar", func(_ *nats.Msg) {}); err != nil { return } } diff --git a/test/ocsp_test.go b/test/ocsp_test.go index bce09bda..961bb6d5 100644 --- a/test/ocsp_test.go +++ b/test/ocsp_test.go @@ -274,6 +274,7 @@ func TestOCSPMustStapleAutoDoesNotShutdown(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -296,7 +297,7 @@ func TestOCSPMustStapleAutoDoesNotShutdown(t *testing.T) { // Should not be connection refused, the client will continue running and // be served the stale OCSP staple instead. - _, err = nats.Connect(fmt.Sprintf("tls://localhost:%d", opts.Port), + nc, err = nats.Connect(fmt.Sprintf("tls://localhost:%d", opts.Port), nats.Secure(&tls.Config{ VerifyConnection: func(s tls.ConnectionState) error { resp, err := getOCSPStatus(s) @@ -315,6 +316,7 @@ func TestOCSPMustStapleAutoDoesNotShutdown(t *testing.T) { if err != nil { t.Fatal(err) } + nc.Close() } func TestOCSPAutoWithoutMustStapleDoesNotShutdownOnRevoke(t *testing.T) { @@ -378,6 +380,7 @@ func TestOCSPAutoWithoutMustStapleDoesNotShutdownOnRevoke(t *testing.T) { if err != nil { t.Fatal(err) } + defer nc.Close() sub, err := nc.SubscribeSync("foo") if err != nil { t.Fatal(err) @@ -398,13 +401,14 @@ func TestOCSPAutoWithoutMustStapleDoesNotShutdownOnRevoke(t *testing.T) { time.Sleep(2 * time.Second) // Should not be connection refused since server will continue running. - _, err = nats.Connect(fmt.Sprintf("tls://localhost:%d", opts.Port), + nc, err = nats.Connect(fmt.Sprintf("tls://localhost:%d", opts.Port), nats.RootCAs(caCert), nats.ErrorHandler(noOpErrHandler), ) if err != nil { t.Errorf("Unexpected error: %s", err) } + nc.Close() } func TestOCSPClient(t *testing.T) { @@ -1078,6 +1082,7 @@ func TestOCSPCluster(t *testing.T) { if err != nil { t.Fatal(err) } + defer cA.Close() checkClusterFormed(t, srvA, srvB) // Revoke the seed server cluster certificate, following servers will not be able to verify connection. @@ -1143,6 +1148,7 @@ func TestOCSPCluster(t *testing.T) { if err != nil { t.Fatal(err) } + defer cB.Close() cC, err := nats.Connect(fmt.Sprintf("tls://localhost:%d", optsC.Port), nats.Secure(&tls.Config{ VerifyConnection: func(s tls.ConnectionState) error { @@ -1158,6 +1164,7 @@ func TestOCSPCluster(t *testing.T) { if err != nil { t.Fatal(err) } + defer cC.Close() // There should be no connectivity between the clients due to the revoked staple. _, err = cA.Subscribe("foo", func(m *nats.Msg) { @@ -1343,6 +1350,7 @@ func TestOCSPLeaf(t *testing.T) { if err != nil { t.Fatal(err) } + defer cA.Close() // checkLeafNodeConnected(t, srvA) // Revoke the seed server cluster certificate, following servers will not be able to verify connection. @@ -1402,6 +1410,7 @@ func TestOCSPLeaf(t *testing.T) { if err != nil { t.Fatal(err) } + defer cB.Close() cC, err := nats.Connect(fmt.Sprintf("tls://127.0.0.1:%d", optsC.Port), nats.Secure(&tls.Config{ VerifyConnection: func(s tls.ConnectionState) error { @@ -1417,6 +1426,7 @@ func TestOCSPLeaf(t *testing.T) { if err != nil { t.Fatal(err) } + defer cC.Close() // There should be no connectivity between the clients due to the revoked staple. _, err = cA.Subscribe("foo", func(m *nats.Msg) { @@ -1623,6 +1633,7 @@ func TestOCSPGateway(t *testing.T) { if err != nil { t.Fatal(err) } + defer cA.Close() waitForOutboundGateways(t, srvB, 1, 5*time.Second) // Revoke the seed server cluster certificate, following servers will not be able to verify connection. @@ -1685,6 +1696,7 @@ func TestOCSPGateway(t *testing.T) { if err != nil { t.Fatal(err) } + defer cB.Close() cC, err := nats.Connect(fmt.Sprintf("tls://127.0.0.1:%d", optsC.Port), nats.Secure(&tls.Config{ VerifyConnection: func(s tls.ConnectionState) error { @@ -1700,6 +1712,7 @@ func TestOCSPGateway(t *testing.T) { if err != nil { t.Fatal(err) } + defer cC.Close() // There should be no connectivity between the clients due to the revoked staple. _, err = cA.Subscribe("foo", func(m *nats.Msg) { diff --git a/test/ping_test.go b/test/ping_test.go index 4d6db38d..38a9c4d5 100644 --- a/test/ping_test.go +++ b/test/ping_test.go @@ -194,6 +194,7 @@ func TestPingSuppresion(t *testing.T) { opts := DefaultTestOptions opts.Port = PING_TEST_PORT opts.PingInterval = pingInterval + opts.DisableShortFirstPing = true s := RunServer(&opts) defer s.Shutdown() diff --git a/test/tls_test.go b/test/tls_test.go index 0f662d20..f80f1eb9 100644 --- a/test/tls_test.go +++ b/test/tls_test.go @@ -787,168 +787,6 @@ func TestTLSGatewaysCertificateCNBasedAuth(t *testing.T) { } } -func TestTLSRoutesCertificateImplicitAllowPass(t *testing.T) { - testTLSRoutesCertificateImplicitAllow(t, true) -} - -func TestTLSRoutesCertificateImplicitAllowFail(t *testing.T) { - testTLSRoutesCertificateImplicitAllow(t, false) -} - -func testTLSRoutesCertificateImplicitAllow(t *testing.T, pass bool) { - // Base config for the servers - cfg := createFile(t, "cfg") - defer removeFile(t, cfg.Name()) - cfg.WriteString(fmt.Sprintf(` - cluster { - tls { - cert_file = "./configs/certs/tlsauth/server.pem" - key_file = "./configs/certs/tlsauth/server-key.pem" - ca_file = "./configs/certs/tlsauth/ca.pem" - verify_cert_and_check_known_urls = true - insecure = %t - timeout = 1 - } - } - `, !pass)) // set insecure to skip verification on the outgoing end - if err := cfg.Sync(); err != nil { - t.Fatal(err) - } - - optsA := LoadConfig(cfg.Name()) - optsB := LoadConfig(cfg.Name()) - - routeURLs := "nats://localhost:9935, nats://localhost:9936" - if !pass { - routeURLs = "nats://127.0.0.1:9935, nats://127.0.0.1:9936" - } - optsA.Host = "127.0.0.1" - optsA.Port = 9335 - optsA.Cluster.Name = "xyz" - optsA.Cluster.Host = optsA.Host - optsA.Cluster.Port = 9935 - optsA.Routes = server.RoutesFromStr(routeURLs) - optsA.NoSystemAccount = true - srvA := RunServer(optsA) - defer srvA.Shutdown() - - optsB.Host = "127.0.0.1" - optsB.Port = 9336 - optsB.Cluster.Name = "xyz" - optsB.Cluster.Host = optsB.Host - optsB.Cluster.Port = 9936 - optsB.Routes = server.RoutesFromStr(routeURLs) - optsB.NoSystemAccount = true - srvB := RunServer(optsB) - defer srvB.Shutdown() - - // srvC is not connected to srvA and srvB due to wrong cert - if pass { - checkNumRoutes(t, srvA, 1) - checkNumRoutes(t, srvB, 1) - } else { - time.Sleep(1 * time.Second) // the fail case uses the IP, so a short wait is sufficient - if srvA.NumRoutes() != 0 || srvB.NumRoutes() != 0 { - t.Fatal("No route connection expected") - } - } -} - -func TestTLSGatewaysCertificateImplicitAllowPass(t *testing.T) { - testTLSGatewaysCertificateImplicitAllow(t, true) -} - -func TestTLSGatewaysCertificateImplicitAllowFail(t *testing.T) { - testTLSGatewaysCertificateImplicitAllow(t, false) -} - -func testTLSGatewaysCertificateImplicitAllow(t *testing.T, pass bool) { - t.Helper() - // Base config for the servers - cfg := createFile(t, "cfg") - defer removeFile(t, cfg.Name()) - cfg.WriteString(fmt.Sprintf(` - gateway { - tls { - cert_file = "./configs/certs/tlsauth/server.pem" - key_file = "./configs/certs/tlsauth/server-key.pem" - ca_file = "./configs/certs/tlsauth/ca.pem" - verify_cert_and_check_known_urls = true - insecure = %t - timeout = 1 - } - } - `, !pass)) // set insecure to skip verification on the outgoing end - if err := cfg.Sync(); err != nil { - t.Fatal(err) - } - - optsA := LoadConfig(cfg.Name()) - optsB := LoadConfig(cfg.Name()) - - urlA := "nats://localhost:9995" - urlB := "nats://localhost:9996" - if !pass { - urlA = "nats://127.0.0.1:9995" - urlB = "nats://127.0.0.1:9996" - } - - gwA, err := url.Parse(urlA) - if err != nil { - t.Fatal(err) - } - gwB, err := url.Parse(urlB) - if err != nil { - t.Fatal(err) - } - - optsA.Host = "127.0.0.1" - optsA.Port = -1 - optsA.Gateway.Name = "A" - optsA.Gateway.Port = 9995 - - optsB.Host = "127.0.0.1" - optsB.Port = -1 - optsB.Gateway.Name = "B" - optsB.Gateway.Port = 9996 - - gateways := make([]*server.RemoteGatewayOpts, 2) - gateways[0] = &server.RemoteGatewayOpts{ - Name: optsA.Gateway.Name, - URLs: []*url.URL{gwA}, - } - gateways[1] = &server.RemoteGatewayOpts{ - Name: optsB.Gateway.Name, - URLs: []*url.URL{gwB}, - } - - optsA.Gateway.Gateways = gateways - optsB.Gateway.Gateways = gateways - - server.SetGatewaysSolicitDelay(100 * time.Millisecond) - defer server.ResetGatewaysSolicitDelay() - - srvA := RunServer(optsA) - defer srvA.Shutdown() - - srvB := RunServer(optsB) - defer srvB.Shutdown() - - // Because we need to use "localhost" in the gw URLs (to match - // hostname in the user/CN), the server may try to connect to - // a [::1], etc.. that may or may not work, so give a lot of - // time for that to complete ok. - if pass { - waitForOutboundGateways(t, srvA, 1, 5*time.Second) - waitForOutboundGateways(t, srvB, 1, 5*time.Second) - } else { - time.Sleep(1 * time.Second) // the fail case uses the IP, so a short wait is sufficient - if srvA.NumOutboundGateways() != 0 || srvB.NumOutboundGateways() != 0 { - t.Fatal("No outbound gateway connection expected") - } - } -} - func TestTLSVerifyClientCertificate(t *testing.T) { srv, opts := RunServerWithConfig("./configs/tlsverify_noca.conf") defer srv.Shutdown()