mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
12
TODO.md
12
TODO.md
@@ -1,10 +1,9 @@
|
||||
|
||||
# General
|
||||
|
||||
- [ ] Pedantic state
|
||||
- [ ] brew, apt-get, rpm, chocately (windows)
|
||||
- [ ] Dynamic socket buffer sizes
|
||||
- [ ] Switch to 1.4/1.5 and use maps vs hashmaps in sublist
|
||||
- [ ] brew, apt-get, rpm, chocately (windows)
|
||||
- [ ] Sublist better at high concurrency, cache uses writelock currently
|
||||
- [ ] Buffer pools/sync pools?
|
||||
- [ ] IOVec pools and writev for high fanout?
|
||||
@@ -14,10 +13,15 @@
|
||||
- [ ] Modify cluster support for single message across routes between pub/sub and d-queue
|
||||
- [ ] Memory limits/warnings?
|
||||
- [ ] Limit number of subscriptions a client can have, total memory usage etc.
|
||||
- [ ] Gossip Protocol for discovery for clustering
|
||||
- [ ] Info updates contain other implicit route servers
|
||||
- [ ] Pedantic state
|
||||
- [ ] Multi-tenant accounts with isolation of subject space
|
||||
- [ ] Add to varz, time for slow consumers, peek or total connections, memory, etc.
|
||||
- [ ] Track last activity time per connection?
|
||||
- [X] Add total connections to varz so we won't miss spikes, etc.
|
||||
- [X] Add starttime and uptime to connz list.
|
||||
- [X] Gossip Protocol for discovery for clustering
|
||||
- [X] Add in HTTP requests to varz?
|
||||
- [X] Add favico and help link for monitoring?
|
||||
- [X] Better user/pass support using bcrypt etc.
|
||||
- [X] SSL/TLS support
|
||||
- [X] Add support for / to point to varz, connz, etc..
|
||||
|
||||
@@ -1,38 +1,52 @@
|
||||
2015 iMac 5k 4Ghz i7
|
||||
MacOSX 10.11.2
|
||||
|
||||
====================
|
||||
Go version go1.5.2
|
||||
====================
|
||||
|
||||
PASS
|
||||
Benchmark_GoMap___GetSmallKey-8 300000000 5.06 ns/op 197.63 mops/s
|
||||
Benchmark_HashMap_GetSmallKey-8 100000000 10.6 ns/op 94.34 mops/s
|
||||
Benchmark_GoMap____GetMedKey-8 300000000 5.09 ns/op 196.46 mops/s
|
||||
Benchmark_HashMap__GetMedKey-8 200000000 6.75ns/op 148.15 mops/s
|
||||
Benchmark_GoMap____GetLrgKey-8 300000000 4.88ns/op 204.91 mops/s
|
||||
Benchmark_HashMap__GetLrgKey-8 100000000 17.8 ns/op 56.18 mops/s
|
||||
Benchmark_GoMap_________Set-8 50000000 26.3 ns/op 38.02 mops/s
|
||||
Benchmark_HashMap_______Set-8 20000000 82.4 ns/op 12.13 mops/s
|
||||
|
||||
2013 MacbookAir 11" i7 1.7Ghz Haswell
|
||||
Linux mint15 3.8.0-19
|
||||
|
||||
================
|
||||
====================
|
||||
Go version go1.2.1
|
||||
================
|
||||
====================
|
||||
|
||||
Benchmark_GoMap___GetSmallKey 500000000 7.57 ns/op 132.05 mops/s
|
||||
Benchmark_HashMap_GetSmallKey 100000000 14.30 ns/op 70.08 mops/s
|
||||
Benchmark_GoMap____GetMedKey 500000000 4.83 ns/op 207.01 mops/s
|
||||
Benchmark_HashMap__GetMedKey 200000000 9.54 ns/op 104.82 mops/s
|
||||
Benchmark_GoMap____GetLrgKey 500000000 4.39 ns/op 227.79 mops/s
|
||||
Benchmark_HashMap__GetLrgKey 100000000 24.50 ns/op 40.77 mops/s
|
||||
Benchmark_GoMap___GetSmallKey 500000000 7.57 ns/op 132.05 mops/s
|
||||
Benchmark_HashMap_GetSmallKey 100000000 14.30 ns/op 70.08 mops/s
|
||||
Benchmark_GoMap____GetMedKey 500000000 4.83 ns/op 207.01 mops/s
|
||||
Benchmark_HashMap__GetMedKey 200000000 9.54 ns/op 104.82 mops/s
|
||||
Benchmark_GoMap____GetLrgKey 500000000 4.39 ns/op 227.79 mops/s
|
||||
Benchmark_HashMap__GetLrgKey 100000000 24.50 ns/op 40.77 mops/s
|
||||
|
||||
================
|
||||
====================
|
||||
Go version go1.2.1
|
||||
================
|
||||
====================
|
||||
|
||||
Benchmark_GoMap___GetSmallKey 200000000 8.77 ns/op 114.02 mops/s
|
||||
Benchmark_HashMap_GetSmallKey 100000000 14.80 ns/op 67.53 mops/s
|
||||
Benchmark_GoMap____GetMedKey 500000000 6.21 ns/op 161.05 mops/s
|
||||
Benchmark_HashMap__GetMedKey 200000000 9.51 ns/op 105.15 mops/s
|
||||
Benchmark_GoMap____GetLrgKey 100000000 18.30 ns/op 54.68 mops/s
|
||||
Benchmark_HashMap__GetLrgKey 100000000 24.80 ns/op 40.36 mops/s
|
||||
Benchmark_GoMap___GetSmallKey 200000000 8.77 ns/op 114.02 mops/s
|
||||
Benchmark_HashMap_GetSmallKey 100000000 14.80 ns/op 67.53 mops/s
|
||||
Benchmark_GoMap____GetMedKey 500000000 6.21 ns/op 161.05 mops/s
|
||||
Benchmark_HashMap__GetMedKey 200000000 9.51 ns/op 105.15 mops/s
|
||||
Benchmark_GoMap____GetLrgKey 100000000 18.30 ns/op 54.68 mops/s
|
||||
Benchmark_HashMap__GetLrgKey 100000000 24.80 ns/op 40.36 mops/s
|
||||
|
||||
================
|
||||
====================
|
||||
Go version go1.0.3
|
||||
================
|
||||
|
||||
Benchmark_GoMap___GetSmallKey 50000000 52.20 ns/op 19.17 mops/s
|
||||
Benchmark_HashMap_GetSmallKey 100000000 15.50 ns/op 64.34 mops/s
|
||||
Benchmark_GoMap____GetMedKey 50000000 61.60 ns/op 16.24 mops/s
|
||||
Benchmark_HashMap__GetMedKey 200000000 8.91 ns/op 112.20 mops/s
|
||||
Benchmark_GoMap____GetLrgKey 20000000 86.90 ns/op 11.51 mops/s
|
||||
Benchmark_HashMap__GetLrgKey 100000000 25.40 ns/op 39.44 mops/s
|
||||
|
||||
|
||||
====================
|
||||
|
||||
Benchmark_GoMap___GetSmallKey 50000000 52.20 ns/op 19.17 mops/s
|
||||
Benchmark_HashMap_GetSmallKey 100000000 15.50 ns/op 64.34 mops/s
|
||||
Benchmark_GoMap____GetMedKey 50000000 61.60 ns/op 16.24 mops/s
|
||||
Benchmark_HashMap__GetMedKey 200000000 8.91 ns/op 112.20 mops/s
|
||||
Benchmark_GoMap____GetLrgKey 20000000 86.90 ns/op 11.51 mops/s
|
||||
Benchmark_HashMap__GetLrgKey 100000000 25.40 ns/op 39.44 mops/s
|
||||
|
||||
@@ -31,22 +31,23 @@ const (
|
||||
)
|
||||
|
||||
type client struct {
|
||||
mu sync.Mutex
|
||||
typ int
|
||||
cid uint64
|
||||
lang string
|
||||
opts clientOpts
|
||||
nc net.Conn
|
||||
mpay int
|
||||
ncs string
|
||||
bw *bufio.Writer
|
||||
srv *Server
|
||||
subs *hashmap.HashMap
|
||||
pcd map[*client]struct{}
|
||||
atmr *time.Timer
|
||||
ptmr *time.Timer
|
||||
pout int
|
||||
msgb [msgScratchSize]byte
|
||||
mu sync.Mutex
|
||||
typ int
|
||||
cid uint64
|
||||
lang string
|
||||
opts clientOpts
|
||||
start time.Time
|
||||
nc net.Conn
|
||||
mpay int
|
||||
ncs string
|
||||
bw *bufio.Writer
|
||||
srv *Server
|
||||
subs *hashmap.HashMap
|
||||
pcd map[*client]struct{}
|
||||
atmr *time.Timer
|
||||
ptmr *time.Timer
|
||||
pout int
|
||||
msgb [msgScratchSize]byte
|
||||
|
||||
parseState
|
||||
stats
|
||||
|
||||
@@ -34,21 +34,23 @@ type Connz struct {
|
||||
|
||||
// ConnInfo has detailed information on a per connection basis.
|
||||
type ConnInfo struct {
|
||||
Cid uint64 `json:"cid"`
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port"`
|
||||
Pending int `json:"pending_bytes"`
|
||||
InMsgs int64 `json:"in_msgs"`
|
||||
OutMsgs int64 `json:"out_msgs"`
|
||||
InBytes int64 `json:"in_bytes"`
|
||||
OutBytes int64 `json:"out_bytes"`
|
||||
NumSubs uint32 `json:"subscriptions"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Lang string `json:"lang,omitempty"`
|
||||
Version string `json:"version,omitempty"`
|
||||
TLSVersion string `json:"tls_version,omitempty"`
|
||||
TLSCipher string `json:"tls_cipher_suite,omitempty"`
|
||||
Subs []string `json:"subscriptions_list,omitempty"`
|
||||
Cid uint64 `json:"cid"`
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port"`
|
||||
Start time.Time `json:"start"`
|
||||
Uptime string `json:"uptime"`
|
||||
Pending int `json:"pending_bytes"`
|
||||
InMsgs int64 `json:"in_msgs"`
|
||||
OutMsgs int64 `json:"out_msgs"`
|
||||
InBytes int64 `json:"in_bytes"`
|
||||
OutBytes int64 `json:"out_bytes"`
|
||||
NumSubs uint32 `json:"subscriptions"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Lang string `json:"lang,omitempty"`
|
||||
Version string `json:"version,omitempty"`
|
||||
TLSVersion string `json:"tls_version,omitempty"`
|
||||
TLSCipher string `json:"tls_cipher_suite,omitempty"`
|
||||
Subs []string `json:"subscriptions_list,omitempty"`
|
||||
}
|
||||
|
||||
const DefaultConnListSize = 1024
|
||||
@@ -69,6 +71,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Walk the list
|
||||
s.mu.Lock()
|
||||
s.httpReqStats[ConnzPath]++
|
||||
tlsRequired := s.info.TLSRequired
|
||||
c.NumConns = len(s.clients)
|
||||
|
||||
@@ -114,12 +117,17 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
pairs = pairs[minoff:maxoff]
|
||||
|
||||
// Use same now for all uptime calculations.
|
||||
now := time.Now()
|
||||
|
||||
for _, pair := range pairs {
|
||||
client := pair.Val
|
||||
client.mu.Lock()
|
||||
|
||||
ci := &ConnInfo{
|
||||
Cid: client.cid,
|
||||
Start: client.start,
|
||||
Uptime: myUptime(now.Sub(client.start)),
|
||||
InMsgs: client.inMsgs,
|
||||
OutMsgs: client.outMsgs,
|
||||
InBytes: client.inBytes,
|
||||
@@ -206,6 +214,8 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Walk the list
|
||||
s.mu.Lock()
|
||||
|
||||
s.httpReqStats[RoutezPath]++
|
||||
rs.NumRoutes = len(s.routes)
|
||||
|
||||
for _, r := range s.routes {
|
||||
@@ -245,6 +255,10 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// HandleStats process HTTP requests for subjects stats.
|
||||
func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) {
|
||||
s.mu.Lock()
|
||||
s.httpReqStats[SubszPath]++
|
||||
s.mu.Unlock()
|
||||
|
||||
st := &Subsz{s.sl.Stats()}
|
||||
|
||||
b, err := json.MarshalIndent(st, "", " ")
|
||||
@@ -260,22 +274,25 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) {
|
||||
type Varz struct {
|
||||
*Info
|
||||
*Options
|
||||
Port int `json:"port"`
|
||||
MaxPayload int `json:"max_payload"`
|
||||
Start time.Time `json:"start"`
|
||||
Now time.Time `json:"now"`
|
||||
Uptime string `json:"uptime"`
|
||||
Mem int64 `json:"mem"`
|
||||
Cores int `json:"cores"`
|
||||
CPU float64 `json:"cpu"`
|
||||
Connections int `json:"connections"`
|
||||
Routes int `json:"routes"`
|
||||
Remotes int `json:"remotes"`
|
||||
InMsgs int64 `json:"in_msgs"`
|
||||
OutMsgs int64 `json:"out_msgs"`
|
||||
InBytes int64 `json:"in_bytes"`
|
||||
OutBytes int64 `json:"out_bytes"`
|
||||
SlowConsumers int64 `json:"slow_consumers"`
|
||||
Port int `json:"port"`
|
||||
MaxPayload int `json:"max_payload"`
|
||||
Start time.Time `json:"start"`
|
||||
Now time.Time `json:"now"`
|
||||
Uptime string `json:"uptime"`
|
||||
Mem int64 `json:"mem"`
|
||||
Cores int `json:"cores"`
|
||||
CPU float64 `json:"cpu"`
|
||||
Connections int `json:"connections"`
|
||||
TotalConnections uint64 `json:"total_connections"`
|
||||
Routes int `json:"routes"`
|
||||
Remotes int `json:"remotes"`
|
||||
InMsgs int64 `json:"in_msgs"`
|
||||
OutMsgs int64 `json:"out_msgs"`
|
||||
InBytes int64 `json:"in_bytes"`
|
||||
OutBytes int64 `json:"out_bytes"`
|
||||
SlowConsumers int64 `json:"slow_consumers"`
|
||||
|
||||
HTTPReqStats map[string]uint64 `json:"http_req_stats"`
|
||||
}
|
||||
|
||||
type usage struct {
|
||||
@@ -309,23 +326,33 @@ func myUptime(d time.Duration) string {
|
||||
|
||||
// HandleRoot will show basic info and links to others handlers.
|
||||
func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
|
||||
// This feels dumb to me, but is required: https://code.google.com/p/go/issues/detail?id=4799
|
||||
if r.URL.Path != "/" {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.httpReqStats[RootPath]++
|
||||
s.mu.Unlock()
|
||||
fmt.Fprintf(w, `<html lang="en">
|
||||
<head>
|
||||
<style type="text/css">
|
||||
body { font-family: “Century Gothic”, CenturyGothic, AppleGothic, sans-serif; font-size: 18; }
|
||||
a { margin-left: 32px; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<img src="http://nats.io/img/logo.png" alt="NATS">
|
||||
<br/>
|
||||
<a href=/varz>varz</a><br/>
|
||||
<a href=/connz>connz</a><br/>
|
||||
<a href=/routez>routez</a><br/>
|
||||
<a href=/subsz>subsz</a><br/>
|
||||
</body>
|
||||
</html>
|
||||
`)
|
||||
<head>
|
||||
<link rel="shortcut icon" href="http://nats.io/img/favicon.ico">
|
||||
<style type="text/css">
|
||||
body { font-family: “Century Gothic”, CenturyGothic, AppleGothic, sans-serif; font-size: 22; }
|
||||
a { margin-left: 32px; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<img src="http://nats.io/img/logo.png" alt="NATS">
|
||||
<br/>
|
||||
<a href=/varz>varz</a><br/>
|
||||
<a href=/connz>connz</a><br/>
|
||||
<a href=/routez>routez</a><br/>
|
||||
<a href=/subsz>subsz</a><br/>
|
||||
<br/>
|
||||
<a href=http://nats.io/documentation/server/gnatsd-monitoring/>help</a>
|
||||
</body>
|
||||
</html>`)
|
||||
}
|
||||
|
||||
// HandleVarz will process HTTP requests for server information.
|
||||
@@ -339,6 +366,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
s.mu.Lock()
|
||||
v.Connections = len(s.clients)
|
||||
v.TotalConnections = s.totalClients
|
||||
v.Routes = len(s.routes)
|
||||
v.Remotes = len(s.remotes)
|
||||
v.InMsgs = s.inMsgs
|
||||
@@ -346,6 +374,8 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
|
||||
v.OutMsgs = s.outMsgs
|
||||
v.OutBytes = s.outBytes
|
||||
v.SlowConsumers = s.slowConsumers
|
||||
s.httpReqStats[VarzPath]++
|
||||
v.HTTPReqStats = s.httpReqStats
|
||||
s.mu.Unlock()
|
||||
|
||||
b, err := json.MarshalIndent(v, "", " ")
|
||||
|
||||
@@ -145,6 +145,9 @@ func TestVarz(t *testing.T) {
|
||||
if v.Connections != 1 {
|
||||
t.Fatalf("Expected Connections of 1, got %v\n", v.Connections)
|
||||
}
|
||||
if v.TotalConnections < 1 {
|
||||
t.Fatalf("Expected Total Connections of at least 1, got %v\n", v.TotalConnections)
|
||||
}
|
||||
if v.InMsgs != 1 {
|
||||
t.Fatalf("Expected InMsgs of 1, got %v\n", v.InMsgs)
|
||||
}
|
||||
@@ -218,6 +221,7 @@ func TestConnz(t *testing.T) {
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err = ioutil.ReadAll(resp.Body)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Got an error reading the body: %v\n", err)
|
||||
}
|
||||
@@ -270,6 +274,12 @@ func TestConnz(t *testing.T) {
|
||||
if ci.OutBytes != 5 {
|
||||
t.Fatalf("Expected OutBytes of 1, got %v\n", ci.OutBytes)
|
||||
}
|
||||
if ci.Start.IsZero() {
|
||||
t.Fatalf("Expected Start to be valid\n")
|
||||
}
|
||||
if ci.Uptime == "" {
|
||||
t.Fatalf("Expected Uptime to be valid\n")
|
||||
}
|
||||
|
||||
// Test JSONP
|
||||
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT) + "connz?callback=callback")
|
||||
|
||||
@@ -44,23 +44,24 @@ type Server struct {
|
||||
gcid uint64
|
||||
grid uint64
|
||||
stats
|
||||
mu sync.Mutex
|
||||
info Info
|
||||
infoJSON []byte
|
||||
sl *sublist.Sublist
|
||||
opts *Options
|
||||
auth Auth
|
||||
trace bool
|
||||
debug bool
|
||||
running bool
|
||||
listener net.Listener
|
||||
clients map[uint64]*client
|
||||
routes map[uint64]*client
|
||||
remotes map[string]*client
|
||||
done chan bool
|
||||
start time.Time
|
||||
http net.Listener
|
||||
|
||||
mu sync.Mutex
|
||||
info Info
|
||||
infoJSON []byte
|
||||
sl *sublist.Sublist
|
||||
opts *Options
|
||||
auth Auth
|
||||
trace bool
|
||||
debug bool
|
||||
running bool
|
||||
listener net.Listener
|
||||
clients map[uint64]*client
|
||||
routes map[uint64]*client
|
||||
remotes map[string]*client
|
||||
totalClients uint64
|
||||
done chan bool
|
||||
start time.Time
|
||||
http net.Listener
|
||||
httpReqStats map[string]uint64
|
||||
routeListener net.Listener
|
||||
routeInfo Info
|
||||
rcQuit chan bool
|
||||
@@ -379,8 +380,27 @@ func (s *Server) StartHTTPSMonitoring() {
|
||||
s.startMonitoring(true)
|
||||
}
|
||||
|
||||
// HTTP endpoints
|
||||
const (
|
||||
RootPath = "/"
|
||||
VarzPath = "/varz"
|
||||
ConnzPath = "/connz"
|
||||
RoutezPath = "/routez"
|
||||
SubszPath = "/subsz"
|
||||
)
|
||||
|
||||
// Start the monitoring server
|
||||
func (s *Server) startMonitoring(secure bool) {
|
||||
|
||||
// Used to track HTTP requests
|
||||
s.httpReqStats = map[string]uint64{
|
||||
RootPath: 0,
|
||||
VarzPath: 0,
|
||||
ConnzPath: 0,
|
||||
RoutezPath: 0,
|
||||
SubszPath: 0,
|
||||
}
|
||||
|
||||
var hp string
|
||||
var err error
|
||||
|
||||
@@ -404,17 +424,17 @@ func (s *Server) startMonitoring(secure bool) {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
// Root
|
||||
mux.HandleFunc("/", s.HandleRoot)
|
||||
mux.HandleFunc(RootPath, s.HandleRoot)
|
||||
// Varz
|
||||
mux.HandleFunc("/varz", s.HandleVarz)
|
||||
mux.HandleFunc(VarzPath, s.HandleVarz)
|
||||
// Connz
|
||||
mux.HandleFunc("/connz", s.HandleConnz)
|
||||
mux.HandleFunc(ConnzPath, s.HandleConnz)
|
||||
// Routez
|
||||
mux.HandleFunc("/routez", s.HandleRoutez)
|
||||
mux.HandleFunc(RoutezPath, s.HandleRoutez)
|
||||
// Subz
|
||||
mux.HandleFunc(SubszPath, s.HandleSubsz)
|
||||
// Subz alias for backwards compatibility
|
||||
mux.HandleFunc("/subscriptionsz", s.HandleSubsz)
|
||||
// Subz
|
||||
mux.HandleFunc("/subsz", s.HandleSubsz)
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: hp,
|
||||
@@ -432,13 +452,14 @@ func (s *Server) startMonitoring(secure bool) {
|
||||
}
|
||||
|
||||
func (s *Server) createClient(conn net.Conn) *client {
|
||||
c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: s.info.MaxPayload}
|
||||
c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: s.info.MaxPayload, start: time.Now()}
|
||||
|
||||
// Grab JSON info string
|
||||
s.mu.Lock()
|
||||
info := s.infoJSON
|
||||
authRequired := s.info.AuthRequired
|
||||
tlsRequired := s.info.TLSRequired
|
||||
s.totalClients++
|
||||
s.mu.Unlock()
|
||||
|
||||
// Grab lock
|
||||
|
||||
Reference in New Issue
Block a user