Various bug fixes, fixes for flappers

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-05-03 18:37:19 -07:00
parent e4c15d8680
commit e9b9788fbe
4 changed files with 40 additions and 23 deletions

View File

@@ -820,10 +820,12 @@ func (a *Account) sendLatencyResult(si *serviceImport, sl *ServiceLatency) {
sl.ID = a.nextEventID()
sl.Time = time.Now().UTC()
si.acc.mu.Lock()
a.srv.sendInternalAccountMsg(a, si.latency.subject, sl)
a.mu.Lock()
lsubj := si.latency.subject
si.rc = nil
si.acc.mu.Unlock()
a.mu.Unlock()
a.srv.sendInternalAccountMsg(a, lsubj, sl)
}
// Used to send a bad request metric when we do not have a reply subject
@@ -844,19 +846,29 @@ func (a *Account) sendReplyInterestLostTrackLatency(si *serviceImport) {
Status: 408,
Error: "Request Timeout",
}
if si.rc != nil {
sl.Requestor = si.rc.getClientInfo(si.share)
a.mu.RLock()
rc := si.rc
share := si.share
ts := si.ts
a.mu.RUnlock()
if rc != nil {
sl.Requestor = rc.getClientInfo(share)
}
sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC()
sl.RequestStart = time.Unix(0, ts-int64(sl.Requestor.RTT)).UTC()
a.sendLatencyResult(si, sl)
}
func (a *Account) sendBackendErrorTrackingLatency(si *serviceImport, reason rsiReason) {
sl := &ServiceLatency{}
if si.rc != nil {
sl.Requestor = si.rc.getClientInfo(si.share)
a.mu.RLock()
rc := si.rc
share := si.share
ts := si.ts
a.mu.RUnlock()
if rc != nil {
sl.Requestor = rc.getClientInfo(share)
}
sl.RequestStart = time.Unix(0, si.ts-int64(sl.Requestor.RTT)).UTC()
sl.RequestStart = time.Unix(0, ts-int64(sl.Requestor.RTT)).UTC()
if reason == rsiNoDelivery {
sl.Status = 503
sl.Error = "Service Unavailable"
@@ -1061,14 +1073,19 @@ func (a *Account) removeRespServiceImport(si *serviceImport, reason rsiReason) {
if si == nil {
return
}
a.mu.Lock()
delete(a.exports.responses, si.from)
dest := si.acc
to := si.to
if si.tracking && si.rc != nil {
tracking := si.tracking
rc := si.rc
a.mu.Unlock()
if tracking && rc != nil {
a.sendBackendErrorTrackingLatency(si, reason)
}
a.mu.Unlock()
dest.checkForReverseEntry(to, si, false)
}

View File

@@ -1257,9 +1257,9 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ st
m1.merge(&m2)
// Clear the requesting client since we send the result here.
si.acc.mu.Lock()
acc.mu.Lock()
si.rc = nil
si.acc.mu.Unlock()
acc.mu.Unlock()
// Make sure we remove the entry here.
acc.removeServiceImport(si.from)

View File

@@ -1959,7 +1959,7 @@ func TestJetStreamPullConsumerRemoveInterest(t *testing.T) {
defer os.RemoveAll(config.StoreDir)
}
mname := "MYS"
mname := "MYS-PULL"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)

View File

@@ -728,8 +728,8 @@ func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) {
})
nc.Flush()
// Send 100 requests from random locations.
for i := 0; i < 100; i++ {
// Send 200 requests from random locations.
for i := 0; i < 200; i++ {
doRequest()
}
@@ -1099,7 +1099,7 @@ func TestServiceLatencyFailureReportingSingleServer(t *testing.T) {
})
nc.Flush()
nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond)
nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
sl = getMetricResult()
if sl.Status != 504 {
t.Fatalf("Expected to get a service timeout status [504], got %d", sl.Status)
@@ -1183,23 +1183,23 @@ func TestServiceLatencyFailureReportingMultipleServers(t *testing.T) {
}
// Proper request, but no responders.
nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond)
nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
sl = getMetricResult()
if sl.Status != 503 {
t.Fatalf("Test %q, Expected to get a service unavailable status [503], got %d", cs.desc, sl.Status)
}
// The service listener. Make it slow. 10ms is respThreshold, so take 2X
// The service listener. Make it slow. 10ms is respThreshold, so take 3X
sub, _ := nc.Subscribe("ngs.usage.bar", func(msg *nats.Msg) {
time.Sleep(20 * time.Millisecond)
time.Sleep(30 * time.Millisecond)
msg.Respond([]byte("22 msgs"))
})
defer sub.Unsubscribe()
nc.Flush()
// Wait to propagate.
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
nc2.Request("ngs.usage", []byte("1h"), 20*time.Millisecond)
nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
sl = getMetricResult()
if sl.Status != 504 {
t.Fatalf("Test %q, Expected to get a service timeout status [504], got %d", cs.desc, sl.Status)
@@ -1210,7 +1210,7 @@ func TestServiceLatencyFailureReportingMultipleServers(t *testing.T) {
sub.Unsubscribe()
nc.Flush()
// Wait to propagate.
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
}
}