diff --git a/server/const.go b/server/const.go index 6b495360..d07b445d 100644 --- a/server/const.go +++ b/server/const.go @@ -133,4 +133,7 @@ const ( // DEFAULT_LAME_DUCK_DURATION is the time in which the server spreads // the closing of clients when signaled to go in lame duck mode. DEFAULT_LAME_DUCK_DURATION = 2 * time.Minute + + // DEFAULT_LEAFNODE_INFO_WAIT Route dial timeout. + DEFAULT_LEAFNODE_INFO_WAIT = 1 * time.Second ) diff --git a/server/leafnode.go b/server/leafnode.go index 6463b75a..53e3ec08 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -14,6 +14,7 @@ package server import ( + "bufio" "bytes" "crypto/tls" "encoding/base64" @@ -273,7 +274,7 @@ func (c *client) sendLeafConnect(tlsRequired bool) { c.sendProto([]byte(fmt.Sprintf(ConProto, b)), true) } -// Called when an inbound leafnode connection is either accepted. +// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode. func (s *Server) createLeafNode(conn net.Conn, remote *RemoteLeafOpts) *client { // Snapshot server options. opts := s.getOpts() @@ -316,13 +317,13 @@ func (s *Server) createLeafNode(conn net.Conn, remote *RemoteLeafOpts) *client { c.initClient() - c.Debugf("LeafNode connection created") + c.Debugf("Leafnode connection created") if solicited { // We need to wait here for the info, but not for too long. - b := make([]byte, MAX_CONTROL_LINE_SIZE) - c.nc.SetReadDeadline(time.Now().Add(time.Second)) - n, err := c.nc.Read(b) + c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT)) + br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE) + info, err := br.ReadString('\n') if err != nil { c.mu.Unlock() if err == io.EOF { @@ -336,7 +337,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *RemoteLeafOpts) *client { c.mu.Unlock() // Error will be handled below, so ignore here. - c.parse(b[:n]) + c.parse([]byte(info)) c.mu.Lock() if !c.flags.isSet(infoReceived) { @@ -438,15 +439,15 @@ func (s *Server) createLeafNode(conn net.Conn, remote *RemoteLeafOpts) *client { } } - // Set the Ping timer - c.setPingTimer() - // Spin up the read loop. s.startGoRoutine(c.readLoop) // Spin up the write loop. s.startGoRoutine(c.writeLoop) + // Set the Ping timer + c.setPingTimer() + c.mu.Unlock() // Update server's accounting here if we solicited. @@ -543,9 +544,9 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro } // Reject if this has Gateway which means that it would be from a gateway - // connection that incorrectly connects to the Route port. + // connection that incorrectly connects to the leafnode port. if proto.Gateway != "" { - errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the Route port", proto.Gateway) + errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway) c.Errorf(errTxt) c.sendErr(errTxt) c.closeConnection(WrongGateway) @@ -602,12 +603,12 @@ func (c *client) initLeafNodeSmap() { for _, sub := range subs { // We ignore ourselves here. if c != sub.client { - c.leaf.smap[keyFromSub(sub)] += 1 + c.leaf.smap[keyFromSub(sub)]++ } } // FIXME(dlc) - We need to update appropriately on an account claims update. for _, isubj := range ims { - c.leaf.smap[isubj] += 1 + c.leaf.smap[isubj]++ } c.mu.Unlock() } @@ -869,12 +870,11 @@ func (c *client) processLeafUnsub(arg []byte) error { c.Errorf(err.Error()) } // If we are routing add to the route map for the associated account. - srv.updateRouteSubscriptionMap(acc, sub, -11) if updateGWs { - srv.gatewayUpdateSubInterest(acc.Name, sub, -11) + srv.gatewayUpdateSubInterest(acc.Name, sub, -1) } // Now check on leafnode updates for other leaf nodes. - srv.updateLeafNodes(acc, sub, -11) + srv.updateLeafNodes(acc, sub, -1) return nil }