mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Make sure to turn latency on with a claim update
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -91,6 +91,19 @@ func (sc *supercluster) setupLatencyTracking(t *testing.T, p int) {
|
||||
}
|
||||
}
|
||||
|
||||
func (sc *supercluster) removeLatencyTracking(t *testing.T) {
|
||||
t.Helper()
|
||||
for _, c := range sc.clusters {
|
||||
for _, s := range c.servers {
|
||||
foo, err := s.LookupAccount("FOO")
|
||||
if err != nil {
|
||||
t.Fatalf("Error looking up account 'FOO': %v", err)
|
||||
}
|
||||
foo.UnTrackServiceExport("ngs.usage.*")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func clientConnectWithName(t *testing.T, opts *server.Options, user, appname string) *nats.Conn {
|
||||
t.Helper()
|
||||
url := fmt.Sprintf("nats://%s:pass@%s:%d", user, opts.Host, opts.Port)
|
||||
@@ -256,6 +269,18 @@ func TestServiceLatencyRemoteConnect(t *testing.T) {
|
||||
if crtt := connRTT(nc) + connRTT(nc2); sl.NATSLatency.TotalTime() < crtt {
|
||||
t.Fatalf("Not tracking second measurement for NATS latency across servers: %v vs %v", sl.NATSLatency.TotalTime(), crtt)
|
||||
}
|
||||
|
||||
// Now turn off and make sure we no longer receive updates.
|
||||
sc.removeLatencyTracking(t)
|
||||
_, err = nc2.Request("ngs.usage", []byte("1h"), time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected a response")
|
||||
}
|
||||
|
||||
_, err = rsub.NextMsg(100 * time.Millisecond)
|
||||
if err == nil {
|
||||
t.Fatalf("Did not expect to receive a latency metric")
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceLatencySampling(t *testing.T) {
|
||||
@@ -470,8 +495,7 @@ func TestServiceLatencyWithJWT(t *testing.T) {
|
||||
svcPub, _ := svcKP.PublicKey()
|
||||
|
||||
// Add in the service export with latency tracking here.
|
||||
serviceExport := &jwt.Export{Subject: "req.echo", Type: jwt.Service}
|
||||
serviceExport.Latency = &jwt.ServiceLatency{Sampling: 100, Results: "results"}
|
||||
serviceExport := &jwt.Export{Subject: "req.*", Type: jwt.Service}
|
||||
svcAcc.Exports.Add(serviceExport)
|
||||
svcJWT, err := svcAcc.Encode(okp)
|
||||
if err != nil {
|
||||
@@ -553,6 +577,32 @@ func TestServiceLatencyWithJWT(t *testing.T) {
|
||||
}
|
||||
defer nc2.Close()
|
||||
|
||||
// Send the request.
|
||||
_, err = nc2.Request("request", []byte("hello"), time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected a response")
|
||||
}
|
||||
|
||||
// We should not receive latency at this time.
|
||||
_, err = rsub.NextMsg(100 * time.Millisecond)
|
||||
if err == nil {
|
||||
t.Fatalf("Did not expect to receive a latency metric")
|
||||
}
|
||||
|
||||
// Now turn it on..
|
||||
updateAccount := func() {
|
||||
t.Helper()
|
||||
for _, s := range []*server.Server{s, s2} {
|
||||
svcAccount, err := s.LookupAccount(svcPub)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not lookup service account from server %+v", s)
|
||||
}
|
||||
s.UpdateAccountClaims(svcAccount, svcAcc)
|
||||
}
|
||||
}
|
||||
serviceExport.Latency = &jwt.ServiceLatency{Sampling: 100, Results: "results"}
|
||||
updateAccount()
|
||||
|
||||
// Send the request.
|
||||
start := time.Now()
|
||||
_, err = nc2.Request("request", []byte("hello"), time.Second)
|
||||
@@ -570,12 +620,7 @@ func TestServiceLatencyWithJWT(t *testing.T) {
|
||||
|
||||
// Now we will remove tracking. Do this by simulating a JWT update.
|
||||
serviceExport.Latency = nil
|
||||
|
||||
svcAccount, err := s.LookupAccount(svcPub)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not lookup service account")
|
||||
}
|
||||
s.UpdateAccountClaims(svcAccount, svcAcc)
|
||||
updateAccount()
|
||||
|
||||
// Now we should not get any tracking data.
|
||||
_, err = nc2.Request("request", []byte("hello"), time.Second)
|
||||
|
||||
Reference in New Issue
Block a user