mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Changes based on code review
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -415,9 +415,7 @@ const (
|
||||
switchToCompression readCacheFlag = 1 << 1
|
||||
)
|
||||
|
||||
const (
|
||||
sysGroup = "_sys_"
|
||||
)
|
||||
const sysGroup = "_sys_"
|
||||
|
||||
// Used in readloop to cache hot subject lookups and group statistics.
|
||||
type readCache struct {
|
||||
@@ -1603,9 +1601,6 @@ func (c *client) flushOutbound() bool {
|
||||
// Re-acquire client lock.
|
||||
c.mu.Lock()
|
||||
|
||||
// Keep track of what was actually sent
|
||||
c.sentBytes += n
|
||||
|
||||
// Adjust if we were compressing.
|
||||
if cw != nil {
|
||||
c.out.pb += attempted
|
||||
|
||||
@@ -123,7 +123,6 @@ type ConnInfo struct {
|
||||
OutMsgs int64 `json:"out_msgs"`
|
||||
InBytes int64 `json:"in_bytes"`
|
||||
OutBytes int64 `json:"out_bytes"`
|
||||
SentBytes int64 `json:"sent_bytes"`
|
||||
NumSubs uint32 `json:"subscriptions"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Lang string `json:"lang,omitempty"`
|
||||
@@ -543,7 +542,6 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time, auth bool)
|
||||
ci.RTT = client.getRTT().String()
|
||||
ci.OutMsgs = client.outMsgs
|
||||
ci.OutBytes = client.outBytes
|
||||
ci.SentBytes = client.sentBytes
|
||||
ci.NumSubs = uint32(len(client.subs))
|
||||
ci.Pending = int(client.out.pb)
|
||||
ci.Name = client.opts.Name
|
||||
@@ -779,7 +777,6 @@ type RouteInfo struct {
|
||||
OutMsgs int64 `json:"out_msgs"`
|
||||
InBytes int64 `json:"in_bytes"`
|
||||
OutBytes int64 `json:"out_bytes"`
|
||||
SentBytes int64 `json:"sent_bytes"`
|
||||
NumSubs uint32 `json:"subscriptions"`
|
||||
Subs []string `json:"subscriptions_list,omitempty"`
|
||||
SubsDetail []SubDetail `json:"subscriptions_list_detail,omitempty"`
|
||||
@@ -821,7 +818,6 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) {
|
||||
OutMsgs: r.outMsgs,
|
||||
InBytes: atomic.LoadInt64(&r.inBytes),
|
||||
OutBytes: r.outBytes,
|
||||
SentBytes: r.sentBytes,
|
||||
NumSubs: uint32(len(r.subs)),
|
||||
Import: r.opts.Import,
|
||||
Export: r.opts.Export,
|
||||
@@ -2101,19 +2097,18 @@ type LeafzOptions struct {
|
||||
|
||||
// LeafInfo has detailed information on each remote leafnode connection.
|
||||
type LeafInfo struct {
|
||||
Name string `json:"name"`
|
||||
IsSpoke bool `json:"is_spoke"`
|
||||
Account string `json:"account"`
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port"`
|
||||
RTT string `json:"rtt,omitempty"`
|
||||
InMsgs int64 `json:"in_msgs"`
|
||||
OutMsgs int64 `json:"out_msgs"`
|
||||
InBytes int64 `json:"in_bytes"`
|
||||
OutBytes int64 `json:"out_bytes"`
|
||||
SentBytes int64 `json:"sent_bytes"`
|
||||
NumSubs uint32 `json:"subscriptions"`
|
||||
Subs []string `json:"subscriptions_list,omitempty"`
|
||||
Name string `json:"name"`
|
||||
IsSpoke bool `json:"is_spoke"`
|
||||
Account string `json:"account"`
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port"`
|
||||
RTT string `json:"rtt,omitempty"`
|
||||
InMsgs int64 `json:"in_msgs"`
|
||||
OutMsgs int64 `json:"out_msgs"`
|
||||
InBytes int64 `json:"in_bytes"`
|
||||
OutBytes int64 `json:"out_bytes"`
|
||||
NumSubs uint32 `json:"subscriptions"`
|
||||
Subs []string `json:"subscriptions_list,omitempty"`
|
||||
}
|
||||
|
||||
// Leafz returns a Leafz structure containing information about leafnodes.
|
||||
@@ -2143,18 +2138,17 @@ func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) {
|
||||
for _, ln := range lconns {
|
||||
ln.mu.Lock()
|
||||
lni := &LeafInfo{
|
||||
Name: ln.leaf.remoteServer,
|
||||
IsSpoke: ln.isSpokeLeafNode(),
|
||||
Account: ln.acc.Name,
|
||||
IP: ln.host,
|
||||
Port: int(ln.port),
|
||||
RTT: ln.getRTT().String(),
|
||||
InMsgs: atomic.LoadInt64(&ln.inMsgs),
|
||||
OutMsgs: ln.outMsgs,
|
||||
InBytes: atomic.LoadInt64(&ln.inBytes),
|
||||
OutBytes: ln.outBytes,
|
||||
SentBytes: ln.sentBytes,
|
||||
NumSubs: uint32(len(ln.subs)),
|
||||
Name: ln.leaf.remoteServer,
|
||||
IsSpoke: ln.isSpokeLeafNode(),
|
||||
Account: ln.acc.Name,
|
||||
IP: ln.host,
|
||||
Port: int(ln.port),
|
||||
RTT: ln.getRTT().String(),
|
||||
InMsgs: atomic.LoadInt64(&ln.inMsgs),
|
||||
OutMsgs: ln.outMsgs,
|
||||
InBytes: atomic.LoadInt64(&ln.inBytes),
|
||||
OutBytes: ln.outBytes,
|
||||
NumSubs: uint32(len(ln.subs)),
|
||||
}
|
||||
if opts != nil && opts.Subscriptions {
|
||||
lni.Subs = make([]string, 0, len(ln.subs))
|
||||
|
||||
@@ -362,7 +362,7 @@ type clusterOption struct {
|
||||
accsAdded []string
|
||||
accsRemoved []string
|
||||
poolSizeChanged bool
|
||||
compChanged bool
|
||||
compressChanged bool
|
||||
}
|
||||
|
||||
// Apply the cluster change.
|
||||
@@ -382,7 +382,7 @@ func (c *clusterOption) Apply(s *Server) {
|
||||
}
|
||||
s.setRouteInfoHostPortAndIP()
|
||||
var routes []*client
|
||||
if c.compChanged {
|
||||
if c.compressChanged {
|
||||
newMode := s.getOpts().Cluster.Compression.Mode
|
||||
s.forEachRoute(func(r *client) {
|
||||
r.mu.Lock()
|
||||
@@ -1139,9 +1139,9 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
|
||||
return nil, err
|
||||
}
|
||||
co := &clusterOption{
|
||||
newValue: newClusterOpts,
|
||||
permsChanged: !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions),
|
||||
compChanged: !reflect.DeepEqual(oldClusterOpts.Compression, newClusterOpts.Compression),
|
||||
newValue: newClusterOpts,
|
||||
permsChanged: !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions),
|
||||
compressChanged: !reflect.DeepEqual(oldClusterOpts.Compression, newClusterOpts.Compression),
|
||||
}
|
||||
co.diffPoolAndAccounts(&oldClusterOpts)
|
||||
// If there are added accounts, first make sure that we can look them up.
|
||||
|
||||
@@ -3283,6 +3283,20 @@ func TestRouteCompressionOptions(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type testConnSentBytes struct {
|
||||
net.Conn
|
||||
sync.RWMutex
|
||||
sent int
|
||||
}
|
||||
|
||||
func (c *testConnSentBytes) Write(p []byte) (int, error) {
|
||||
n, err := c.Conn.Write(p)
|
||||
c.Lock()
|
||||
c.sent += n
|
||||
c.Unlock()
|
||||
return n, err
|
||||
}
|
||||
|
||||
func TestRouteCompression(t *testing.T) {
|
||||
tmpl := `
|
||||
port: -1
|
||||
@@ -3320,6 +3334,14 @@ func TestRouteCompression(t *testing.T) {
|
||||
|
||||
checkClusterFormed(t, s1, s2)
|
||||
|
||||
s1.mu.RLock()
|
||||
s1.forEachRoute(func(r *client) {
|
||||
r.mu.Lock()
|
||||
r.nc = &testConnSentBytes{Conn: r.nc}
|
||||
r.mu.Unlock()
|
||||
})
|
||||
s1.mu.RUnlock()
|
||||
|
||||
nc2 := natsConnect(t, s2.ClientURL(), nats.UserInfo("a", "pwd"))
|
||||
defer nc2.Close()
|
||||
sub := natsSubSync(t, nc2, "foo")
|
||||
@@ -3351,7 +3373,7 @@ func TestRouteCompression(t *testing.T) {
|
||||
}
|
||||
|
||||
// Also check that the route stats shows that compression likely occurred
|
||||
var out int64
|
||||
var out int
|
||||
s1.mu.RLock()
|
||||
if len(test.accounts) > 0 {
|
||||
rems := s1.accRoutes["A"]
|
||||
@@ -3360,7 +3382,12 @@ func TestRouteCompression(t *testing.T) {
|
||||
}
|
||||
for _, r := range rems {
|
||||
r.mu.Lock()
|
||||
out = r.sentBytes
|
||||
if r.nc != nil {
|
||||
nc := r.nc.(*testConnSentBytes)
|
||||
nc.RLock()
|
||||
out = nc.sent
|
||||
nc.RUnlock()
|
||||
}
|
||||
r.mu.Unlock()
|
||||
break
|
||||
}
|
||||
@@ -3372,7 +3399,12 @@ func TestRouteCompression(t *testing.T) {
|
||||
acc.mu.RUnlock()
|
||||
s1.forEachRouteIdx(pi, func(r *client) bool {
|
||||
r.mu.Lock()
|
||||
out = r.sentBytes
|
||||
if r.nc != nil {
|
||||
nc := r.nc.(*testConnSentBytes)
|
||||
nc.RLock()
|
||||
out = nc.sent
|
||||
nc.RUnlock()
|
||||
}
|
||||
r.mu.Unlock()
|
||||
return false
|
||||
})
|
||||
|
||||
@@ -327,7 +327,6 @@ type stats struct {
|
||||
outMsgs int64
|
||||
inBytes int64
|
||||
outBytes int64
|
||||
sentBytes int64 // Total of what is sent out, include all protocols
|
||||
slowConsumers int64
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user