mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Rework closeConnection()
This change allows the removal of the connection and the update of the server state to be done "in place", while still delaying the flush and close of the TCP connection to the writeLoop. With ref counting we ensure that the reconnect happens after the flush, but not before the server state has been updated. Some call sites had to be fixed where closeConnection() may have been called while holding the server lock, since that would now be a guaranteed deadlock. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -264,7 +264,7 @@ func (a *Account) nextEventID() string {
|
||||
|
||||
// Called to track a remote server and connections and leafnodes it
|
||||
// has for this account.
|
||||
func (a *Account) updateRemoteServer(m *AccountNumConns) {
|
||||
func (a *Account) updateRemoteServer(m *AccountNumConns) []*client {
|
||||
a.mu.Lock()
|
||||
if a.strack == nil {
|
||||
a.strack = make(map[string]sconns)
|
||||
@@ -308,9 +308,7 @@ func (a *Account) updateRemoteServer(m *AccountNumConns) {
|
||||
a.mu.Unlock()
|
||||
|
||||
// If we have exceeded our max clients this will be populated.
|
||||
for _, c := range clients {
|
||||
c.maxAccountConnExceeded()
|
||||
}
|
||||
return clients
|
||||
}
|
||||
|
||||
// Removes tracking for a remote server that has shutdown.
|
||||
|
||||
129
server/client.go
129
server/client.go
@@ -109,6 +109,7 @@ const (
|
||||
flushOutbound // Marks client as having a flushOutbound call in progress.
|
||||
noReconnect // Indicate that on close, this connection should not attempt a reconnect
|
||||
closeConnection // Marks that closeConnection has already been called.
|
||||
connMarkedClosed // Marks that markConnAsClosed has already been called.
|
||||
writeLoopStarted // Marks that the writeLoop has been started.
|
||||
skipFlushOnClose // Marks that flushOutbound() should not be called on connection close.
|
||||
expectConnect // Marks if this connection is expected to send a CONNECT
|
||||
@@ -239,6 +240,8 @@ type client struct {
|
||||
|
||||
flags clientFlag // Compact booleans into a single field. Size will be increased when needed.
|
||||
|
||||
rref byte
|
||||
|
||||
trace bool
|
||||
echo bool
|
||||
}
|
||||
@@ -799,9 +802,6 @@ func (c *client) writeLoop() {
|
||||
ch := c.out.sch
|
||||
c.mu.Unlock()
|
||||
|
||||
// This will clear connection state and remove it from the server.
|
||||
defer c.teardownConn()
|
||||
|
||||
// Used to check that we did flush from last wake up.
|
||||
waitOk := true
|
||||
|
||||
@@ -815,7 +815,7 @@ func (c *client) writeLoop() {
|
||||
// buffered outbound structure for efficient writev to the underlying socket.
|
||||
for {
|
||||
c.mu.Lock()
|
||||
if close = c.flags.isSet(closeConnection); !close {
|
||||
if close = c.isClosed(); !close {
|
||||
owtf := c.out.fsp > 0 && c.out.pb < maxBufSize && c.out.fsp < maxFlushPending
|
||||
if waitOk && (c.out.pb == 0 || owtf) {
|
||||
c.mu.Unlock()
|
||||
@@ -830,12 +830,21 @@ func (c *client) writeLoop() {
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
close = c.flags.isSet(closeConnection)
|
||||
close = c.isClosed()
|
||||
}
|
||||
}
|
||||
if close {
|
||||
c.flushAndClose(false)
|
||||
c.mu.Unlock()
|
||||
|
||||
// We should always call closeConnection() to ensure that state is
|
||||
// properly cleaned-up. It will be a no-op if already done.
|
||||
c.closeConnection(WriteError)
|
||||
|
||||
// Now explicitly call reconnect(). Thanks to ref counting, we know
|
||||
// that the reconnect will execute only after connection has been
|
||||
// removed from the server state.
|
||||
c.reconnect()
|
||||
return
|
||||
}
|
||||
// Flush data
|
||||
@@ -865,7 +874,7 @@ func (c *client) flushClients(budget time.Duration) time.Time {
|
||||
cp.out.fsp--
|
||||
|
||||
// Just ignore if this was closed.
|
||||
if cp.flags.isSet(closeConnection) {
|
||||
if cp.isClosed() {
|
||||
cp.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
@@ -1008,7 +1017,6 @@ func (c *client) readLoop(pre []byte) {
|
||||
|
||||
// Update activity, check read buffer size.
|
||||
c.mu.Lock()
|
||||
closed := c.isClosed()
|
||||
|
||||
// Activity based on interest changes or data/msgs.
|
||||
if c.in.msgs > 0 || c.in.subs > 0 {
|
||||
@@ -1037,11 +1045,6 @@ func (c *client) readLoop(pre []byte) {
|
||||
c.Warnf("Readloop processing time: %v", dur)
|
||||
}
|
||||
|
||||
// Check to see if we got closed, e.g. slow consumer
|
||||
if closed {
|
||||
return
|
||||
}
|
||||
|
||||
// We could have had a read error from above but still read some data.
|
||||
// If so do the close here unconditionally.
|
||||
if err != nil {
|
||||
@@ -1269,13 +1272,13 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo
|
||||
}
|
||||
|
||||
// Marks this connection as closed with the given reason.
|
||||
// Sets the closeConnection flag and skipFlushOnClose depending on the reason.
|
||||
// Sets the connMarkedClosed flag and skipFlushOnClose depending on the reason.
|
||||
// Depending on the kind of connection, the connection will be saved.
|
||||
// If a writeLoop has been started, the final flush/close/teardown will
|
||||
// be done there, otherwise flush and close of TCP connection is done here in place.
|
||||
// If a writeLoop has been started, the final flush will be done there, otherwise
|
||||
// flush and close of TCP connection is done here in place.
|
||||
// Returns true if closed in place, false otherwise.
|
||||
// Lock is held on entry.
|
||||
func (c *client) markConnAsClosed(reason ClosedState) bool {
|
||||
func (c *client) markConnAsClosed(reason ClosedState) {
|
||||
// Possibly set skipFlushOnClose flag even if connection has already been
|
||||
// marked as closed. The rationale is that a connection may be closed with
|
||||
// a reason that justifies a flush (say after sending an -ERR), but then
|
||||
@@ -1288,10 +1291,10 @@ func (c *client) markConnAsClosed(reason ClosedState) bool {
|
||||
c.flags.set(skipFlushOnClose)
|
||||
skipFlush = true
|
||||
}
|
||||
if c.flags.isSet(closeConnection) {
|
||||
return false
|
||||
if c.flags.isSet(connMarkedClosed) {
|
||||
return
|
||||
}
|
||||
c.flags.set(closeConnection)
|
||||
c.flags.set(connMarkedClosed)
|
||||
// For a websocket client, unless we are told not to flush, enqueue
|
||||
// a websocket CloseMessage based on the reason.
|
||||
if !skipFlush && c.ws != nil && !c.ws.closeSent {
|
||||
@@ -1317,13 +1320,18 @@ func (c *client) markConnAsClosed(reason ClosedState) bool {
|
||||
}
|
||||
// If writeLoop exists, let it do the final flush, close and teardown.
|
||||
if c.flags.isSet(writeLoopStarted) {
|
||||
// Since we want the writeLoop to do the final flush and tcp close,
|
||||
// we want the reconnect to be done there too. However, it shouldn't
|
||||
// happen before the connection has been removed from the server
|
||||
// state (end of closeConnection()). This ref count allows us to
|
||||
// guarantee that.
|
||||
c.rref++
|
||||
c.flushSignal()
|
||||
return false
|
||||
return
|
||||
}
|
||||
// Flush (if skipFlushOnClose is not set) and close in place. If flushing,
|
||||
// use a small WriteDeadline.
|
||||
c.flushAndClose(true)
|
||||
return true
|
||||
}
|
||||
|
||||
// flushSignal will use server to queue the flush IO operation to a pool of flushers.
|
||||
@@ -1691,7 +1699,7 @@ func (c *client) maxPayloadViolation(sz int, max int32) {
|
||||
// Lock should be held.
|
||||
func (c *client) queueOutbound(data []byte) bool {
|
||||
// Do not keep going if closed
|
||||
if c.flags.isSet(closeConnection) {
|
||||
if c.isClosed() {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -1828,7 +1836,7 @@ func (c *client) sendRTTPingLocked() bool {
|
||||
// send the PING. But in case we have client libs that don't do that,
|
||||
// allow the send of the PING if more than 2 secs have elapsed since
|
||||
// the client TCP connection was accepted.
|
||||
if !c.flags.isSet(closeConnection) &&
|
||||
if !c.isClosed() &&
|
||||
(c.flags.isSet(firstPongSent) || time.Since(c.start) > maxNoRTTPingBeforeFirstPong) {
|
||||
c.sendPing()
|
||||
return true
|
||||
@@ -3873,6 +3881,7 @@ func (c *client) flushAndClose(minimalFlush bool) {
|
||||
if c.nc != nil {
|
||||
c.nc.SetWriteDeadline(time.Now().Add(100 * time.Millisecond))
|
||||
c.nc.Close()
|
||||
c.nc = nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3985,41 +3994,23 @@ func (c *client) closeConnection(reason ClosedState) {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// This will set the closeConnection flag and save the connection, etc..
|
||||
// Will return true if no writeLoop was started and TCP connection was
|
||||
// closed in place, in which case we need to do the teardown.
|
||||
teardownNow := c.markConnAsClosed(reason)
|
||||
c.mu.Unlock()
|
||||
|
||||
if teardownNow {
|
||||
c.teardownConn()
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the state of this connection and remove it from the server.
|
||||
// If the connection was initiated (such as ROUTE, GATEWAY, etc..) this may trigger
|
||||
// a reconnect. This function MUST be called only once per connection. It normally
|
||||
// happens when the writeLoop returns, or in closeConnection() if no writeLoop has
|
||||
// been started.
|
||||
func (c *client) teardownConn() {
|
||||
c.mu.Lock()
|
||||
|
||||
// Note that we may have markConnAsClosed() invoked before closeConnection(),
|
||||
// so don't set this to 1, instead bump the count.
|
||||
c.rref++
|
||||
c.flags.set(closeConnection)
|
||||
c.clearAuthTimer()
|
||||
c.clearPingTimer()
|
||||
c.markConnAsClosed(reason)
|
||||
|
||||
// Unblock anyone who is potentially stalled waiting on us.
|
||||
if c.out.stc != nil {
|
||||
close(c.out.stc)
|
||||
c.out.stc = nil
|
||||
}
|
||||
c.nc = nil
|
||||
|
||||
var (
|
||||
retryImplicit bool
|
||||
connectURLs []string
|
||||
wsConnectURLs []string
|
||||
gwName string
|
||||
gwIsOutbound bool
|
||||
gwCfg *gatewayCfg
|
||||
kind = c.kind
|
||||
srv = c.srv
|
||||
noReconnect = c.flags.isSet(noReconnect)
|
||||
@@ -4042,17 +4033,9 @@ func (c *client) teardownConn() {
|
||||
}
|
||||
|
||||
if c.route != nil {
|
||||
if !noReconnect {
|
||||
retryImplicit = c.route.retry
|
||||
}
|
||||
connectURLs = c.route.connectURLs
|
||||
wsConnectURLs = c.route.wsConnURLs
|
||||
}
|
||||
if kind == GATEWAY {
|
||||
gwName = c.gw.name
|
||||
gwIsOutbound = c.gw.outbound
|
||||
gwCfg = c.gw.cfg
|
||||
}
|
||||
|
||||
// If we have remote latency tracking running shut that down.
|
||||
if c.rrTracking != nil {
|
||||
@@ -4122,6 +4105,38 @@ func (c *client) teardownConn() {
|
||||
return
|
||||
}
|
||||
|
||||
c.reconnect()
|
||||
}
|
||||
|
||||
// Depending on the kind of connections, this may attempt to recreate a connection.
|
||||
// The actual reconnect attempt will be started in a go routine.
|
||||
func (c *client) reconnect() {
|
||||
var (
|
||||
retryImplicit bool
|
||||
gwName string
|
||||
gwIsOutbound bool
|
||||
gwCfg *gatewayCfg
|
||||
)
|
||||
|
||||
c.mu.Lock()
|
||||
// Decrease the ref count and perform the reconnect only if == 0.
|
||||
c.rref--
|
||||
if c.flags.isSet(noReconnect) || c.rref > 0 {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if c.route != nil {
|
||||
retryImplicit = c.route.retry
|
||||
}
|
||||
kind := c.kind
|
||||
if kind == GATEWAY {
|
||||
gwName = c.gw.name
|
||||
gwIsOutbound = c.gw.outbound
|
||||
gwCfg = c.gw.cfg
|
||||
}
|
||||
srv := c.srv
|
||||
c.mu.Unlock()
|
||||
|
||||
// Check for a solicited route. If it was, start up a reconnect unless
|
||||
// we are already connected to the other end.
|
||||
if c.isSolicitedRoute() || retryImplicit {
|
||||
@@ -4333,10 +4348,10 @@ func (c *client) getAuthUser() string {
|
||||
}
|
||||
}
|
||||
|
||||
// isClosed returns true if either closeConnection or clearConnection
|
||||
// isClosed returns true if either closeConnection or connMarkedClosed
|
||||
// flag has been set, or if `nc` is nil, which may happen in tests.
|
||||
func (c *client) isClosed() bool {
|
||||
return c.flags.isSet(closeConnection) || c.nc == nil
|
||||
return c.flags.isSet(closeConnection) || c.flags.isSet(connMarkedClosed) || c.nc == nil
|
||||
}
|
||||
|
||||
// Logging functionality scoped to a client or route.
|
||||
|
||||
@@ -942,20 +942,26 @@ func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, subject, reply
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// check again here if we have been shutdown.
|
||||
if !s.running || !s.eventsEnabled() {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Double check that this is not us, should never happen, so error if it does.
|
||||
if m.Server.ID == s.info.ID {
|
||||
s.sys.client.Errorf("Processing our own account connection event message: ignored")
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// If we are here we have interest in tracking this account. Update our accounting.
|
||||
acc.updateRemoteServer(&m)
|
||||
clients := acc.updateRemoteServer(&m)
|
||||
s.updateRemoteServer(&m.Server)
|
||||
s.mu.Unlock()
|
||||
// Need to close clients outside of server lock
|
||||
for _, c := range clients {
|
||||
c.maxAccountConnExceeded()
|
||||
}
|
||||
}
|
||||
|
||||
// Setup tracking for this account. This allows us to track global account activity.
|
||||
@@ -1140,9 +1146,10 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
|
||||
},
|
||||
Reason: reason,
|
||||
}
|
||||
accName := c.acc.Name
|
||||
c.mu.Unlock()
|
||||
|
||||
subj := fmt.Sprintf(disconnectEventSubj, c.acc.Name)
|
||||
subj := fmt.Sprintf(disconnectEventSubj, accName)
|
||||
s.sendInternalMsgLocked(subj, _EMPTY_, &m.Server, &m)
|
||||
}
|
||||
|
||||
|
||||
@@ -429,7 +429,7 @@ func (s *Server) startLeafNodeAcceptLoop() {
|
||||
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-]{3,}[^\n]*[-]{3,}\n))`)
|
||||
|
||||
// Lock should be held entering here.
|
||||
func (c *client) sendLeafConnect(clusterName string, tlsRequired bool) {
|
||||
func (c *client) sendLeafConnect(clusterName string, tlsRequired bool) error {
|
||||
// We support basic user/pass and operator based user JWT with signatures.
|
||||
cinfo := leafConnectInfo{
|
||||
TLS: tlsRequired,
|
||||
@@ -444,13 +444,13 @@ func (c *client) sendLeafConnect(clusterName string, tlsRequired bool) {
|
||||
contents, err := ioutil.ReadFile(creds)
|
||||
if err != nil {
|
||||
c.Errorf("%v", err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer wipeSlice(contents)
|
||||
items := credsRe.FindAllSubmatch(contents, -1)
|
||||
if len(items) < 2 {
|
||||
c.Errorf("Credentials file malformed")
|
||||
return
|
||||
return err
|
||||
}
|
||||
// First result should be the user JWT.
|
||||
// We copy here so that the file containing the seed will be wiped appropriately.
|
||||
@@ -461,7 +461,7 @@ func (c *client) sendLeafConnect(clusterName string, tlsRequired bool) {
|
||||
kp, err := nkeys.FromSeed(items[1][1])
|
||||
if err != nil {
|
||||
c.Errorf("Credentials file has malformed seed")
|
||||
return
|
||||
return err
|
||||
}
|
||||
// Wipe our key on exit.
|
||||
defer kp.Wipe()
|
||||
@@ -480,13 +480,13 @@ func (c *client) sendLeafConnect(clusterName string, tlsRequired bool) {
|
||||
b, err := json.Marshal(cinfo)
|
||||
if err != nil {
|
||||
c.Errorf("Error marshaling CONNECT to route: %v\n", err)
|
||||
c.closeConnection(ProtocolViolation)
|
||||
return
|
||||
return err
|
||||
}
|
||||
// Although this call is made before the writeLoop is created,
|
||||
// we don't really need to send in place. The protocol will be
|
||||
// sent out by the writeLoop.
|
||||
c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Makes a deep copy of the LeafNode Info structure.
|
||||
@@ -714,7 +714,11 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
c.mu.Lock()
|
||||
}
|
||||
|
||||
c.sendLeafConnect(clusterName, tlsRequired)
|
||||
if err := c.sendLeafConnect(clusterName, tlsRequired); err != nil {
|
||||
c.mu.Unlock()
|
||||
c.closeConnection(ProtocolViolation)
|
||||
return nil
|
||||
}
|
||||
c.Debugf("Remote leafnode connect msg sent")
|
||||
|
||||
} else {
|
||||
|
||||
@@ -447,7 +447,7 @@ func (c *client) processInboundRoutedMsg(msg []byte) {
|
||||
}
|
||||
|
||||
// Lock should be held entering here.
|
||||
func (c *client) sendRouteConnect(clusterName string, tlsRequired bool) {
|
||||
func (c *client) sendRouteConnect(clusterName string, tlsRequired bool) error {
|
||||
var user, pass string
|
||||
if userInfo := c.route.url.User; userInfo != nil {
|
||||
user = userInfo.Username()
|
||||
@@ -471,10 +471,10 @@ func (c *client) sendRouteConnect(clusterName string, tlsRequired bool) {
|
||||
b, err := json.Marshal(cinfo)
|
||||
if err != nil {
|
||||
c.Errorf("Error marshaling CONNECT to route: %v\n", err)
|
||||
c.closeConnection(ProtocolViolation)
|
||||
return
|
||||
return err
|
||||
}
|
||||
c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Process the info message if we are a route.
|
||||
@@ -1406,7 +1406,11 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
// Queue Connect proto if we solicited the connection.
|
||||
if didSolicit {
|
||||
c.Debugf("Route connect msg sent")
|
||||
c.sendRouteConnect(clusterName, tlsRequired)
|
||||
if err := c.sendRouteConnect(clusterName, tlsRequired); err != nil {
|
||||
c.mu.Unlock()
|
||||
c.closeConnection(ProtocolViolation)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Send our info to the other side.
|
||||
|
||||
@@ -2129,13 +2129,13 @@ func (s *Server) createClient(conn net.Conn, ws *websocket) *client {
|
||||
// The connection may have been closed
|
||||
if c.isClosed() {
|
||||
c.mu.Unlock()
|
||||
// If it was due to TLS timeout, teardownConn() has already been called.
|
||||
// If it was due to TLS timeout, closeConnection() has already been called.
|
||||
// Otherwise, if connection was marked as closed while sending the INFO,
|
||||
// we need to call teardownConn() directly here.
|
||||
// we need to call closeConnection() directly here.
|
||||
if !info.TLSRequired {
|
||||
c.teardownConn()
|
||||
c.closeConnection(WriteError)
|
||||
}
|
||||
return c
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check for Auth. We schedule this timer after the TLS handshake to avoid
|
||||
|
||||
Reference in New Issue
Block a user