From 65be9706b3603b4714bb330ca000897ced2eb11b Mon Sep 17 00:00:00 2001 From: Phil Pennock Date: Tue, 13 Oct 2020 18:26:28 -0400 Subject: [PATCH] WIP: socket stats At this point, we're collecting for gateways, we have the general framework in place, and we're populating unpublished expvars. --- server/client.go | 47 +++++++++++++++- server/gateway.go | 10 ++++ server/tcpinfo_linux.go | 121 ++++++++++++++++++++++++++++++++++++++++ server/tcpinfo_other.go | 42 ++++++++++++++ 4 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 server/tcpinfo_linux.go create mode 100644 server/tcpinfo_other.go diff --git a/server/client.go b/server/client.go index 112afdd4..8f592be1 100644 --- a/server/client.go +++ b/server/client.go @@ -17,6 +17,7 @@ import ( "bytes" "crypto/tls" "encoding/json" + "errors" "fmt" "io" "math/rand" @@ -67,6 +68,10 @@ const ( okProto = "+OK" + _CRLF_ ) +const ( + diagnosticsInterval = 5 * time.Second +) + func init() { rand.Seed(time.Now().UnixNano()) } @@ -212,6 +217,7 @@ type client struct { nonce []byte pubKey string nc net.Conn + diagConn *net.TCPConn ncs atomic.Value out outbound user *NkeyUser @@ -245,6 +251,13 @@ type client struct { trace bool echo bool + + // This is not supported on all platforms; on those, the bool will never be + // true and the TCPInfo will deteriorate to struct{}. + // Where it is supported, TCPInfo is roughly 1/4 kB. + diagConnUsable bool + diagTCPData TCPDiagnostics + diagMetrics TCPInfoExpMetrics } type rrTracking struct { @@ -1094,7 +1107,7 @@ func (c *client) readLoop(pre []byte) { // Returns the appropriate closed state for a given read error. func closedStateForErr(err error) ClosedState { - if err == io.EOF { + if errors.Is(err, io.EOF) { return ClientClosed } return ReadError @@ -4479,6 +4492,38 @@ func (c *client) isClosed() bool { return c.flags.isSet(closeConnection) || c.flags.isSet(connMarkedClosed) || c.nc == nil } +// diagnosticsLoop reports statistics on the client connection. 
+// Runs in its own Go routine; createGateway launches it via s.startGoRoutine.
+// Every diagnosticsInterval it refreshes c.diagTCPData and c.diagMetrics while
+// holding c.mu, and it returns once the client is seen to be closed or the
+// socket diagnostics can no longer be collected.
+func (c *client) diagnosticsLoop() {
+	// House-keeping for the server's tracking of extant go-routines.
+	c.mu.Lock()
+	s := c.srv
+	defer s.grWG.Done()
+	if c.isClosed() {
+		c.mu.Unlock()
+		return
+	}
+	c.mu.Unlock()
+
+	for {
+		time.Sleep(diagnosticsInterval)
+
+		c.mu.Lock()
+		// The client may have been closed while we slept. Check its state
+		// explicitly instead of depending on the socket calls failing, so that
+		// this go-routine cannot outlive the connection.
+		if c.isClosed() {
+			c.mu.Unlock()
+			c.Debugf("diagnostics loop shutting down on client closed")
+			return
+		}
+		err := GetSocketTCPDiagnostics(c.diagConn, &c.diagTCPData)
+		if err != nil {
+			c.mu.Unlock()
+			// GetSocketTCPDiagnostics signals a missing connection with
+			// ErrConnectionClosed rather than io.EOF, so test for both.
+			if errors.Is(err, ErrConnectionClosed) || closedStateForErr(err) == ClientClosed {
+				c.Debugf("diagnostics loop shutting down on client closed")
+			} else {
+				// FIXME: do we really want to exit for non-closed errors? TBD.
+				c.Errorf("diagnostics loop exiting: %v", err)
+			}
+			return
+		}
+		c.diagMetrics.PopulateFromTCPDiagnostics(&c.diagTCPData)
+		c.mu.Unlock()
+	}
+}
+
 // Logging functionality scoped to a client or route.
 func (c *client) Error(err error) {
 	c.srv.Errors(c, err)
diff --git a/server/gateway.go b/server/gateway.go
index 6b0aff05..62fc3ea6 100644
--- a/server/gateway.go
+++ b/server/gateway.go
@@ -679,6 +679,11 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
 	now := time.Now()
 	c := &client{srv: s, nc: conn, start: now, last: now, kind: GATEWAY}
 
+	// Stash raw connection required for getting socket diagnostics
+	if PlatformCanGetSocketTCPInfo {
+		c.diagConn, c.diagConnUsable = conn.(*net.TCPConn)
+	}
+
 	// Are we creating the gateway based on the configuration
 	solicit := cfg != nil
 	var tlsRequired bool
@@ -813,6 +818,11 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
 	// Spin up the write loop.
 	s.startGoRoutine(func() { c.writeLoop() })
 
+	// Start up diagnostics.
+ if c.diagConnUsable { + s.startGoRoutine(func() { c.diagnosticsLoop() }) + } + if tlsRequired { c.Debugf("TLS handshake complete") cs := c.nc.(*tls.Conn).ConnectionState() diff --git a/server/tcpinfo_linux.go b/server/tcpinfo_linux.go new file mode 100644 index 00000000..f3335e21 --- /dev/null +++ b/server/tcpinfo_linux.go @@ -0,0 +1,121 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "expvar" + "fmt" + "net" + "syscall" + "unsafe" + + "golang.org/x/sys/unix" +) + +type TCPInfo = syscall.TCPInfo + +type TCPDiagnostics struct { + Info TCPInfo + UnreadData uint32 + UnsentData uint32 +} + +const PlatformCanGetSocketTCPInfo = true + +// GetSocketTCPDiagnostics relies upon a non-portable Linux-ism for returning a +// lot of data about a connected socket. +// We expect this to be called routinely, so we expect the caller to provide +// the existing memory object to be written to. 
+//
+// A nil conn yields an error wrapping ErrConnectionClosed. Failures from
+// SyscallConn, from RawConn.Control and from the individual system calls are
+// wrapped and returned. diag is reset to its zero value before collection, so
+// after an error it may be only partially populated.
+func GetSocketTCPDiagnostics(conn *net.TCPConn, diag *TCPDiagnostics) error {
+	if conn == nil {
+		return fmt.Errorf("GetSocketTCPDiagnostics: %w", ErrConnectionClosed)
+	}
+	rawConn, err := conn.SyscallConn()
+	if err != nil {
+		return fmt.Errorf("GetSocketTCPDiagnostics: %w", err)
+	}
+
+	*diag = TCPDiagnostics{}
+	// getsockopt(2) takes the buffer length as an in/out socklen_t, which is
+	// 32 bits wide on Linux, so do not hand it a pointer to a Go int.
+	infoSize := uint32(syscall.SizeofTCPInfo)
+
+	// sysErr carries a failure out of the Control callback, which has no
+	// return value of its own.
+	var sysErr error
+	ctlErr := rawConn.Control(func(fd uintptr) {
+		// Issue the call once and repeat it only when it was interrupted;
+		// looping while the result is 0 would never terminate on success.
+		errno := syscall.EINTR
+		for errno == syscall.EINTR {
+			_, _, errno = syscall.Syscall6(
+				syscall.SYS_GETSOCKOPT, fd,
+				syscall.SOL_TCP, syscall.TCP_INFO,
+				uintptr(unsafe.Pointer(&diag.Info)), uintptr(unsafe.Pointer(&infoSize)),
+				0)
+		}
+		if errno != 0 {
+			sysErr = fmt.Errorf("GetSocketTCPDiagnostics: getsockopt(TCP_INFO) failed: %w", errno)
+			return
+		}
+
+		unread, errno := tcpQueueIoctl(fd, unix.SIOCINQ)
+		if errno != 0 {
+			sysErr = fmt.Errorf("GetSocketTCPDiagnostics: ioctl(SIOCINQ) failed: %w", errno)
+			return
+		}
+		diag.UnreadData = uint32(unread)
+
+		unsent, errno := tcpQueueIoctl(fd, unix.SIOCOUTQ)
+		if errno != 0 {
+			sysErr = fmt.Errorf("GetSocketTCPDiagnostics: ioctl(SIOCOUTQ) failed: %w", errno)
+			return
+		}
+		diag.UnsentData = uint32(unsent)
+	})
+	// Control itself fails when the descriptor can no longer be used, in which
+	// case the callback did not collect anything.
+	if ctlErr != nil {
+		return fmt.Errorf("GetSocketTCPDiagnostics: %w", ctlErr)
+	}
+	return sysErr
+}
+
+// tcpQueueIoctl issues an ioctl that reports its result through a C int
+// pointer argument, as SIOCINQ and SIOCOUTQ do, repeating the call if it is
+// interrupted. It returns the stored value and the errno of the final attempt
+// (0 on success).
+func tcpQueueIoctl(fd, req uintptr) (int32, syscall.Errno) {
+	var value int32
+	errno := syscall.EINTR
+	for errno == syscall.EINTR {
+		_, _, errno = syscall.Syscall(syscall.SYS_IOCTL, fd, req, uintptr(unsafe.Pointer(&value)))
+	}
+	return value, errno
+}
+
+// TCPInfoExpMetrics holds the values from a TCPDiagnostics sample that are
+// kept as expvar integers.
+type TCPInfoExpMetrics struct {
+	// nb: the Golang syscall/ztypes_linux_amd64.go stops at Total_retrans
+	// So we miss out on things like tcpi_notsent_bytes
+	UnreadData          expvar.Int
+	UnsentData          expvar.Int
+	UnAckedPackets      expvar.Int
+	LostPackets         expvar.Int
+	RetransOutPackets   expvar.Int
+	TotalRetransPackets expvar.Int
+	PathMTU             expvar.Int
+	LastDataSentMSec    expvar.Int // result of jiffies_to_msecs(), now-timestamp
+	LastDataRecvMSec    expvar.Int // result of jiffies_to_msecs(), now-timestamp
+	RTT                 expvar.Int
+	RTTVariance         expvar.Int
+}
+
+// PopulateFromTCPDiagnostics copies the values of interest from d into the
+// expvar integers of m, widening each one to int64.
+func (m *TCPInfoExpMetrics) PopulateFromTCPDiagnostics(d *TCPDiagnostics) {
+	m.UnreadData.Set(int64(d.UnreadData))
+	m.UnsentData.Set(int64(d.UnsentData))
+	m.UnAckedPackets.Set(int64(d.Info.Unacked))
+	m.LostPackets.Set(int64(d.Info.Lost))
+	m.RetransOutPackets.Set(int64(d.Info.Retrans))
+	m.TotalRetransPackets.Set(int64(d.Info.Total_retrans))
+	m.PathMTU.Set(int64(d.Info.Pmtu))
+	m.LastDataSentMSec.Set(int64(d.Info.Last_data_sent))
+	m.LastDataRecvMSec.Set(int64(d.Info.Last_data_recv))
+	m.RTT.Set(int64(d.Info.Rtt))
+	m.RTTVariance.Set(int64(d.Info.Rttvar))
+}
diff --git a/server/tcpinfo_other.go b/server/tcpinfo_other.go
new file mode 100644
index 00000000..a8132b33
--- /dev/null
+++ b/server/tcpinfo_other.go
@@ -0,0 +1,42 @@
+// Copyright 2020 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !linux
+
+package server
+
+import (
+	"errors"
+	"net"
+)
+
+// These are empty but should exist so that they can be embedded in state
+// structs so that we don't have heavy(ier) GC churn.
+type TCPInfo struct{}
+type TCPDiagnostics struct{}
+type TCPInfoExpMetrics struct{}
+
+var ErrNotSupported = errors.New("error: operation not supported on this platform")
+
+const PlatformCanGetSocketTCPInfo = false
+
+// GetSocketTCPDiagnostics populates a TCPDiagnostics structure.
+// The core of this relies upon a non-portable Linux-ism for returning a lot of
+// data about a connected socket.
+func GetSocketTCPDiagnostics(conn *net.TCPConn, diag *TCPDiagnostics) error {
+	// Report the sentinel declared in this file; diag is left untouched.
+	return ErrNotSupported
+}
+
+// PopulateFromTCPDiagnostics is a no-op in this build: TCPDiagnostics and
+// TCPInfoExpMetrics are both empty structs here, so there is nothing to copy.
+func (m *TCPInfoExpMetrics) PopulateFromTCPDiagnostics(d *TCPDiagnostics) {}
+
+// There will be other functions here, as we populate maps.