[FIXEd] subsz monitoring endpoint did not account for accounts.

Fixes  #1371 and #1357 by adding up stats and collecting subscriptions
from all accounts.

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2020-05-06 15:20:28 -04:00
parent 27882a93cd
commit 136feb9bc6
3 changed files with 150 additions and 8 deletions

View File

@@ -752,6 +752,8 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
// Subsz represents detail information on current connections.
type Subsz struct {
ID string `json:"server_id"`
Now time.Time `json:"now"`
*SublistStats
Total int `json:"total"`
Offset int `json:"offset"`
@@ -827,19 +829,21 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
}
}
s.mu.Lock()
gaccSl := s.gacc.sl
s.mu.Unlock()
slStats := &SublistStats{}
// FIXME(dlc) - Make account aware.
sz := &Subsz{gaccSl.Stats(), 0, offset, limit, nil}
sz := &Subsz{s.info.ID, time.Now(), slStats, 0, offset, limit, nil}
if subdetail {
// Now add in subscription's details
var raw [4096]*subscription
subs := raw[:0]
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
slStats.add(acc.sl.Stats())
acc.sl.localSubs(&subs)
return true
})
gaccSl.localSubs(&subs)
details := make([]SubDetail, len(subs))
i := 0
// TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering.
@@ -870,6 +874,12 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
}
sz.Subs = details[minoff:maxoff]
sz.Total = len(sz.Subs)
} else {
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
slStats.add(acc.sl.Stats())
return true
})
}
return sz, nil

View File

@@ -59,6 +59,18 @@ func runMonitorServer() *Server {
return RunServer(opts)
}
func runMonitorServerWithAccounts() *Server {
resetPreviousHTTPConnections()
opts := DefaultMonitorOptions()
aA := NewAccount("A")
aB := NewAccount("B")
opts.Accounts = append(opts.Accounts, aA, aB)
opts.Users = append(opts.Users,
&User{Username: "a", Password: "a", Account: aA},
&User{Username: "b", Password: "b", Account: aB})
return RunServer(opts)
}
func runMonitorServerNoHTTPPort() *Server {
resetPreviousHTTPConnections()
opts := DefaultMonitorOptions()
@@ -1471,6 +1483,93 @@ func TestSubszTestPubSubject(t *testing.T) {
readBodyEx(t, testUrl+"test=foo..bar", http.StatusBadRequest, textPlain)
}
func TestSubszMultiAccount(t *testing.T) {
s := runMonitorServerWithAccounts()
defer s.Shutdown()
ncA := createClientConnWithUserSubscribeAndPublish(t, s, "a", "a")
defer ncA.Close()
ncA.Subscribe("foo.*", func(m *nats.Msg) {})
ncA.Subscribe("foo.bar", func(m *nats.Msg) {})
ncA.Subscribe("foo.foo", func(m *nats.Msg) {})
ncA.Publish("foo.bar", []byte("Hello"))
ncA.Publish("foo.baz", []byte("Hello"))
ncA.Publish("foo.foo", []byte("Hello"))
ncA.Flush()
ncB := createClientConnWithUserSubscribeAndPublish(t, s, "b", "b")
defer ncB.Close()
ncB.Subscribe("foo.*", func(m *nats.Msg) {})
ncB.Subscribe("foo.bar", func(m *nats.Msg) {})
ncB.Subscribe("foo.foo", func(m *nats.Msg) {})
ncB.Publish("foo.bar", []byte("Hello"))
ncB.Publish("foo.baz", []byte("Hello"))
ncB.Publish("foo.foo", []byte("Hello"))
ncB.Flush()
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
sl := pollSubsz(t, s, mode, url+"subsz?subs=1", &SubszOptions{Subscriptions: true})
if sl.NumSubs != 6 {
t.Fatalf("Expected NumSubs of 6, got %d\n", sl.NumSubs)
}
if sl.Total != 6 {
t.Fatalf("Expected Total of 6, got %d\n", sl.Total)
}
if len(sl.Subs) != 6 {
t.Fatalf("Expected subscription details for 6 subs, got %d\n", len(sl.Subs))
}
}
}
func TestSubszMultiAccountWithOffsetAndLimit(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
ncA := createClientConnWithUserSubscribeAndPublish(t, s, "a", "a")
defer ncA.Close()
for i := 0; i < 200; i++ {
ncA.Subscribe(fmt.Sprintf("foo.%d", i), func(m *nats.Msg) {})
}
ncA.Flush()
ncB := createClientConnWithUserSubscribeAndPublish(t, s, "b", "b")
defer ncB.Close()
for i := 0; i < 200; i++ {
ncB.Subscribe(fmt.Sprintf("foo.%d", i), func(m *nats.Msg) {})
}
ncB.Flush()
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
sl := pollSubsz(t, s, mode, url+"subsz?subs=1&offset=10&limit=100", &SubszOptions{Subscriptions: true, Offset: 10, Limit: 100})
if sl.NumSubs != 400 {
t.Fatalf("Expected NumSubs of 200, got %d\n", sl.NumSubs)
}
if sl.Total != 100 {
t.Fatalf("Expected Total of 100, got %d\n", sl.Total)
}
if sl.Offset != 10 {
t.Fatalf("Expected Offset of 10, got %d\n", sl.Offset)
}
if sl.Limit != 100 {
t.Fatalf("Expected Total of 100, got %d\n", sl.Limit)
}
if len(sl.Subs) != 100 {
t.Fatalf("Expected subscription details for 100 subs, got %d\n", len(sl.Subs))
}
}
}
// Tests handle root
func TestHandleRoot(t *testing.T) {
s := runMonitorServer()
@@ -1735,8 +1834,13 @@ func TestConnzClosedConnsBadTLSClient(t *testing.T) {
}
// Create a connection to test ConnInfo
func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn {
natsURL := fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)
func createClientConnWithUserSubscribeAndPublish(t *testing.T, s *Server, user, pwd string) *nats.Conn {
natsURL := ""
if user == "" {
natsURL = fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)
} else {
natsURL = fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", user, pwd, s.Addr().(*net.TCPAddr).Port)
}
client := nats.DefaultOptions
client.Servers = []string{natsURL}
nc, err := client.Connect()
@@ -1759,6 +1863,10 @@ func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn {
return nc
}
func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn {
return createClientConnWithUserSubscribeAndPublish(t, s, "", "")
}
func createClientConnWithName(t *testing.T, name string, s *Server) *nats.Conn {
natsURI := fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)

View File

@@ -700,6 +700,28 @@ type SublistStats struct {
CacheHitRate float64 `json:"cache_hit_rate"`
MaxFanout uint32 `json:"max_fanout"`
AvgFanout float64 `json:"avg_fanout"`
totFanout int
cacheCnt int
}
func (s *SublistStats) add(stat *SublistStats) {
s.NumSubs += stat.NumSubs
s.NumCache += stat.NumCache
s.NumInserts += stat.NumInserts
s.NumRemoves += stat.NumRemoves
s.NumMatches += stat.NumMatches
s.CacheHitRate += stat.CacheHitRate
if s.MaxFanout < stat.MaxFanout {
s.MaxFanout = stat.MaxFanout
}
// ignore slStats.AvgFanout, collect the values
// it's based on instead
s.totFanout += stat.totFanout
s.cacheCnt += stat.cacheCnt
if s.totFanout > 0 {
s.AvgFanout = float64(s.totFanout) / float64(s.cacheCnt)
}
}
// Stats will return a stats structure for the current state.
@@ -735,6 +757,8 @@ func (s *Sublist) Stats() *SublistStats {
}
return true
})
st.totFanout = tot
st.cacheCnt = clen
st.MaxFanout = uint32(max)
if tot > 0 {
st.AvgFanout = float64(tot) / float64(clen)