add type hints to service latency, use time.Time for timestamp

Signed-off-by: R.I.Pienaar <rip@devco.net>
This commit is contained in:
R.I.Pienaar
2020-05-01 15:22:29 +02:00
committed by Derek Collison
parent 5ffb500857
commit 63845b8577
6 changed files with 81 additions and 38 deletions

View File

@@ -27,6 +27,7 @@ import (
"time"
"github.com/nats-io/jwt"
"github.com/nats-io/nuid"
)
// For backwards compatibility with NATS < 2.0, users who are not explicitly defined into an
@@ -70,6 +71,8 @@ type Account struct {
lds string // loop detection subject for leaf nodes
siReply []byte // service reply prefix, will form wildcard subscription.
prand *rand.Rand
eventids *nuid.NUID
eventidsmu sync.Mutex
}
// Account based limits.
@@ -194,8 +197,9 @@ type importMap struct {
// NewAccount creates a new unlimited account with the given name.
func NewAccount(name string) *Account {
a := &Account{
Name: name,
limits: limits{-1, -1, -1, -1},
Name: name,
limits: limits{-1, -1, -1, -1},
eventids: nuid.New(),
}
return a
@@ -249,6 +253,16 @@ func (a *Account) shallowCopy() *Account {
return na
}
func (a *Account) nextEventID() string {
// TODO(dlc) we should add a nuid that holds a lock or massage this
// to work within the account lock but doing so now caused races
a.eventidsmu.Lock()
id := a.eventids.Next()
a.eventidsmu.Unlock()
return id
}
// Called to track a remote server and connections and leafnodes it
// has for this account.
func (a *Account) updateRemoteServer(m *AccountNumConns) {
@@ -729,6 +743,8 @@ func (a *Account) IsExportServiceTracking(service string) bool {
// is the RTT used to calculate the total latency. The requestor's account can
// designate to share the additional information in the service import.
type ServiceLatency struct {
TypedEvent
Status int `json:"status"`
Error string `json:"description,omitempty"`
Requestor LatencyClient `json:"requestor,omitempty"`
@@ -739,6 +755,9 @@ type ServiceLatency struct {
TotalLatency time.Duration `json:"total"`
}
// ServiceLatencyType is the NATS Event Type for ServiceLatency
const ServiceLatencyType = "io.nats.server.metric.v1.service_latency"
// LatencyClient is the JSON message structure assigned to requestors and responders.
// Note that for a requestor, the only information shared by default is the RTT used
// to calculate the total latency. The requestor's account can designate to share
@@ -799,6 +818,10 @@ type remoteLatency struct {
// sendLatencyResult will send a latency result and clear the si of the requestor(rc).
func (a *Account) sendLatencyResult(si *serviceImport, sl *ServiceLatency) {
sl.Type = ServiceLatencyType
sl.ID = a.nextEventID()
sl.Time = time.Now().UTC()
si.acc.mu.Lock()
a.srv.sendInternalAccountMsg(a, si.latency.subject, sl)
si.rc = nil
@@ -873,6 +896,10 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool
}
sanitizeLatencyMetric(sl)
sl.Type = ServiceLatencyType
sl.ID = a.nextEventID()
sl.Time = time.Now().UTC()
// If we are expecting a remote measurement, store our sl here.
// We need to account for the race between this and us receiving the
// remote measurement.

View File

@@ -60,11 +60,9 @@ type CreateConsumerRequest struct {
}
// ConsumerAckMetric is a metric published when a user acknowledges a message, the
// number of these that will be published is dependant on SampleFrequency
// number of these that will be published is dependent on SampleFrequency
type ConsumerAckMetric struct {
Type string `json:"type"`
ID string `json:"id"`
Time string `json:"timestamp"`
TypedEvent
Stream string `json:"stream"`
Consumer string `json:"consumer"`
ConsumerSeq uint64 `json:"consumer_seq"`
@@ -79,9 +77,7 @@ const ConsumerAckMetricType = "io.nats.jetstream.metric.v1.consumer_ack"
// ConsumerDeliveryExceededAdvisory is an advisory informing that a message hit
// its MaxDeliver threshold and so might be a candidate for DLQ handling
type ConsumerDeliveryExceededAdvisory struct {
Type string `json:"type"`
ID string `json:"id"`
Time string `json:"timestamp"`
TypedEvent
Stream string `json:"stream"`
Consumer string `json:"consumer"`
StreamSeq uint64 `json:"stream_seq"`
@@ -772,10 +768,13 @@ func (o *Consumer) sampleAck(sseq, dseq, dcount uint64) {
now := time.Now().UTC()
unow := now.UnixNano()
e := &ConsumerAckMetric{
Type: ConsumerAckMetricType,
ID: nuid.Next(),
Time: now.Format(time.RFC3339Nano),
TypedEvent: TypedEvent{
Type: ConsumerAckMetricType,
ID: nuid.Next(),
Time: now,
},
Stream: o.stream,
Consumer: o.name,
ConsumerSeq: dseq,
@@ -922,9 +921,11 @@ func (o *Consumer) incDeliveryCount(sseq uint64) uint64 {
func (o *Consumer) notifyDeliveryExceeded(sseq, dcount uint64) {
e := &ConsumerDeliveryExceededAdvisory{
Type: ConsumerDeliveryExceededAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC().Format(time.RFC3339Nano),
TypedEvent: TypedEvent{
Type: ConsumerDeliveryExceededAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: o.stream,
Consumer: o.name,
StreamSeq: sseq,

View File

@@ -88,11 +88,17 @@ type ServerStatsMsg struct {
Stats ServerStats `json:"statsz"`
}
// TypedEvent is a event or advisory sent by the server that has nats type hints
// typically used for events that might be consumed by 3rd party event systems
type TypedEvent struct {
Type string `json:"type"`
ID string `json:"id"`
Time time.Time `json:"timestamp"`
}
// ConnectEventMsg is sent when a new connection is made that is part of an account.
type ConnectEventMsg struct {
Type string `json:"type"`
ID string `json:"id"`
Time string `json:"timestamp"`
TypedEvent
Server ServerInfo `json:"server"`
Client ClientInfo `json:"client"`
}
@@ -103,9 +109,7 @@ const ConnectEventMsgType = "io.nats.server.advisory.v1.client_connect"
// DisconnectEventMsg is sent when a new connection previously defined from a
// ConnectEventMsg is closed.
type DisconnectEventMsg struct {
Type string `json:"type"`
ID string `json:"id"`
Time string `json:"timestamp"`
TypedEvent
Server ServerInfo `json:"server"`
Client ClientInfo `json:"client"`
Sent DataStats `json:"sent"`
@@ -1014,9 +1018,11 @@ func (s *Server) accountConnectEvent(c *client) {
}
m := ConnectEventMsg{
Type: ConnectEventMsgType,
ID: eid,
Time: time.Now().UTC().Format(time.RFC3339Nano),
TypedEvent: TypedEvent{
Type: ConnectEventMsgType,
ID: eid,
Time: time.Now().UTC(),
},
Client: ClientInfo{
Start: c.start,
Host: c.host,
@@ -1055,9 +1061,11 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
}
m := DisconnectEventMsg{
Type: DisconnectEventMsgType,
ID: eid,
Time: now.UTC().Format(time.RFC3339Nano),
TypedEvent: TypedEvent{
Type: DisconnectEventMsgType,
ID: eid,
Time: now.UTC(),
},
Client: ClientInfo{
Start: c.start,
Stop: &now,
@@ -1097,9 +1105,11 @@ func (s *Server) sendAuthErrorEvent(c *client) {
now := time.Now()
c.mu.Lock()
m := DisconnectEventMsg{
Type: DisconnectEventMsgType,
ID: eid,
Time: now.UTC().Format(time.RFC3339Nano),
TypedEvent: TypedEvent{
Type: DisconnectEventMsgType,
ID: eid,
Time: now.UTC(),
},
Client: ClientInfo{
Start: c.start,
Stop: &now,

View File

@@ -233,7 +233,7 @@ func TestSystemAccountNewConnection(t *testing.T) {
if cem.Type != ConnectEventMsgType {
t.Fatalf("Incorrect schema in connect event: %s", cem.Type)
}
if cem.Time == "" {
if cem.Time.IsZero() {
t.Fatalf("Event time is not set")
}
if len(cem.ID) != 22 {
@@ -289,7 +289,7 @@ func TestSystemAccountNewConnection(t *testing.T) {
if dem.Type != DisconnectEventMsgType {
t.Fatalf("Incorrect schema in connect event: %s", cem.Type)
}
if dem.Time == "" {
if dem.Time.IsZero() {
t.Fatalf("Event time is not set")
}
if len(dem.ID) != 22 {

View File

@@ -944,9 +944,7 @@ type ClientAPIAudit struct {
// JetStreamAPIAudit is an advisory about administrative actions taken on JetStream
type JetStreamAPIAudit struct {
Type string `json:"type"`
ID string `json:"id"`
Time string `json:"timestamp"`
TypedEvent
Server string `json:"server"`
Client ClientAPIAudit `json:"client"`
Subject string `json:"subject"`
@@ -968,9 +966,11 @@ func (s *Server) sendJetStreamAPIAuditAdvisory(c *client, subject, request, resp
c.mu.Unlock()
e := &JetStreamAPIAudit{
Type: auditType,
ID: nuid.Next(),
Time: time.Now().UTC().Format(time.RFC3339Nano),
TypedEvent: TypedEvent{
Type: auditType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Server: s.Name(),
Client: ClientAPIAudit{
Host: h,

View File

@@ -1033,6 +1033,8 @@ func TestLeafNodeLocalizedDQ(t *testing.T) {
func TestLeafNodeBasicAuth(t *testing.T) {
content := `
listen: "127.0.0.1:-1"
leafnodes {
listen: "127.0.0.1:-1"
authorization {
@@ -1097,6 +1099,8 @@ func runTLSSolicitLeafServer(lso *server.Options) (*server.Server, *server.Optio
func TestLeafNodeTLS(t *testing.T) {
content := `
listen: "127.0.0.1:-1"
leafnodes {
listen: "127.0.0.1:-1"
tls {
@@ -1207,6 +1211,7 @@ func runLeafNodeOperatorServer(t *testing.T) (*server.Server, *server.Options, s
port: -1
operator = "./configs/nkeys/op.jwt"
resolver = MEMORY
listen: "127.0.0.1:-1"
leafnodes {
listen: "127.0.0.1:-1"
}