mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 10:40:41 -07:00
Merge branch 'main' into dev
This commit is contained in:
@@ -554,6 +554,9 @@ func (a *Account) RoutedSubs() int {
|
||||
func (a *Account) TotalSubs() int {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
if a.sl == nil {
|
||||
return 0
|
||||
}
|
||||
return int(a.sl.Count())
|
||||
}
|
||||
|
||||
|
||||
@@ -1906,21 +1906,19 @@ func (c *client) isSolicitedRoute() bool {
|
||||
// Save the first hostname found in route URLs. This will be used in gossip mode
|
||||
// when trying to create a TLS connection by setting the tlsConfig.ServerName.
|
||||
// Lock is held on entry
|
||||
func (s *Server) saveRouteTLSName() {
|
||||
var tlsName string
|
||||
for _, u := range s.getOpts().Routes {
|
||||
if tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
|
||||
tlsName = u.Hostname()
|
||||
func (s *Server) saveRouteTLSName(routes []*url.URL) {
|
||||
for _, u := range routes {
|
||||
if s.routeTLSName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
|
||||
s.routeTLSName = u.Hostname()
|
||||
}
|
||||
}
|
||||
s.routeTLSName = tlsName
|
||||
}
|
||||
|
||||
// Start connection process to provided routes. Each route connection will
|
||||
// be started in a dedicated go routine.
|
||||
// Lock is held on entry
|
||||
func (s *Server) solicitRoutes(routes []*url.URL) {
|
||||
s.saveRouteTLSName()
|
||||
s.saveRouteTLSName(routes)
|
||||
for _, r := range routes {
|
||||
route := r
|
||||
s.startGoRoutine(func() { s.connectToRoute(route, true, true) })
|
||||
|
||||
@@ -1688,19 +1688,20 @@ func TestRouteSaveTLSName(t *testing.T) {
|
||||
s1, o1 := RunServerWithConfig(c1Conf)
|
||||
defer s1.Shutdown()
|
||||
|
||||
c2And3Conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
tmpl := `
|
||||
port: -1
|
||||
cluster {
|
||||
name: "abc"
|
||||
port: -1
|
||||
cluster {
|
||||
name: "abc"
|
||||
port: -1
|
||||
routes: ["nats://localhost:%d"]
|
||||
tls {
|
||||
cert_file: '../test/configs/certs/server-noip.pem'
|
||||
key_file: '../test/configs/certs/server-key-noip.pem'
|
||||
ca_file: '../test/configs/certs/ca.pem'
|
||||
}
|
||||
routes: ["nats://%s:%d"]
|
||||
tls {
|
||||
cert_file: '../test/configs/certs/server-noip.pem'
|
||||
key_file: '../test/configs/certs/server-key-noip.pem'
|
||||
ca_file: '../test/configs/certs/ca.pem'
|
||||
}
|
||||
`, o1.Cluster.Port)))
|
||||
}
|
||||
`
|
||||
c2And3Conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, "localhost", o1.Cluster.Port)))
|
||||
defer removeFile(t, c2And3Conf)
|
||||
s2, _ := RunServerWithConfig(c2And3Conf)
|
||||
defer s2.Shutdown()
|
||||
@@ -1712,18 +1713,50 @@ func TestRouteSaveTLSName(t *testing.T) {
|
||||
|
||||
checkClusterFormed(t, s1, s2, s3)
|
||||
|
||||
// Do a reload of s2 and close the route connections and make sure it
|
||||
// reconnects properly.
|
||||
err := s2.Reload()
|
||||
require_NoError(t, err)
|
||||
reloadUpdateConfig(t, s2, c2And3Conf, fmt.Sprintf(tmpl, "127.0.0.1", o1.Cluster.Port))
|
||||
|
||||
s2.mu.RLock()
|
||||
for _, r := range s2.routes {
|
||||
r.mu.Lock()
|
||||
r.nc.Close()
|
||||
if r.route.routeType == Implicit {
|
||||
r.nc.Close()
|
||||
}
|
||||
r.mu.Unlock()
|
||||
}
|
||||
s2.mu.RUnlock()
|
||||
|
||||
checkClusterFormed(t, s1, s2, s3)
|
||||
|
||||
// Set a logger to capture errors trying to connect after clearing
|
||||
// the routeTLSName and causing a disconnect
|
||||
l := &captureErrorLogger{errCh: make(chan string, 1)}
|
||||
s2.SetLogger(l, false, false)
|
||||
|
||||
var gotIt bool
|
||||
for i := 0; !gotIt && i < 5; i++ {
|
||||
s2.mu.Lock()
|
||||
s2.routeTLSName = _EMPTY_
|
||||
for _, r := range s2.routes {
|
||||
r.mu.Lock()
|
||||
if r.route.routeType == Implicit {
|
||||
r.nc.Close()
|
||||
}
|
||||
r.mu.Unlock()
|
||||
}
|
||||
s2.mu.Unlock()
|
||||
select {
|
||||
case <-l.errCh:
|
||||
gotIt = true
|
||||
case <-time.After(time.Second):
|
||||
// Try again
|
||||
}
|
||||
}
|
||||
if !gotIt {
|
||||
t.Fatal("Did not get the handshake error")
|
||||
}
|
||||
|
||||
// Now get back to localhost in config and reload config and
|
||||
// it should start to work again.
|
||||
reloadUpdateConfig(t, s2, c2And3Conf, fmt.Sprintf(tmpl, "localhost", o1.Cluster.Port))
|
||||
checkClusterFormed(t, s1, s2, s3)
|
||||
}
|
||||
|
||||
@@ -2912,9 +2912,7 @@ func (s *Server) numSubscriptions() uint32 {
|
||||
var subs int
|
||||
s.accounts.Range(func(k, v interface{}) bool {
|
||||
acc := v.(*Account)
|
||||
if acc.sl != nil {
|
||||
subs += acc.TotalSubs()
|
||||
}
|
||||
subs += acc.TotalSubs()
|
||||
return true
|
||||
})
|
||||
return uint32(subs)
|
||||
|
||||
Reference in New Issue
Block a user