mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Do not report bad latency on auto-unsubscribe triggers
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -130,6 +130,7 @@ type serviceImport struct {
|
||||
invalid bool
|
||||
share bool
|
||||
tracking bool
|
||||
didDeliver bool
|
||||
trackingHdr http.Header // header from request
|
||||
}
|
||||
|
||||
@@ -1535,9 +1536,8 @@ func (a *Account) checkForReverseEntry(reply string, si *serviceImport, checkInt
|
||||
// If there is we can not delete any entries yet.
|
||||
// Note that if we are here reply has to be a literal subject.
|
||||
if checkInterest {
|
||||
rr := a.sl.Match(reply)
|
||||
// If interest still exists we can not clean these up yet.
|
||||
if len(rr.psubs)+len(rr.qsubs) > 0 {
|
||||
if rr := a.sl.Match(reply); len(rr.psubs)+len(rr.qsubs) > 0 {
|
||||
a.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
@@ -1572,7 +1572,7 @@ func (a *Account) checkForReverseEntry(reply string, si *serviceImport, checkInt
|
||||
var trackingCleanup bool
|
||||
var rsi *serviceImport
|
||||
acc.mu.Lock()
|
||||
if rsi = acc.exports.responses[sre.msub]; rsi != nil {
|
||||
if rsi = acc.exports.responses[sre.msub]; rsi != nil && !rsi.didDeliver {
|
||||
delete(acc.exports.responses, rsi.from)
|
||||
trackingCleanup = rsi.tracking && rsi.rc != nil
|
||||
}
|
||||
@@ -1640,7 +1640,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
|
||||
}
|
||||
}
|
||||
|
||||
si := &serviceImport{dest, claim, se, nil, from, to, "", tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, nil}
|
||||
si := &serviceImport{dest, claim, se, nil, from, to, "", tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, false, nil}
|
||||
a.imports.services[from] = si
|
||||
a.mu.Unlock()
|
||||
|
||||
@@ -1865,6 +1865,7 @@ func (a *Account) processServiceImportResponse(sub *subscription, c *client, sub
|
||||
return
|
||||
}
|
||||
si := a.exports.responses[subject]
|
||||
|
||||
if si == nil || si.invalid {
|
||||
a.mu.RUnlock()
|
||||
return
|
||||
@@ -2054,7 +2055,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp
|
||||
|
||||
// dest is the requestor's account. a is the service responder with the export.
|
||||
// Marked as internal here, that is how we distinguish.
|
||||
si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, nil}
|
||||
si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, false, nil}
|
||||
|
||||
if a.exports.responses == nil {
|
||||
a.exports.responses = make(map[string]*serviceImport)
|
||||
|
||||
@@ -3516,6 +3516,8 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
|
||||
// This is always a response.
|
||||
var didSendTL bool
|
||||
if si.tracking {
|
||||
// Stamp that we attempted delivery.
|
||||
si.didDeliver = true
|
||||
didSendTL = acc.sendTrackingLatency(si, c)
|
||||
}
|
||||
|
||||
|
||||
@@ -1164,32 +1164,6 @@ func TestConfigCheck(t *testing.T) {
|
||||
errorLine: 4,
|
||||
errorPos: 18,
|
||||
},
|
||||
{
|
||||
name: "when setting latency tracking without a system account",
|
||||
config: `
|
||||
accounts {
|
||||
sys { users = [ {user: sys, pass: "" } ] }
|
||||
|
||||
nats.io: {
|
||||
users = [ { user : bar, pass: "" } ]
|
||||
|
||||
exports = [
|
||||
{ service: "nats.add"
|
||||
response: singleton
|
||||
latency: {
|
||||
sampling: 100%
|
||||
subject: "latency.tracking.add"
|
||||
}
|
||||
}
|
||||
|
||||
]
|
||||
}
|
||||
}
|
||||
`,
|
||||
err: errors.New(`Error adding service latency sampling for "nats.add": system account not setup`),
|
||||
errorLine: 2,
|
||||
errorPos: 17,
|
||||
},
|
||||
{
|
||||
name: "when setting latency tracking with a system account",
|
||||
config: `
|
||||
|
||||
@@ -2209,7 +2209,8 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er
|
||||
}
|
||||
|
||||
if service.lat != nil {
|
||||
if opts.SystemAccount == "" {
|
||||
// System accounts are on be default so just make sure we have not opted out..
|
||||
if opts.NoSystemAccount {
|
||||
msg := fmt.Sprintf("Error adding service latency sampling for %q: %v", service.sub, ErrNoSysAccount.Error())
|
||||
*errors = append(*errors, &configErr{tk, msg})
|
||||
continue
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -1781,3 +1782,93 @@ func TestServiceLatencyHeaderTriggered(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// From a report by rip@nats.io on simple latency reporting missing in 2 server cluster setup.
|
||||
func TestServiceLatencyMissingResults(t *testing.T) {
|
||||
accConf := createConfFile(t, []byte(`
|
||||
accounts {
|
||||
one: {
|
||||
users = [ {user: one, password: password} ]
|
||||
imports = [ {service: {account: weather, subject: service.weather.requests.>}, to: service.weather.>, share: true} ]
|
||||
}
|
||||
weather: {
|
||||
users = [ {user: weather, password: password} ]
|
||||
exports = [ {
|
||||
service: service.weather.requests.>
|
||||
accounts: [one]
|
||||
latency: { sampling: 100%, subject: service.weather.latency }
|
||||
} ]
|
||||
}
|
||||
}
|
||||
`))
|
||||
defer os.Remove(accConf)
|
||||
|
||||
s1Conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: s1
|
||||
cluster { port: -1 }
|
||||
include %q
|
||||
`, path.Base(accConf))))
|
||||
defer os.Remove(s1Conf)
|
||||
|
||||
s1, opts1 := RunServerWithConfig(s1Conf)
|
||||
defer s1.Shutdown()
|
||||
|
||||
s2Conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: s2
|
||||
cluster {
|
||||
port: -1
|
||||
routes = [ nats-route://127.0.0.1:%d ]
|
||||
}
|
||||
include %q
|
||||
`, opts1.Cluster.Port, path.Base(accConf))))
|
||||
defer os.Remove(s2Conf)
|
||||
|
||||
s2, opts2 := RunServerWithConfig(s2Conf)
|
||||
defer s2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, s1, s2)
|
||||
|
||||
nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "weather", "password", opts1.Host, opts1.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc1.Close()
|
||||
|
||||
// Create responder
|
||||
sub, _ := nc1.Subscribe("service.weather.requests.>", func(msg *nats.Msg) {
|
||||
time.Sleep(25 * time.Millisecond)
|
||||
msg.Respond([]byte("sunny!"))
|
||||
})
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
// Create sync listener for latency.
|
||||
lsub, _ := nc1.SubscribeSync("service.weather.latency")
|
||||
defer lsub.Unsubscribe()
|
||||
nc1.Flush()
|
||||
|
||||
// Create requestor on s2.
|
||||
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "one", "password", opts2.Host, opts2.Port), nats.UseOldRequestStyle())
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc2.Close()
|
||||
|
||||
nc2.Request("service.weather.los_angeles", nil, time.Second)
|
||||
|
||||
lr, err := lsub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected a latency result, got %v", err)
|
||||
}
|
||||
// Make sure we reported ok and have valid results for service, system and total.
|
||||
var sl server.ServiceLatency
|
||||
json.Unmarshal(lr.Data, &sl)
|
||||
|
||||
if sl.Status != 200 {
|
||||
t.Fatalf("Expected a 200 status, got %d\n", sl.Status)
|
||||
}
|
||||
if sl.ServiceLatency == 0 || sl.SystemLatency == 0 || sl.TotalLatency == 0 {
|
||||
t.Fatalf("Received invalid tracking measurements, %d %d %d", sl.ServiceLatency, sl.SystemLatency, sl.TotalLatency)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user