Add InProcessConn, DontListen

This commit is contained in:
Neil Alexander
2021-07-13 12:08:24 +01:00
parent b0c4ec69ab
commit e9abc5801e
2 changed files with 39 additions and 1 deletions

View File

@@ -199,6 +199,7 @@ type Options struct {
ServerName string `json:"server_name"`
Host string `json:"addr"`
Port int `json:"port"`
DontListen bool `json:"dont_listen"`
ClientAdvertise string `json:"-"`
Trace bool `json:"-"`
Debug bool `json:"-"`
@@ -4284,6 +4285,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options {
if flagOpts.Host != "" {
opts.Host = flagOpts.Host
}
if flagOpts.DontListen {
opts.DontListen = flagOpts.DontListen
}
if flagOpts.ClientAdvertise != "" {
opts.ClientAdvertise = flagOpts.ClientAdvertise
}

View File

@@ -167,6 +167,7 @@ type Server struct {
leafNodeEnabled bool
quitCh chan struct{}
startupComplete chan struct{}
shutdownComplete chan struct{}
// Tracking Go routines
@@ -496,6 +497,10 @@ func NewServer(opts *Options) (*Server, error) {
// For tracking leaf nodes.
s.leafs = make(map[uint64]*client)
// Closed when startup is complete. Allows WaitForStartup() to block
// waiting until NATS has started.
s.startupComplete = make(chan struct{})
// Used to kick out all go routines possibly waiting on server
// to shutdown.
s.quitCh = make(chan struct{})
@@ -1613,6 +1618,7 @@ func (s *Server) Start() {
}
s.Noticef(" ID: %s", s.info.ID)
defer close(s.startupComplete)
defer s.Noticef("Server is ready")
// Check for insecure configurations.
@@ -1849,7 +1855,9 @@ func (s *Server) Start() {
}
// Wait for clients.
s.AcceptLoop(clientListenReady)
if !opts.DontListen {
s.AcceptLoop(clientListenReady)
}
}
// Shutdown will shutdown the server instance by kicking out the AcceptLoop
@@ -2021,6 +2029,11 @@ func (s *Server) Shutdown() {
close(s.shutdownComplete)
}
// WaitForStartup will block until the server has been fully started.
func (s *Server) WaitForStartup() {
<-s.startupComplete
}
// WaitForShutdown will block until the server has been fully shutdown.
func (s *Server) WaitForShutdown() {
<-s.shutdownComplete
@@ -2100,6 +2113,27 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
clr = nil
}
// InProcessConn returns a connection to the server which can
// be used in-process when DontListen is set.
func (s *Server) InProcessConn() (net.Conn, error) {
pl, pr := net.Pipe()
created := true
if !s.startGoRoutine(func() {
if c := s.createClient(pl); c == nil {
created = false
}
}) {
created = false
}
if !created {
s.grWG.Done()
_ = pl.Close()
_ = pr.Close()
return nil, fmt.Errorf("failed to make connection")
}
return pr, nil
}
func (s *Server) acceptConnections(l net.Listener, acceptName string, createFunc func(conn net.Conn), errFunc func(err error) bool) {
tmpDelay := ACCEPT_MIN_SLEEP