mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
WIP: socket stats
At this point, we're collecting for gateways, we have the general framework in place, and we're populating unpublished expvars.
This commit is contained in:
@@ -17,6 +17,7 @@ import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
@@ -67,6 +68,10 @@ const (
|
||||
okProto = "+OK" + _CRLF_
|
||||
)
|
||||
|
||||
const (
|
||||
diagnosticsInterval = 5 * time.Second
|
||||
)
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
@@ -212,6 +217,7 @@ type client struct {
|
||||
nonce []byte
|
||||
pubKey string
|
||||
nc net.Conn
|
||||
diagConn *net.TCPConn
|
||||
ncs atomic.Value
|
||||
out outbound
|
||||
user *NkeyUser
|
||||
@@ -245,6 +251,13 @@ type client struct {
|
||||
|
||||
trace bool
|
||||
echo bool
|
||||
|
||||
// This is not supported on all platforms; on those, the bool will never be
|
||||
// true and the TCPInfo will deteriorate to struct{}.
|
||||
// Where it is supported, TCPInfo is roughly 1/4 kB.
|
||||
diagConnUsable bool
|
||||
diagTCPData TCPDiagnostics
|
||||
diagMetrics TCPInfoExpMetrics
|
||||
}
|
||||
|
||||
type rrTracking struct {
|
||||
@@ -1094,7 +1107,7 @@ func (c *client) readLoop(pre []byte) {
|
||||
|
||||
// Returns the appropriate closed state for a given read error.
|
||||
func closedStateForErr(err error) ClosedState {
|
||||
if err == io.EOF {
|
||||
if errors.Is(err, io.EOF) {
|
||||
return ClientClosed
|
||||
}
|
||||
return ReadError
|
||||
@@ -4479,6 +4492,38 @@ func (c *client) isClosed() bool {
|
||||
return c.flags.isSet(closeConnection) || c.flags.isSet(connMarkedClosed) || c.nc == nil
|
||||
}
|
||||
|
||||
// diagnosticsLoop reports statistics on the client connection.
|
||||
// Runs in its own Go routine.
|
||||
func (c *client) diagnosticsLoop() {
|
||||
// House-keeping for the server's tracking of extant go-routines.
|
||||
c.mu.Lock()
|
||||
s := c.srv
|
||||
defer s.grWG.Done()
|
||||
if c.isClosed() {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
c.mu.Unlock()
|
||||
var err error
|
||||
for {
|
||||
time.Sleep(diagnosticsInterval)
|
||||
c.mu.Lock()
|
||||
err = GetSocketTCPDiagnostics(c.diagConn, &c.diagTCPData)
|
||||
if err != nil {
|
||||
c.mu.Unlock()
|
||||
if closedStateForErr(err) == ClientClosed {
|
||||
c.Debugf("diagnostics loop shutting down on client closed")
|
||||
} else {
|
||||
// FIXME: do we really want to exit for non-closed errors? TBD.
|
||||
c.Errorf("diagnostics loop exiting: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
(&c.diagMetrics).PopulateFromTCPDiagnostics(&c.diagTCPData)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Logging functionality scoped to a client or route.
|
||||
func (c *client) Error(err error) {
|
||||
c.srv.Errors(c, err)
|
||||
|
||||
@@ -679,6 +679,11 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
|
||||
now := time.Now()
|
||||
c := &client{srv: s, nc: conn, start: now, last: now, kind: GATEWAY}
|
||||
|
||||
// Stash raw connection required for getting socket diagnostics
|
||||
if PlatformCanGetSocketTCPInfo {
|
||||
c.diagConn, c.diagConnUsable = conn.(*net.TCPConn)
|
||||
}
|
||||
|
||||
// Are we creating the gateway based on the configuration
|
||||
solicit := cfg != nil
|
||||
var tlsRequired bool
|
||||
@@ -813,6 +818,11 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
|
||||
// Spin up the write loop.
|
||||
s.startGoRoutine(func() { c.writeLoop() })
|
||||
|
||||
// Start up diagnostics.
|
||||
if c.diagConnUsable {
|
||||
s.startGoRoutine(func() { c.diagnosticsLoop() })
|
||||
}
|
||||
|
||||
if tlsRequired {
|
||||
c.Debugf("TLS handshake complete")
|
||||
cs := c.nc.(*tls.Conn).ConnectionState()
|
||||
|
||||
121
server/tcpinfo_linux.go
Normal file
121
server/tcpinfo_linux.go
Normal file
@@ -0,0 +1,121 @@
|
||||
// Copyright 2020 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"net"
|
||||
"syscall"
|
||||
"unsafe"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
type TCPInfo = syscall.TCPInfo
|
||||
|
||||
type TCPDiagnostics struct {
|
||||
Info TCPInfo
|
||||
UnreadData uint32
|
||||
UnsentData uint32
|
||||
}
|
||||
|
||||
const PlatformCanGetSocketTCPInfo = true
|
||||
|
||||
// GetSocketTCPDiagnostics relies upon a non-portable Linux-ism for returning a
|
||||
// lot of data about a connected socket.
|
||||
// We expect this to be called routinely, so we expect the caller to provide
|
||||
// the existing memory object to be written to.
|
||||
func GetSocketTCPDiagnostics(conn *net.TCPConn, diag *TCPDiagnostics) error {
|
||||
if conn == nil {
|
||||
return fmt.Errorf("GetSocketTCPDiagnostics: %w", ErrConnectionClosed)
|
||||
}
|
||||
rawConn, err := conn.SyscallConn()
|
||||
if err != nil {
|
||||
return fmt.Errorf("GetSocketTCPDiagnostics: %w", err)
|
||||
}
|
||||
|
||||
*diag = TCPDiagnostics{}
|
||||
infoSize := syscall.SizeofTCPInfo
|
||||
|
||||
err = nil
|
||||
rawConn.Control(func(fd uintptr) {
|
||||
sysErr := syscall.Errno(0)
|
||||
ret := uintptr(0)
|
||||
for sysErr == 0 || sysErr == syscall.EINTR {
|
||||
_, _, sysErr = syscall.Syscall6(
|
||||
syscall.SYS_GETSOCKOPT, fd,
|
||||
syscall.SOL_TCP, syscall.TCP_INFO,
|
||||
uintptr(unsafe.Pointer(&diag.Info)), uintptr(unsafe.Pointer(&infoSize)),
|
||||
0)
|
||||
}
|
||||
if sysErr != 0 {
|
||||
err = fmt.Errorf("GetSocketTCPDiagnostics: getsockopt(TCP_INFO) failed: %w", sysErr)
|
||||
return
|
||||
}
|
||||
for sysErr == 0 || sysErr == syscall.EINTR {
|
||||
ret, _, sysErr = syscall.Syscall(syscall.SYS_IOCTL, fd, unix.SIOCINQ, 0)
|
||||
}
|
||||
if sysErr != 0 {
|
||||
err = fmt.Errorf("GetSocketTCPDiagnostics: getsockopt(SIOCINQ) failed: %w", sysErr)
|
||||
return
|
||||
}
|
||||
diag.UnreadData = uint32(ret)
|
||||
|
||||
for sysErr == 0 || sysErr == syscall.EINTR {
|
||||
ret, _, sysErr = syscall.Syscall(syscall.SYS_IOCTL, fd, unix.SIOCOUTQ, 0)
|
||||
}
|
||||
if sysErr != 0 {
|
||||
err = fmt.Errorf("GetSocketTCPDiagnostics: getsockopt(SIOCOUTQ) failed: %w", sysErr)
|
||||
return
|
||||
}
|
||||
diag.UnsentData = uint32(ret)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type TCPInfoExpMetrics struct {
|
||||
// nb: the Golang syscall/ztypes_linux_amd64.go stops at Total_retrans
|
||||
// So we miss out on things like tcpi_notsent_bytes
|
||||
UnreadData expvar.Int
|
||||
UnsentData expvar.Int
|
||||
UnAckedPackets expvar.Int
|
||||
LostPackets expvar.Int
|
||||
RetransOutPackets expvar.Int
|
||||
TotalRetransPackets expvar.Int
|
||||
PathMTU expvar.Int
|
||||
LastDataSentMSec expvar.Int // result of jiffies_to_msecs(), now-timestamp
|
||||
LastDataRecvMSec expvar.Int // result of jiffies_to_msecs(), now-timestamp
|
||||
RTT expvar.Int
|
||||
RTTVariance expvar.Int
|
||||
}
|
||||
|
||||
func (m *TCPInfoExpMetrics) PopulateFromTCPDiagnostics(d *TCPDiagnostics) {
|
||||
m.UnreadData.Set(int64(d.UnreadData))
|
||||
m.UnsentData.Set(int64(d.UnsentData))
|
||||
m.UnAckedPackets.Set(int64(d.Info.Unacked))
|
||||
m.LostPackets.Set(int64(d.Info.Lost))
|
||||
m.RetransOutPackets.Set(int64(d.Info.Retrans))
|
||||
m.TotalRetransPackets.Set(int64(d.Info.Total_retrans))
|
||||
m.PathMTU.Set(int64(d.Info.Pmtu))
|
||||
m.LastDataSentMSec.Set(int64(d.Info.Last_data_sent))
|
||||
m.LastDataRecvMSec.Set(int64(d.Info.Last_data_recv))
|
||||
m.RTT.Set(int64(d.Info.Rtt))
|
||||
m.RTTVariance.Set(int64(d.Info.Rttvar))
|
||||
}
|
||||
42
server/tcpinfo_other.go
Normal file
42
server/tcpinfo_other.go
Normal file
@@ -0,0 +1,42 @@
|
||||
// Copyright 2020 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !linux
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
)
|
||||
|
||||
// These are empty but should exist so that they can be embedded in state
|
||||
// structs so that we don't have heavy(ier) GC churn.
|
||||
type TCPInfo struct{}
|
||||
type TCPDiagnostics struct{}
|
||||
type TCPInfoExpMetrics struct{}
|
||||
|
||||
var ErrNotSupported = errors.New("error: operation not supported on this platform")
|
||||
|
||||
const PlatformCanGetSocketTCPInfo = false
|
||||
|
||||
// GetSocketTCPDiagnostics populates a TCPDiagnostics structure.
|
||||
// The core of this relies upon a non-portable Linux-ism for returning a lot of
|
||||
// data about a connected socket.
|
||||
func GetSocketTCPDiagnostics(conn *net.TCPConn, diag *TCPDiagnostics) error {
|
||||
return ErrNotImplemented
|
||||
}
|
||||
|
||||
func (m *TCPInfoExpMetrics) PopulateFromTCPDiagnostics(d *TCPDiagnostics) {}
|
||||
|
||||
// There will be other functions here, as we populate maps.
|
||||
Reference in New Issue
Block a user