mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
@@ -310,23 +310,28 @@ func (c *client) processConnect(arg []byte) error {
|
||||
c.mu.Lock()
|
||||
c.clearAuthTimer()
|
||||
c.last = time.Now()
|
||||
typ := c.typ
|
||||
r := c.route
|
||||
srv := c.srv
|
||||
c.mu.Unlock()
|
||||
|
||||
if err := json.Unmarshal(arg, &c.opts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.srv != nil {
|
||||
if srv != nil {
|
||||
// Check for Auth
|
||||
if ok := c.srv.checkAuth(c); !ok {
|
||||
if ok := srv.checkAuth(c); !ok {
|
||||
c.authViolation()
|
||||
return ErrAuthorization
|
||||
}
|
||||
}
|
||||
|
||||
// Grab connection name of remote route.
|
||||
if c.typ == ROUTER && c.route != nil {
|
||||
if typ == ROUTER && r != nil {
|
||||
c.mu.Lock()
|
||||
c.route.remoteID = c.opts.Name
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
if c.opts.Verbose {
|
||||
|
||||
@@ -690,10 +690,14 @@ func (s *Server) checkAuth(c *client) bool {
|
||||
|
||||
// Remove a client or route from our internal accounting.
|
||||
func (s *Server) removeClient(c *client) {
|
||||
var rID string
|
||||
c.mu.Lock()
|
||||
cid := c.cid
|
||||
typ := c.typ
|
||||
r := c.route
|
||||
if r != nil {
|
||||
rID = r.remoteID
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
s.mu.Lock()
|
||||
@@ -703,10 +707,10 @@ func (s *Server) removeClient(c *client) {
|
||||
case ROUTER:
|
||||
delete(s.routes, cid)
|
||||
if r != nil {
|
||||
rc, ok := s.remotes[r.remoteID]
|
||||
rc, ok := s.remotes[rID]
|
||||
// Only delete it if it is us..
|
||||
if ok && c == rc {
|
||||
delete(s.remotes, r.remoteID)
|
||||
delete(s.remotes, rID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"net"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -611,3 +612,47 @@ func TestAutoUnsubPropagation(t *testing.T) {
|
||||
|
||||
routeExpect(unsubnomaxRe)
|
||||
}
|
||||
|
||||
type ignoreLogger struct {
|
||||
}
|
||||
|
||||
func (l *ignoreLogger) Fatalf(f string, args ...interface{}) {
|
||||
}
|
||||
func (l *ignoreLogger) Errorf(f string, args ...interface{}) {
|
||||
}
|
||||
|
||||
func TestRouteConnectOnShutdownRace(t *testing.T) {
|
||||
s, opts := runRouteServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
l := &ignoreLogger{}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
cQuit := make(chan bool, 1)
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
route := createRouteConn(l, opts.ClusterHost, opts.ClusterPort)
|
||||
if route != nil {
|
||||
setupRouteEx(l, route, opts, "ROUTE:4222")
|
||||
route.Close()
|
||||
}
|
||||
select {
|
||||
case <-cQuit:
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
s.Shutdown()
|
||||
|
||||
cQuit <- true
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user