[ADDED] LeafNode: Support for s2 compression

This is similar to PR #4115 but for LeafNodes.
Compression mode can be set on both side (the accept and in remotes).
```
leafnodes {
   port: 7422
   compression: s2_best
   remotes [
       {
         url: "nats://host2:74222"
         compression: s2_better
       }
   ]
}
```
Possible modes are similar than for routes (described in PR #4115),
except that when not defined we default to `s2_auto`.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2023-05-15 17:42:39 -06:00
parent e07ccf9cc1
commit 67498af2dc
15 changed files with 1904 additions and 208 deletions

View File

@@ -34,6 +34,7 @@ import (
"sync/atomic"
"time"
"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nkeys"
"github.com/nats-io/nuid"
@@ -90,6 +91,8 @@ type leaf struct {
// we would add it a second time in the smap causing later unsub to suppress the LS-.
tsub map[*subscription]struct{}
tsubt *time.Timer
// Selected compression mode, which may be different from the server configured mode.
compression string
}
// Used for remote (solicited) leafnodes.
@@ -241,6 +244,13 @@ func validateLeafNode(o *Options) error {
}
}
// Validate compression settings
if o.LeafNode.Compression.Mode != _EMPTY_ {
if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
return err
}
}
// If a remote has a websocket scheme, all need to have it.
for _, rcfg := range o.LeafNode.Remotes {
if len(rcfg.URLs) >= 2 {
@@ -256,6 +266,12 @@ func validateLeafNode(o *Options) error {
return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
}
}
// Validate compression settings
if rcfg.Compression.Mode != _EMPTY_ {
if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
return err
}
}
}
if o.LeafNode.Port == 0 {
@@ -689,6 +705,11 @@ func (s *Server) startLeafNodeAcceptLoop() {
Proto: 1, // Fixed for now.
InfoOnConnect: true,
}
// For tests that want to simulate old servers, do not set the compression
// on the INFO protocol if configured with CompressionNotSupported.
if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
info.Compression = cm
}
// If we have selected a random port...
if port == 0 {
// Write resolved port back to options.
@@ -736,19 +757,19 @@ var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
// Lock should be held entering here.
func (c *client) sendLeafConnect(clusterName string, tlsRequired, headers bool) error {
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
// We support basic user/pass and operator based user JWT with signatures.
cinfo := leafConnectInfo{
Version: VERSION,
TLS: tlsRequired,
ID: c.srv.info.ID,
Domain: c.srv.info.Domain,
Name: c.srv.info.Name,
Hub: c.leaf.remote.Hub,
Cluster: clusterName,
Headers: headers,
JetStream: c.acc.jetStreamConfigured(),
DenyPub: c.leaf.remote.DenyImports,
Version: VERSION,
ID: c.srv.info.ID,
Domain: c.srv.info.Domain,
Name: c.srv.info.Name,
Hub: c.leaf.remote.Hub,
Cluster: clusterName,
Headers: headers,
JetStream: c.acc.jetStreamConfigured(),
DenyPub: c.leaf.remote.DenyImports,
Compression: c.leaf.compression,
}
// If a signature callback is specified, this takes precedence over anything else.
@@ -967,6 +988,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
// Grab server variables
s.mu.Lock()
info = s.copyLeafNodeInfo()
info.Compression = opts.LeafNode.Compression.Mode
s.generateNonce(nonce[:])
s.mu.Unlock()
}
@@ -995,26 +1017,9 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
} else {
// If configured to do TLS handshake first
if tlsFirst {
// Still check if there is really need for TLS in case user set
// this boolean but nothing else...
tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote, true)
// If TLS required, peform handshake.
if tlsRequired {
// Get the URL that was used to connect to the remote server.
rURL := remote.getCurrentURL()
// Perform the client-side TLS handshake.
if resetTLSName, err := c.doTLSClientHandshake("leafnode", rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
// Check if we need to reset the remote's TLS name.
if resetTLSName {
remote.Lock()
remote.tlsName = _EMPTY_
remote.Unlock()
}
c.mu.Unlock()
return nil
}
if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
c.mu.Unlock()
return nil
}
}
// We need to wait for the info, but not for too long.
@@ -1068,7 +1073,20 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
// Leaf nodes will always require a CONNECT to let us know
// when we are properly bound to an account.
c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
//
// If compression is configured, we can't set the authTimer here because
// it would cause the parser to fail any incoming protocol that is not a
// CONNECT (and we need to exchange INFO protocols for compression
// negotiation). So instead, use the ping timer until we are done with
// negotiation and can set the auth timer.
timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
if needsCompression(opts.LeafNode.Compression.Mode) {
c.ping.tmr = time.AfterFunc(timeout, func() {
c.authTimeout()
})
} else {
c.setAuthTimer(timeout)
}
}
// Keep track in case server is shutdown before we can successfully register.
@@ -1093,22 +1111,112 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
return c
}
func (c *client) processLeafnodeInfo(info *Info) {
s := c.srv
// Will perform the client-side TLS handshake if needed. Assumes that this
// is called by the solicit side (remote will be non nil). Returns `true`
// if TLS is required, `false` otherwise.
// Lock held on entry.
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
// Check if TLS is required and gather TLS config variables.
tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
if !tlsRequired {
return false, nil
}
// If TLS required, peform handshake.
// Get the URL that was used to connect to the remote server.
rURL := remote.getCurrentURL()
// Perform the client-side TLS handshake.
if resetTLSName, err := c.doTLSClientHandshake("leafnode", rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
// Check if we need to reset the remote's TLS name.
if resetTLSName {
remote.Lock()
remote.tlsName = _EMPTY_
remote.Unlock()
}
return false, err
}
return true, nil
}
func (c *client) processLeafnodeInfo(info *Info) {
c.mu.Lock()
if c.leaf == nil || c.isClosed() {
c.mu.Unlock()
return
}
s := c.srv
opts := s.getOpts()
remote := c.leaf.remote
didSolicit := remote != nil
firstINFO := !c.flags.isSet(infoReceived)
var firstINFO bool
// In case of websocket, the TLS handshake has been already done.
// So check only for non websocket connections and for configurations
// where the TLS Handshake was not done first.
if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
c.mu.Unlock()
return
}
}
// Check for compression, unless already done.
if firstINFO && !c.flags.isSet(compressionNegotiated) {
// Prevent from getting back here.
c.flags.set(compressionNegotiated)
var co *CompressionOpts
if !didSolicit {
co = &opts.LeafNode.Compression
} else {
co = &remote.Compression
}
if needsCompression(co.Mode) {
// Release client lock since following function will need server lock.
c.mu.Unlock()
compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
if err != nil {
c.sendErrAndErr(err.Error())
c.closeConnection(ProtocolViolation)
return
}
if compress {
// Done for now, will get back another INFO protocol...
return
}
// No compression because one side does not want/can't, so proceed.
c.mu.Lock()
// Check that the connection did not close if the lock was released.
if c.isClosed() {
c.mu.Unlock()
return
}
} else {
// Coming from an old server, the Compression field would be the empty
// string. For servers that are configured with CompressionNotSupported,
// this makes them behave as old servers.
if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
c.leaf.compression = CompressionNotSupported
} else {
c.leaf.compression = CompressionOff
}
}
// Fall through and process the INFO protocol as usual.
} else if firstINFO && !didSolicit && needsCompression(opts.LeafNode.Compression.Mode) {
// We used the ping timer instead of auth timer when accepting a remote
// connection so that we can exchange INFO protocols and not have the
// parser return a protocol violation. Now that the negotiation is over
// stop the ping timer and set the auth timer.
clearTimer(&c.ping.tmr)
c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
}
// Mark that the INFO protocol has been received.
// Note: For now, only the initial INFO has a nonce. We
// will probably do auto key rotation at some point.
if c.flags.setIfNotSet(infoReceived) {
firstINFO = true
if firstINFO {
// Mark that the INFO protocol has been received.
c.flags.set(infoReceived)
// Prevent connecting to non leafnode port. Need to do this only for
// the first INFO, not for async INFO updates...
//
@@ -1128,7 +1236,7 @@ func (c *client) processLeafnodeInfo(info *Info) {
// As seen from above, a solicited LeafNode connection should receive
// from the remote server an INFO with CID and LeafNodeURLs. Anything
// else should be considered an attempt to connect to a wrong port.
if c.leaf.remote != nil && (info.CID == 0 || info.LeafNodeURLs == nil) {
if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
c.mu.Unlock()
c.Errorf(ErrConnectedToWrongPort.Error())
c.closeConnection(WrongPort)
@@ -1136,8 +1244,8 @@ func (c *client) processLeafnodeInfo(info *Info) {
}
// Capture a nonce here.
c.nonce = []byte(info.Nonce)
if info.TLSRequired && c.leaf.remote != nil {
c.leaf.remote.TLS = true
if info.TLSRequired && didSolicit {
remote.TLS = true
}
supportsHeaders := c.srv.supportsHeaders()
c.headers = supportsHeaders && info.Headers
@@ -1157,7 +1265,7 @@ func (c *client) processLeafnodeInfo(info *Info) {
// For both initial INFO and async INFO protocols, Possibly
// update our list of remote leafnode URLs we can connect to.
if c.leaf.remote != nil && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
// Consider the incoming array as the most up-to-date
// representation of the remote cluster's list of URLs.
c.updateLeafNodeURLs(info)
@@ -1191,7 +1299,7 @@ func (c *client) processLeafnodeInfo(info *Info) {
// If this is a remote connection and this is the first INFO protocol,
// then we need to finish the connect process by sending CONNECT, etc..
if firstINFO && c.leaf.remote != nil {
if firstINFO && didSolicit {
// Clear deadline that was set in createLeafNode while waiting for the INFO.
c.nc.SetDeadline(time.Time{})
resumeConnect = true
@@ -1215,6 +1323,67 @@ func (c *client) processLeafnodeInfo(info *Info) {
}
}
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
// Negotiate the appropriate compression mode (or no compression)
cm, err := selectCompressionMode(co.Mode, infoCompression)
if err != nil {
return false, err
}
c.mu.Lock()
// For "auto" mode, set the initial compression mode based on RTT
if cm == CompressionS2Auto {
if c.rttStart.IsZero() {
c.rtt = computeRTT(c.start)
}
cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
}
// Keep track of the negotiated compression mode.
c.leaf.compression = cm
cid := c.cid
var nonce string
if !didSolicit {
nonce = string(c.nonce)
}
c.mu.Unlock()
if !needsCompression(cm) {
return false, nil
}
// If we end-up doing compression...
// Generate an INFO with the chosen compression mode.
s.mu.Lock()
info := s.copyLeafNodeInfo()
info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
infoProto := generateInfoJSON(info)
s.mu.Unlock()
// If we solicited, then send this INFO protocol BEFORE switching
// to compression writer. However, if we did not, we send it after.
c.mu.Lock()
if didSolicit {
c.enqueueProto(infoProto)
// Make sure it is completely flushed (the pending bytes goes to
// 0) before proceeding.
for c.out.pb > 0 && !c.isClosed() {
c.flushOutbound()
}
}
// This is to notify the readLoop that it should switch to a
// (de)compression reader.
c.in.flags.set(switchToCompression)
// Create the compress writer before queueing the INFO protocol for
// a route that did not solicit. It will make sure that that proto
// is sent with compression on.
c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
if !didSolicit {
c.enqueueProto(infoProto)
}
c.mu.Unlock()
return true, nil
}
// When getting a leaf node INFO protocol, use the provided
// array of urls to update the list of possible endpoints.
func (c *client) updateLeafNodeURLs(info *Info) {
@@ -1500,8 +1669,6 @@ type leafConnectInfo struct {
Sig string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
TLS bool `json:"tls_required"`
Comp bool `json:"compression,omitempty"`
ID string `json:"server_id,omitempty"`
Domain string `json:"domain,omitempty"`
Name string `json:"name,omitempty"`
@@ -1511,6 +1678,12 @@ type leafConnectInfo struct {
JetStream bool `json:"jetstream,omitempty"`
DenyPub []string `json:"deny_pub,omitempty"`
// There was an existing field called:
// >> Comp bool `json:"compression,omitempty"`
// that has never been used. With support for compression, we now need
// a field that is a string. So we use a different json tag:
Compression string `json:"compress_mode,omitempty"`
// Just used to detect wrong connection attempts.
Gateway string `json:"gateway,omitempty"`
}
@@ -1581,6 +1754,16 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
// support headers and the remote has sent in the CONNECT protocol that it does
// support headers too.
c.headers = supportHeaders && proto.Headers
// If the compression level is still not set, set it based on what has been
// given to us in the CONNECT protocol.
if c.leaf.compression == _EMPTY_ {
// But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
if proto.Compression == _EMPTY_ {
c.leaf.compression = CompressionNotSupported
} else {
c.leaf.compression = proto.Compression
}
}
// Remember the remote server.
c.leaf.remoteServer = proto.Name
@@ -2509,16 +2692,16 @@ func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, t
// if TLS is required, and if so, will return a clone of the TLS Config
// (since some fields will be changed during handshake), the TLS server
// name that is remembered, and the TLS timeout.
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg, needsLock bool) (bool, *tls.Config, string, float64) {
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
var (
tlsConfig *tls.Config
tlsName string
tlsTimeout float64
)
if needsLock {
remote.RLock()
}
remote.RLock()
defer remote.RUnlock()
tlsRequired := remote.TLS || remote.TLSConfig != nil
if tlsRequired {
if remote.TLSConfig != nil {
@@ -2532,9 +2715,6 @@ func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg, needsLock b
tlsTimeout = float64(TLS_TIMEOUT / time.Second)
}
}
if needsLock {
remote.RUnlock()
}
return tlsRequired, tlsConfig, tlsName, tlsTimeout
}
@@ -2555,21 +2735,12 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
compress := remote.Websocket.Compression
// By default the server will mask outbound frames, but it can be disabled with this option.
noMasking := remote.Websocket.NoMasking
tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote, false)
remote.RUnlock()
// Do TLS here as needed.
if tlsRequired {
// Perform the client-side TLS handshake.
if resetTLSName, err := c.doTLSClientHandshake("leafnode", rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
// Check if we need to reset the remote's TLS name.
if resetTLSName {
remote.Lock()
remote.tlsName = _EMPTY_
remote.Unlock()
}
// 0 will indicate that the connection was already closed
return nil, 0, err
}
// Will do the client-side TLS handshake if needed.
tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
if err != nil {
// 0 will indicate that the connection was already closed
return nil, 0, err
}
// For http request, we need the passed URL to contain either http or https scheme.
@@ -2671,7 +2842,7 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
const connectProcessTimeout = 2 * time.Second
// This is invoked for remote LEAF remote connections after processing the INFO
// protocol. This will do the TLS handshake (if need be)
// protocol.
func (s *Server) leafNodeResumeConnectProcess(c *client) {
clusterName := s.ClusterName()
@@ -2680,40 +2851,7 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) {
c.mu.Unlock()
return
}
remote := c.leaf.remote
var tlsRequired bool
// In case of websocket, the TLS handshake has been already done.
// So check only for non websocket connections and for configurations
// where the TLS Handshake was not done first.
if !c.isWebsocket() && !remote.TLSHandshakeFirst {
var tlsConfig *tls.Config
var tlsName string
var tlsTimeout float64
// Check if TLS is required and gather TLS config variables.
tlsRequired, tlsConfig, tlsName, tlsTimeout = c.leafNodeGetTLSConfigForSolicit(remote, true)
// If TLS required, peform handshake.
if tlsRequired {
// Get the URL that was used to connect to the remote server.
rURL := remote.getCurrentURL()
// Perform the client-side TLS handshake.
if resetTLSName, err := c.doTLSClientHandshake("leafnode", rURL, tlsConfig, tlsName, tlsTimeout, c.srv.getOpts().LeafNode.TLSPinnedCerts); err != nil {
// Check if we need to reset the remote's TLS name.
if resetTLSName {
remote.Lock()
remote.tlsName = _EMPTY_
remote.Unlock()
}
c.mu.Unlock()
return
}
}
}
if err := c.sendLeafConnect(clusterName, tlsRequired, c.headers); err != nil {
if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
c.mu.Unlock()
c.closeConnection(WriteError)
return
@@ -2722,6 +2860,9 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) {
// Spin up the write loop.
s.startGoRoutine(func() { c.writeLoop() })
// In case there was compression negotiation, the timer could have been
// already created. Destroy and recreate with different callback.
clearTimer(&c.ping.tmr)
// timeout leafNodeFinishConnectProcess
c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
c.mu.Lock()