From 511cdaa5ac4c4e68a5df11ac0817d71f066f9a38 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 21 Oct 2020 20:16:39 -0700 Subject: [PATCH] Do not report bad latency on auto-unsubscribe triggers Signed-off-by: Derek Collison --- server/accounts.go | 11 +++-- server/client.go | 2 + server/config_check_test.go | 26 ----------- server/opts.go | 3 +- test/service_latency_test.go | 91 ++++++++++++++++++++++++++++++++++++ 5 files changed, 101 insertions(+), 32 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 11c5d6a0..d828d904 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -130,6 +130,7 @@ type serviceImport struct { invalid bool share bool tracking bool + didDeliver bool trackingHdr http.Header // header from request } @@ -1535,9 +1536,8 @@ func (a *Account) checkForReverseEntry(reply string, si *serviceImport, checkInt // If there is we can not delete any entries yet. // Note that if we are here reply has to be a literal subject. if checkInterest { - rr := a.sl.Match(reply) // If interest still exists we can not clean these up yet. - if len(rr.psubs)+len(rr.qsubs) > 0 { + if rr := a.sl.Match(reply); len(rr.psubs)+len(rr.qsubs) > 0 { a.mu.RUnlock() return } @@ -1572,7 +1572,7 @@ func (a *Account) checkForReverseEntry(reply string, si *serviceImport, checkInt var trackingCleanup bool var rsi *serviceImport acc.mu.Lock() - if rsi = acc.exports.responses[sre.msub]; rsi != nil { + if rsi = acc.exports.responses[sre.msub]; rsi != nil && !rsi.didDeliver { delete(acc.exports.responses, rsi.from) trackingCleanup = rsi.tracking && rsi.rc != nil } @@ -1640,7 +1640,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im } } - si := &serviceImport{dest, claim, se, nil, from, to, "", tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, nil} + si := &serviceImport{dest, claim, se, nil, from, to, "", tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, false, nil} a.imports.services[from] = si a.mu.Unlock() @@ -1865,6 +1865,7 @@ func (a *Account) processServiceImportResponse(sub *subscription, c *client, sub return } si := a.exports.responses[subject] + if si == nil || si.invalid { a.mu.RUnlock() return @@ -2054,7 +2055,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp // dest is the requestor's account. a is the service responder with the export. // Marked as internal here, that is how we distinguish. - si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, nil} + si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, false, nil} if a.exports.responses == nil { a.exports.responses = make(map[string]*serviceImport) diff --git a/server/client.go b/server/client.go index 610472c3..fc3dcac8 100644 --- a/server/client.go +++ b/server/client.go @@ -3516,6 +3516,8 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // This is always a response. var didSendTL bool if si.tracking { + // Stamp that we attempted delivery. + si.didDeliver = true didSendTL = acc.sendTrackingLatency(si, c) } diff --git a/server/config_check_test.go b/server/config_check_test.go index e6929146..c6e9ce30 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -1164,32 +1164,6 @@ func TestConfigCheck(t *testing.T) { errorLine: 4, errorPos: 18, }, - { - name: "when setting latency tracking without a system account", - config: ` - accounts { - sys { users = [ {user: sys, pass: "" } ] } - - nats.io: { - users = [ { user : bar, pass: "" } ] - - exports = [ - { service: "nats.add" - response: singleton - latency: { - sampling: 100% - subject: "latency.tracking.add" - } - } - - ] - } - } - `, - err: errors.New(`Error adding service latency sampling for "nats.add": system account not setup`), - errorLine: 2, - errorPos: 17, - }, { name: "when setting latency tracking with a system account", config: ` diff --git a/server/opts.go b/server/opts.go index 1dfbd054..bd67267b 100644 --- a/server/opts.go +++ b/server/opts.go @@ -2209,7 +2209,8 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er } if service.lat != nil { - if opts.SystemAccount == "" { + // System accounts are on be default so just make sure we have not opted out.. + if opts.NoSystemAccount { msg := fmt.Sprintf("Error adding service latency sampling for %q: %v", service.sub, ErrNoSysAccount.Error()) *errors = append(*errors, &configErr{tk, msg}) continue diff --git a/test/service_latency_test.go b/test/service_latency_test.go index 4065732e..00287a20 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -20,6 +20,7 @@ import ( "math/rand" "net/http" "os" + "path" "strings" "sync" "sync/atomic" @@ -1781,3 +1782,93 @@ func TestServiceLatencyHeaderTriggered(t *testing.T) { }) } } + +// From a report by rip@nats.io on simple latency reporting missing in 2 server cluster setup. +func TestServiceLatencyMissingResults(t *testing.T) { + accConf := createConfFile(t, []byte(` + accounts { + one: { + users = [ {user: one, password: password} ] + imports = [ {service: {account: weather, subject: service.weather.requests.>}, to: service.weather.>, share: true} ] + } + weather: { + users = [ {user: weather, password: password} ] + exports = [ { + service: service.weather.requests.> + accounts: [one] + latency: { sampling: 100%, subject: service.weather.latency } + } ] + } + } + `)) + defer os.Remove(accConf) + + s1Conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: s1 + cluster { port: -1 } + include %q + `, path.Base(accConf)))) + defer os.Remove(s1Conf) + + s1, opts1 := RunServerWithConfig(s1Conf) + defer s1.Shutdown() + + s2Conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: s2 + cluster { + port: -1 + routes = [ nats-route://127.0.0.1:%d ] + } + include %q + `, opts1.Cluster.Port, path.Base(accConf)))) + defer os.Remove(s2Conf) + + s2, opts2 := RunServerWithConfig(s2Conf) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "weather", "password", opts1.Host, opts1.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc1.Close() + + // Create responder + sub, _ := nc1.Subscribe("service.weather.requests.>", func(msg *nats.Msg) { + time.Sleep(25 * time.Millisecond) + msg.Respond([]byte("sunny!")) + }) + defer sub.Unsubscribe() + + // Create sync listener for latency. + lsub, _ := nc1.SubscribeSync("service.weather.latency") + defer lsub.Unsubscribe() + nc1.Flush() + + // Create requestor on s2. + nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "one", "password", opts2.Host, opts2.Port), nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + nc2.Request("service.weather.los_angeles", nil, time.Second) + + lr, err := lsub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Expected a latency result, got %v", err) + } + // Make sure we reported ok and have valid results for service, system and total. + var sl server.ServiceLatency + json.Unmarshal(lr.Data, &sl) + + if sl.Status != 200 { + t.Fatalf("Expected a 200 status, got %d\n", sl.Status) + } + if sl.ServiceLatency == 0 || sl.SystemLatency == 0 || sl.TotalLatency == 0 { + t.Fatalf("Received invalid tracking measurements, %d %d %d", sl.ServiceLatency, sl.SystemLatency, sl.TotalLatency) + } +}