Emit latency traces when sampling is set to headers

Latency reports will include the header(s) responsible for the trace
Updated the ADR to reflect the implementation

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2020-08-11 15:34:02 -04:00
parent 9bad6725aa
commit e1350a05f3
7 changed files with 532 additions and 58 deletions

View File

@@ -5,7 +5,7 @@ Author: @ripienaar
## Status
Proposed
Approved
## Context
@@ -52,15 +52,14 @@ exports: [
This enables sampling of `50%` of the service requests on this service.
I propose we support additional sampling values `zipkin`, `jaeger`, `trace_context` which will configure the server to
I propose we support the additional sampling value `headers` which will configure the server to
interpret the headers as below to determine if a request should be sampled.
## Propagating headers
The `io.nats.server.metric.v1.service_latency` advisory gets updated with additional `trace_format` and `headers` fields.
The `io.nats.server.metric.v1.service_latency` advisory gets updated with an additional `headers` field.
`headers` would just propagate all the headers unmodified. We might later add support for a whitelist here to avoid
leaking sensitive information.
`headers` contains only the headers used for the sampling decision.
```json
{
@@ -68,9 +67,8 @@ leaking sensitive information.
"id": "YBxAhpUFfs1rPGo323WcmQ",
"timestamp": "2020-05-21T08:06:29.4981587Z",
"status": 200,
"trace_format": "jaeger",
"headers": {
"Uber-Trace-Id": "09931e3444de7c99:50ed16db42b98999:0:1"
"Uber-Trace-Id": ["09931e3444de7c99:50ed16db42b98999:0:1"]
},
"requestor": {
"acc": "WEB",

View File

@@ -15,12 +15,14 @@ package server
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"io/ioutil"
"math"
"math/rand"
"net/http"
"net/textproto"
"net/url"
"reflect"
"sort"
@@ -104,23 +106,24 @@ type streamImport struct {
// Import service mapping struct.
// trackingHdr carries the distributed-tracing header(s) from the request that
// triggered latency sampling; it is nil when sampling was percentage-based.
type serviceImport struct {
	acc         *Account
	claim       *jwt.Import
	se          *serviceExport
	sid         []byte
	from        string
	to          string
	exsub       string
	ts          int64
	rt          ServiceRespType
	latency     *serviceLatency
	m1          *ServiceLatency
	rc          *client
	hasWC       bool
	response    bool
	invalid     bool
	share       bool
	tracking    bool
	trackingHdr http.Header // header from request
}
// This is used to record when we create a mapping for implicit service
@@ -179,7 +182,7 @@ type serviceExport struct {
// Used to track service latency.
type serviceLatency struct {
	// Sampling percentage from 1-100, or 0 to indicate that sampling is
	// triggered by distributed-tracing headers on the request.
	sampling int8
	// Subject the ServiceLatency metric is published to.
	subject string
}
@@ -651,8 +654,10 @@ func (a *Account) TrackServiceExportWithSampling(service, results string, sampli
return ErrMissingAccount
}
if sampling < 1 || sampling > 100 {
return ErrBadSampling
if sampling != 0 { // 0 means triggered by header
if sampling < 1 || sampling > 100 {
return ErrBadSampling
}
}
if !IsValidPublishSubject(results) {
return ErrBadPublishSubject
@@ -804,6 +809,7 @@ type ServiceLatency struct {
Error string `json:"description,omitempty"`
Requestor LatencyClient `json:"requestor,omitempty"`
Responder LatencyClient `json:"responder,omitempty"`
RequestHeader http.Header `json:"header,omitempty"` // only contains header(s) triggering the measurement
RequestStart time.Time `json:"start"`
ServiceLatency time.Duration `json:"service"`
SystemLatency time.Duration `json:"system"`
@@ -876,7 +882,6 @@ func (a *Account) sendLatencyResult(si *serviceImport, sl *ServiceLatency) {
sl.Type = ServiceLatencyType
sl.ID = a.nextEventID()
sl.Time = time.Now().UTC()
a.mu.Lock()
lsubj := si.latency.subject
si.rc = nil
@@ -886,12 +891,13 @@ func (a *Account) sendLatencyResult(si *serviceImport, sl *ServiceLatency) {
}
// Used to send a bad request metric when we do not have a reply subject
func (a *Account) sendBadRequestTrackingLatency(si *serviceImport, requestor *client) {
func (a *Account) sendBadRequestTrackingLatency(si *serviceImport, requestor *client, header http.Header) {
sl := &ServiceLatency{
Status: 400,
Error: "Bad Request",
Requestor: requestor.getClientInfo(si.share),
}
sl.RequestHeader = header
sl.RequestStart = time.Now().Add(-sl.Requestor.RTT).UTC()
a.sendLatencyResult(si, sl)
}
@@ -907,6 +913,7 @@ func (a *Account) sendReplyInterestLostTrackLatency(si *serviceImport) {
rc := si.rc
share := si.share
ts := si.ts
sl.RequestHeader = si.trackingHdr
a.mu.RUnlock()
if rc != nil {
sl.Requestor = rc.getClientInfo(share)
@@ -921,6 +928,7 @@ func (a *Account) sendBackendErrorTrackingLatency(si *serviceImport, reason rsiR
rc := si.rc
share := si.share
ts := si.ts
sl.RequestHeader = si.trackingHdr
a.mu.RUnlock()
if rc != nil {
sl.Requestor = rc.getClientInfo(share)
@@ -961,6 +969,7 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool
sl.SystemLatency = time.Since(ts)
sl.TotalLatency += sl.SystemLatency
}
sl.RequestHeader = si.trackingHdr
sanitizeLatencyMetric(sl)
sl.Type = ServiceLatencyType
@@ -1324,7 +1333,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
}
hasWC := subjectHasWildcard(from)
si := &serviceImport{dest, claim, se, nil, from, to, "", 0, rt, lat, nil, nil, hasWC, false, false, false, false}
si := &serviceImport{dest, claim, se, nil, from, to, "", 0, rt, lat, nil, nil, hasWC, false, false, false, false, nil}
a.imports.services[from] = si
a.mu.Unlock()
@@ -1417,15 +1426,116 @@ func (a *Account) addAllServiceImportSubs() {
}
}
// Helper to determine when to sample.
func shouldSample(l *serviceLatency) bool {
if l == nil || l.sampling <= 0 {
return false
// Canonicalized names of the distributed-tracing headers (Jaeger, Zipkin/B3,
// W3C Trace Context) inspected to make the sampling decision.
var (
	// Headers where the whole trace context is encoded in a single value.
	trcUber = textproto.CanonicalMIMEHeaderKey("Uber-Trace-Id")
	trcCtx = textproto.CanonicalMIMEHeaderKey("Traceparent")
	trcB3 = textproto.CanonicalMIMEHeaderKey("B3")
	// OpenZipkin multi-header form: headers checked for the decision.
	trcB3Sm = textproto.CanonicalMIMEHeaderKey("X-B3-Sampled")
	trcB3Id = textproto.CanonicalMIMEHeaderKey("X-B3-TraceId")
	// Additional headers that are only propagated when present.
	trcB3PSId = textproto.CanonicalMIMEHeaderKey("X-B3-ParentSpanId")
	trcB3SId = textproto.CanonicalMIMEHeaderKey("X-B3-SpanId")
	trcCtxSt = textproto.CanonicalMIMEHeaderKey("Tracestate")
	// Jaeger baggage header prefix (matched with strings.HasPrefix).
	trcUberCtxPrefix = textproto.CanonicalMIMEHeaderKey("Uberctx-")
)
// newB3Header extracts the OpenZipkin multi-header (X-B3-*) trace headers
// present in h into a fresh header set for inclusion in the latency metric.
func newB3Header(h http.Header) http.Header {
	retHdr := http.Header{}
	for _, key := range []string{trcB3Sm, trcB3Id, trcB3PSId, trcB3SId} {
		if val, found := h[key]; found {
			retHdr[key] = val
		}
	}
	return retHdr
}
// newUberHeader builds the header set reported with a latency metric for a
// Jaeger-sampled request: the Uber-Trace-Id value plus any Uberctx- baggage
// headers carried alongside it.
func newUberHeader(h http.Header, tId []string) http.Header {
	retHdr := http.Header{trcUber: tId}
	for key, val := range h {
		if !strings.HasPrefix(key, trcUberCtxPrefix) {
			continue
		}
		retHdr[key] = val
	}
	return retHdr
}
// newTraceCtxHeader builds the header set reported with a latency metric for
// a W3C Trace Context sampled request: Traceparent, plus Tracestate when set.
func newTraceCtxHeader(h http.Header, tId []string) http.Header {
	retHdr := http.Header{trcCtx: tId}
	if state, found := h[trcCtxSt]; found {
		retHdr[trcCtxSt] = state
	}
	return retHdr
}
// Helper to determine when to sample. With a configured sampling percentage
// the decision is random; when sampling is 0, the decision is driven entirely
// by distributed-tracing headers on the request (Jaeger Uber-Trace-Id,
// Zipkin B3 / X-B3-*, W3C Traceparent). Returns whether to sample and, when
// header-triggered, the subset of headers responsible for the decision
// (nil otherwise).
func shouldSample(l *serviceLatency, c *client) (bool, http.Header) {
	if l == nil {
		return false, nil
	}
	if l.sampling < 0 {
		return false, nil
	}
	if l.sampling >= 100 {
		return true, nil
	}
	if l.sampling > 0 && rand.Int31n(100) <= int32(l.sampling) {
		return true, nil
	}
	h := c.parseState.getHeader()
	if len(h) == 0 {
		return false, nil
	}
	if tId := h[trcUber]; len(tId) != 0 {
		// sample 479fefe9525eddb:5adb976bfc1f95c1:479fefe9525eddb:1
		// The last field is a hex flags value; bit 0 set means "sampled".
		tk := strings.Split(tId[0], ":")
		if len(tk) == 4 && len(tk[3]) > 0 && len(tk[3]) <= 2 {
			dst := [2]byte{}
			src := [2]byte{'0', tk[3][0]}
			if len(tk[3]) == 2 {
				src[1] = tk[3][1]
			}
			if _, err := hex.Decode(dst[:], src[:]); err == nil && dst[0]&1 == 1 {
				return true, newUberHeader(h, tId)
			}
		}
		return false, nil
	} else if sampled := h[trcB3Sm]; len(sampled) != 0 && sampled[0] == "1" {
		return true, newB3Header(h) // allowed
	} else if len(sampled) != 0 && sampled[0] == "0" {
		return false, nil // denied
	} else if _, ok := h[trcB3Id]; ok {
		// sample 80f198ee56343ba864fe8b2a57d3eff7
		// presence (with X-B3-Sampled not being 0) means sampling left to recipient
		return true, newB3Header(h)
	} else if b3 := h[trcB3]; len(b3) != 0 {
		// sample 80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90
		// sample 0
		tk := strings.Split(b3[0], "-")
		if len(tk) > 2 && tk[2] == "0" {
			return false, nil // denied
		} else if len(tk) == 1 && tk[0] == "0" {
			return false, nil // denied
		}
		return true, http.Header{trcB3: b3} // sampling allowed or left to recipient of header
	} else if tId := h[trcCtx]; len(tId) != 0 {
		// sample 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
		// The trailing flags field must be exactly "01" to sample.
		tk := strings.Split(tId[0], "-")
		if len(tk) == 4 && len([]byte(tk[3])) == 2 && tk[3] == "01" {
			return true, newTraceCtxHeader(h, tId)
		}
		return false, nil
	}
	return false, nil
}
// Used to mimic client like replies.
@@ -1629,8 +1739,7 @@ func (a *Account) SetServiceExportResponseThreshold(export string, maxTime time.
}
// This is for internal service import responses.
func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImport) *serviceImport {
tracking := shouldSample(osi.latency)
func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImport, tracking bool, header http.Header) *serviceImport {
nrr := string(osi.acc.newServiceReply(tracking))
a.mu.Lock()
@@ -1638,7 +1747,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp
// dest is the requestor's account. a is the service responder with the export.
// Marked as internal here, that is how we distinguish.
si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, 0, rt, nil, nil, nil, false, true, false, false, osi.share}
si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, 0, rt, nil, nil, nil, false, true, false, osi.share, false, nil}
if a.exports.responses == nil {
a.exports.responses = make(map[string]*serviceImport)
@@ -1652,6 +1761,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp
if rt == Singleton && tracking {
si.latency = osi.latency
si.tracking = true
si.trackingHdr = header
}
a.mu.Unlock()

View File

@@ -17,6 +17,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"strings"
@@ -2342,3 +2343,63 @@ func BenchmarkNewRouteReply(b *testing.B) {
g.newServiceReply(false)
}
}
// TestSamplingHeader verifies the header-driven sampling decision of
// shouldSample for Jaeger (Uber-Trace-Id), Zipkin (X-B3-* and single B3)
// and W3C Trace Context (traceparent) headers.
func TestSamplingHeader(t *testing.T) {
	test := func(expectSampling bool, h http.Header) {
		t.Helper()
		// Build the raw wire-format header block: a fake status line,
		// the MIME headers, then the terminating empty line.
		b := strings.Builder{}
		b.WriteString("\r\n") // simulate status line
		h.Write(&b)
		b.WriteString("\r\n")
		hdrString := b.String()
		// Craft a client whose parse state points at the raw header bytes so
		// parseState.getHeader() sees them.
		c := &client{parseState: parseState{msgBuf: []byte(hdrString), pa: pubArg{hdr: len(hdrString)}}}
		// sampling == 0 means the decision is driven by headers only.
		sample, hdr := shouldSample(&serviceLatency{0, "foo"}, c)
		if expectSampling {
			if !sample {
				t.Fatal("Expected to sample")
			} else if hdr == nil {
				t.Fatal("Expected a header")
			}
			// Every header used for the decision must be reported back.
			for k, v := range h {
				if hdr.Get(k) != v[0] {
					t.Fatal("Expect header to match")
				}
			}
		} else {
			if sample {
				t.Fatal("Expected not to sample")
			} else if hdr != nil {
				t.Fatal("Expected no header")
			}
		}
	}
	// Jaeger: last Uber-Trace-Id field is a hex flags value; bit 0 = sampled.
	test(false, http.Header{"Uber-Trace-Id": []string{"0:0:0:0"}})
	test(false, http.Header{"Uber-Trace-Id": []string{"0:0:0:00"}}) // one byte encoded as two hex digits
	test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:1"}})
	test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:01"}})
	test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:5"}}) // debug and sample
	test(true, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:5adb976bfc1f95c1:479fefe9525eddb:1"}})
	test(true, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}})
	test(false, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:5adb976bfc1f95c1:479fefe9525eddb:0"}})
	test(false, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:0"}})
	// Zipkin multi-header form: X-B3-Sampled decides; X-B3-TraceId alone
	// leaves the decision to the recipient (we sample).
	test(true, http.Header{"X-B3-Sampled": []string{"1"}})
	test(false, http.Header{"X-B3-Sampled": []string{"0"}})
	test(true, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}) // decision left to recipient
	test(false, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}, "X-B3-Sampled": []string{"0"}})
	test(true, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}, "X-B3-Sampled": []string{"1"}})
	// Zipkin single-header form: a "0" third field (or a lone "0") denies.
	test(false, http.Header{"B3": []string{"0"}}) // deny only
	test(false, http.Header{"B3": []string{"0-0-0-0"}})
	test(false, http.Header{"B3": []string{"0-0-0"}})
	test(true, http.Header{"B3": []string{"0-0-1-0"}})
	test(true, http.Header{"B3": []string{"0-0-1"}})
	test(true, http.Header{"B3": []string{"0-0-d"}}) // debug is not a deny
	test(true, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1"}})
	test(true, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}})
	test(false, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-0-05e3ac9a4f6e3b90"}})
	// W3C Trace Context: flags field "01" means sampled.
	test(true, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}})
	test(false, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00"}})
}

View File

@@ -21,6 +21,7 @@ import (
"io"
"math/rand"
"net"
"net/http"
"regexp"
"runtime"
"strconv"
@@ -3346,8 +3347,8 @@ func (c *client) handleGWReplyMap(msg []byte) bool {
}
// Used to setup the response map for a service import request that has a reply subject.
func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport) *serviceImport {
rsi := si.acc.addRespServiceImport(acc, string(c.pa.reply), si)
func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tracking bool, header http.Header) *serviceImport {
rsi := si.acc.addRespServiceImport(acc, string(c.pa.reply), si, tracking, header)
if si.latency != nil {
if c.rtt == 0 {
// We have a service import that we are tracking but have not established RTT.
@@ -3380,22 +3381,17 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// Check if there is a reply present and set up a response.
// TODO(dlc) - restrict to configured service imports and not responses?
tracking, headers := shouldSample(si.latency, c)
if len(c.pa.reply) > 0 {
rsi = c.setupResponseServiceImport(acc, si)
rsi = c.setupResponseServiceImport(acc, si, tracking, headers)
if rsi != nil {
nrr = []byte(rsi.from)
}
}
// Pick correct to subject. If we matched on a wildcard use the literal publish subject.
to := si.to
if si.hasWC {
to = string(c.pa.subject)
}
// Check to see if this was a bad request with no reply and we were supposed to be tracking.
if !si.response && si.latency != nil && len(c.pa.reply) == 0 && shouldSample(si.latency) {
si.acc.sendBadRequestTrackingLatency(si, c)
} else {
// Check to see if this was a bad request with no reply and we were supposed to be tracking.
if !si.response && si.latency != nil && tracking {
si.acc.sendBadRequestTrackingLatency(si, c, headers)
}
}
// Send tracking info here if we are tracking this response.
@@ -3405,6 +3401,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
didSendTL = acc.sendTrackingLatency(si, c)
}
// Pick correct to subject. If we matched on a wildcard use the literal publish subject.
to := si.to
if si.hasWC {
to = string(c.pa.subject)
}
// FIXME(dlc) - Do L1 cache trick like normal client?
rr := si.acc.sl.Match(to)

View File

@@ -2415,7 +2415,7 @@ func parseServiceLatency(root token, v interface{}) (l *serviceLatency, retErr e
// Read sampling value.
if v, ok := latency["sampling"]; ok {
tk, v := unwrapValue(v, &lt)
header := false
var sample int64
switch vv := v.(type) {
case int64:
@@ -2423,6 +2423,11 @@ func parseServiceLatency(root token, v interface{}) (l *serviceLatency, retErr e
sample = vv
case string:
// Sample is a string, like "50%".
if strings.ToLower(strings.TrimSpace(vv)) == "headers" {
header = true
sample = 0
break
}
s := strings.TrimSuffix(vv, "%")
n, err := strconv.Atoi(s)
if err != nil {
@@ -2434,9 +2439,11 @@ func parseServiceLatency(root token, v interface{}) (l *serviceLatency, retErr e
return nil, &configErr{token: tk,
reason: fmt.Sprintf("Expected latency sample to be a string or map/struct, got %T", v)}
}
if sample < 1 || sample > 100 {
return nil, &configErr{token: tk,
reason: ErrBadSampling.Error()}
if !header {
if sample < 1 || sample > 100 {
return nil, &configErr{token: tk,
reason: ErrBadSampling.Error()}
}
}
sl.sampling = int8(sample)

View File

@@ -14,7 +14,11 @@
package server
import (
"bufio"
"bytes"
"fmt"
"net/http"
"net/textproto"
)
type parserState int
@@ -26,6 +30,7 @@ type parseState struct {
pa pubArg
argBuf []byte
msgBuf []byte
header http.Header // access via getHeader
scratch [MAX_CONTROL_LINE_SIZE]byte
}
@@ -442,7 +447,7 @@ func (c *client) parse(buf []byte) error {
c.traceMsg(c.msgBuf)
}
c.processInboundMsg(c.msgBuf)
c.argBuf, c.msgBuf = nil, nil
c.argBuf, c.msgBuf, c.header = nil, nil, nil
c.drop, c.as, c.state = 0, i+1, OP_START
// Drop all pub args
c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject = nil, nil, nil, nil, nil
@@ -1175,3 +1180,17 @@ func (c *client) clonePubArg() error {
}
}
}
// getHeader lazily parses the header portion of msgBuf (the first pa.hdr
// bytes) into an http.Header and caches it on the parseState, so repeated
// calls for the same message do not re-parse. Returns nil when the message
// has no header or parsing fails.
func (ps *parseState) getHeader() http.Header {
	if ps.header != nil {
		return ps.header
	}
	hdrLen := ps.pa.hdr
	if hdrLen <= 0 {
		return nil
	}
	tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(ps.msgBuf[:hdrLen])))
	tp.ReadLine() // skip over first line, contains version
	if mimeHeader, err := tp.ReadMIMEHeader(); err == nil {
		ps.header = http.Header(mimeHeader)
	}
	return ps.header
}

View File

@@ -18,8 +18,10 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -1423,3 +1425,278 @@ func TestServiceLatencyRequestorSharesConfig(t *testing.T) {
json.Unmarshal(rmsg.Data, &sl2)
noShareCheck(t, &sl2.Requestor)
}
// TestServiceLatencyLossTest assures that behavior with respect to requests
// timing out (and latency samples being reordered) is as expected: the
// response to the second request is measured first (200), then the timed-out
// first request is reported (504) with an earlier start time.
func TestServiceLatencyLossTest(t *testing.T) {
	conf := createConfFile(t, []byte(`
	listen: 127.0.0.1:-1
	accounts: {
		SVC: {
			users: [ {user: svc, password: pass} ]
			exports: [ {
				service: "svc.echo"
				threshold: "500ms"
				accounts: [CLIENT]
				latency: {
					sampling: headers
					subject: latency.svc
				}
			} ]
		},
		CLIENT: {
			users: [{user: client, password: pass} ]
			imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:true} ]
		},
		SYS: { users: [{user: admin, password: pass}] }
	}
	system_account: SYS
	`))
	defer os.Remove(conf)
	srv, opts := RunServerWithConfig(conf)
	defer srv.Shutdown()

	// Responder connection
	ncr, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port), nats.Name("responder"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncr.Close()
	// Connection listening for latency metrics.
	ncl, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port), nats.Name("latency"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer ncl.Close()

	// Table of expected state for which message.
	// This also codifies that the first metric, in response to the second request, is ok.
	// Second metric, in response to the first request, times out.
	expectedState := map[int]int{1: http.StatusOK, 2: http.StatusGatewayTimeout}
	msgCnt := 0
	start := time.Now().Add(250 * time.Millisecond)
	var latErr []error
	// Listen for metrics
	wg := sync.WaitGroup{}
	wg.Add(2)
	rsub, _ := ncl.Subscribe("latency.svc", func(rmsg *nats.Msg) {
		defer wg.Done()
		var sl server.ServiceLatency
		json.Unmarshal(rmsg.Data, &sl)
		msgCnt++
		if want := expectedState[msgCnt]; want != sl.Status {
			latErr = append(latErr, fmt.Errorf("Expected different status for msg #%d: %d != %d", msgCnt, want, sl.Status))
		}
		if msgCnt > 1 {
			if start.Before(sl.RequestStart) {
				latErr = append(latErr, fmt.Errorf("start times should indicate reordering %v : %v", start, sl.RequestStart))
			}
		}
		start = sl.RequestStart
		// Requests are published with X-B3-Sampled (see send below); every
		// latency sample must carry that header back.
		if sl.RequestHeader.Get("X-B3-Sampled") != "1" {
			latErr = append(latErr, fmt.Errorf("no header present"))
		}
	})
	defer rsub.Unsubscribe()
	// Setup requestor
	nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port),
		nats.UseOldRequestStyle(), nats.Name("requestor"))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc2.Close()
	respCnt := int64(0)
	reply := nc2.NewRespInbox()
	repSub, _ := nc2.Subscribe(reply, func(msg *nats.Msg) {
		atomic.AddInt64(&respCnt, 1)
	})
	defer repSub.Unsubscribe()
	nc2.Flush()
	// use dedicated send that publishes requests using same reply subject
	send := func(msg string) {
		if err := nc2.PublishMsg(&nats.Msg{Subject: "SVC", Data: []byte(msg), Reply: reply,
			Header: http.Header{"X-B3-Sampled": []string{"1"}}}); err != nil {
			t.Fatalf("Expected a response got: %v", err)
		}
	}
	// Setup responder that skips responding and triggers next request OR responds
	sub, _ := ncr.Subscribe("svc.echo", func(msg *nats.Msg) {
		if string(msg.Data) != "msg2" {
			msg.Respond([]byte("response"))
		} else {
			wg.Add(1)
			go func() { // second request (use go routine to not block in responders callback)
				defer wg.Done()
				time.Sleep(250 * time.Millisecond)
				send("msg1") // will cause the first latency measurement
			}()
		}
	})
	ncr.Flush()
	ncl.Flush()
	nc2.Flush()
	defer sub.Unsubscribe()
	// Send first request, which is expected to time out
	send("msg2")
	// wait till we got enough responses
	wg.Wait()
	if len(latErr) > 0 {
		t.Fatalf("Got errors %v", latErr)
	}
	if atomic.LoadInt64(&respCnt) != 1 {
		t.Fatalf("Expected only one message")
	}
}
// TestServiceLatencyHeaderTriggered exercises header-triggered sampling end
// to end for each supported tracing-header format (Jaeger, Zipkin single and
// multi header, W3C Trace Context), with shared and non-shared imports, for
// success (200), no-responder (503) and missing-reply (400) outcomes.
func TestServiceLatencyHeaderTriggered(t *testing.T) {
	// receiveAndTest waits for one latency metric and validates the status,
	// requestor/responder details, and that exactly the triggering header(s)
	// — and nothing else — were propagated into the metric.
	receiveAndTest := func(t *testing.T, rsub *nats.Subscription, shared bool, header http.Header, status int, srvName string) server.ServiceLatency {
		t.Helper()
		var sl server.ServiceLatency
		rmsg, _ := rsub.NextMsg(time.Second)
		if rmsg == nil {
			t.Fatal("Expected message")
			return sl
		}
		json.Unmarshal(rmsg.Data, &sl)
		if sl.Status != status {
			t.Fatalf("Expected different status %d != %d", status, sl.Status)
		}
		if status == http.StatusOK {
			extendedCheck(t, &sl.Responder, "svc", "", srvName)
		}
		if shared {
			extendedCheck(t, &sl.Requestor, "client", "", srvName)
		} else {
			noShareCheck(t, &sl.Requestor)
		}
		// The triggering header(s) are always included; unrelated headers
		// such as Some-Other must have been stripped from the metric.
		if v := sl.RequestHeader.Get("Some-Other"); v != "" {
			t.Fatalf("Expected header to be gone")
		}
		for k, value := range header {
			if v := sl.RequestHeader.Get(k); v != value[0] {
				t.Fatalf("Expected header set")
			}
		}
		return sl
	}
	// Each case lists a header expected to trigger sampling; each is run once
	// with a shared import and once without.
	for _, v := range []struct {
		shared bool
		header http.Header
	}{
		{true, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}}},
		{true, http.Header{"X-B3-Sampled": []string{"1"}}},
		{true, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}},
		{true, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}}},
		{true, http.Header{"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}},
		{false, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}}},
		{false, http.Header{"X-B3-Sampled": []string{"1"}}},
		{false, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}},
		{false, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}}},
		{false, http.Header{"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}},
		// Multi-header cases: companion headers must be propagated too.
		{false, http.Header{
			"X-B3-TraceId":      []string{"80f198ee56343ba864fe8b2a57d3eff7"},
			"X-B3-ParentSpanId": []string{"05e3ac9a4f6e3b90"},
			"X-B3-SpanId":       []string{"e457b5a2e4d86bd1"},
			"X-B3-Sampled":      []string{"1"},
		}},
		{false, http.Header{
			"X-B3-TraceId":      []string{"80f198ee56343ba864fe8b2a57d3eff7"},
			"X-B3-ParentSpanId": []string{"05e3ac9a4f6e3b90"},
			"X-B3-SpanId":       []string{"e457b5a2e4d86bd1"},
		}},
		{false, http.Header{
			"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"},
			"Uberctx-X":     []string{"foo"},
		}},
		{false, http.Header{
			"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"},
			"Tracestate":  []string{"rojo=00f067aa0ba902b7,congo=t61rcWkgMzE"},
		}},
	} {
		t.Run(fmt.Sprintf("%s_%t_%s", t.Name(), v.shared, v.header), func(t *testing.T) {
			// Export configured with sampling: headers, so only requests
			// carrying a sampled trace header produce a metric.
			conf := createConfFile(t, []byte(fmt.Sprintf(`
			listen: 127.0.0.1:-1
			accounts: {
				SVC: {
					users: [ {user: svc, password: pass} ]
					exports: [ {
						service: "svc.echo"
						accounts: [CLIENT]
						latency: {
							sampling: headers
							subject: latency.svc
						}
					}]
				},
				CLIENT: {
					users: [{user: client, password: pass} ]
					imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:%t} ]
				},
				SYS: { users: [{user: admin, password: pass}] }
			}
			system_account: SYS
			`, v.shared)))
			defer os.Remove(conf)
			srv, opts := RunServerWithConfig(conf)
			defer srv.Shutdown()

			// Responder
			nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port))
			if err != nil {
				t.Fatalf("Error on connect: %v", err)
			}
			defer nc.Close()

			// Listen for metrics
			rsub, _ := nc.SubscribeSync("latency.svc")
			defer rsub.Unsubscribe()

			// Setup responder
			serviceTime := 25 * time.Millisecond
			sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) {
				time.Sleep(serviceTime)
				msg.Respond([]byte("world"))
			})
			nc.Flush()
			defer sub.Unsubscribe()

			// Setup requestor
			nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle())
			if err != nil {
				t.Fatalf("Error on connect: %v", err)
			}
			defer nc2.Close()

			// Send a request carrying the trace header plus one unrelated
			// header that must NOT show up in the metric.
			start := time.Now()
			msg := &nats.Msg{
				Subject: "SVC",
				Data:    []byte("1h"),
				Header:  v.header.Clone(),
			}
			msg.Header.Add("Some-Other", "value")
			if _, err := nc2.RequestMsg(msg, 50*time.Millisecond); err != nil {
				t.Fatalf("Expected a response")
			}
			sl := receiveAndTest(t, rsub, v.shared, v.header, http.StatusOK, srv.Name())
			checkServiceLatency(t, sl, start, serviceTime)

			// shut down responder to test various error scenarios
			sub.Unsubscribe()
			nc.Flush()

			// Send a request without responder
			if _, err := nc2.RequestMsg(msg, 50*time.Millisecond); err == nil {
				t.Fatalf("Expected no response")
			}
			receiveAndTest(t, rsub, v.shared, v.header, http.StatusServiceUnavailable, srv.Name())

			// send a message without a response
			msg.Reply = ""
			if err := nc2.PublishMsg(msg); err != nil {
				t.Fatalf("Expected no error got %v", err)
			}
			receiveAndTest(t, rsub, v.shared, v.header, http.StatusBadRequest, srv.Name())
		})
	}
}