diff --git a/doc/adr/0003-distributed-tracing.md b/doc/adr/0003-distributed-tracing.md index 9d7cb2aa..71414142 100644 --- a/doc/adr/0003-distributed-tracing.md +++ b/doc/adr/0003-distributed-tracing.md @@ -5,7 +5,7 @@ Author: @ripienaar ## Status -Proposed +Approved ## Context @@ -52,15 +52,14 @@ exports: [ This enables sampling based `50%` of the service requests on this service. -I propose we support additional sampling values `zipkin`, `jaeger`, `trace_context` which will configure the server to +I propose we support the additional sampling value `headers` which will configure the server to interpret the headers as below to determine if a request should be sampled. ## Propagating headers -The `io.nats.server.metric.v1.service_latency` advisory gets updated with additional `trace_format` and `headers` fields. +The `io.nats.server.metric.v1.service_latency` advisory gets updated with an additional `headers` field. -`headers` would just propagate all the headers unmodified. We might later add support for a whitelist here to avoid -leaking sensitive information. +`headers` contains only the headers used for the sampling decision. ```json { @@ -68,9 +67,8 @@ leaking sensitive information. 
"id": "YBxAhpUFfs1rPGo323WcmQ", "timestamp": "2020-05-21T08:06:29.4981587Z", "status": 200, - "trace_format": "jaeger", "headers": { - "Uber-Trace-Id": "09931e3444de7c99:50ed16db42b98999:0:1" + "Uber-Trace-Id": ["09931e3444de7c99:50ed16db42b98999:0:1"] }, "requestor": { "acc": "WEB", diff --git a/server/accounts.go b/server/accounts.go index 42e19389..403b6c73 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -15,12 +15,14 @@ package server import ( "bytes" + "encoding/hex" "errors" "fmt" "io/ioutil" "math" "math/rand" "net/http" + "net/textproto" "net/url" "reflect" "sort" @@ -104,23 +106,24 @@ type streamImport struct { // Import service mapping struct type serviceImport struct { - acc *Account - claim *jwt.Import - se *serviceExport - sid []byte - from string - to string - exsub string - ts int64 - rt ServiceRespType - latency *serviceLatency - m1 *ServiceLatency - rc *client - hasWC bool - response bool - invalid bool - tracking bool - share bool + acc *Account + claim *jwt.Import + se *serviceExport + sid []byte + from string + to string + exsub string + ts int64 + rt ServiceRespType + latency *serviceLatency + m1 *ServiceLatency + rc *client + hasWC bool + response bool + invalid bool + share bool + tracking bool + trackingHdr http.Header // header from request } // This is used to record when we create a mapping for implicit service @@ -179,7 +182,7 @@ type serviceExport struct { // Used to track service latency. 
type serviceLatency struct { - sampling int8 + sampling int8 // percentage from 1-100 or 0 to indicate triggered by header subject string } @@ -651,8 +654,10 @@ func (a *Account) TrackServiceExportWithSampling(service, results string, sampli return ErrMissingAccount } - if sampling < 1 || sampling > 100 { - return ErrBadSampling + if sampling != 0 { // 0 means triggered by header + if sampling < 1 || sampling > 100 { + return ErrBadSampling + } } if !IsValidPublishSubject(results) { return ErrBadPublishSubject @@ -804,6 +809,7 @@ type ServiceLatency struct { Error string `json:"description,omitempty"` Requestor LatencyClient `json:"requestor,omitempty"` Responder LatencyClient `json:"responder,omitempty"` + RequestHeader http.Header `json:"header,omitempty"` // only contains header(s) triggering the measurement RequestStart time.Time `json:"start"` ServiceLatency time.Duration `json:"service"` SystemLatency time.Duration `json:"system"` @@ -876,7 +882,6 @@ func (a *Account) sendLatencyResult(si *serviceImport, sl *ServiceLatency) { sl.Type = ServiceLatencyType sl.ID = a.nextEventID() sl.Time = time.Now().UTC() - a.mu.Lock() lsubj := si.latency.subject si.rc = nil @@ -886,12 +891,13 @@ func (a *Account) sendLatencyResult(si *serviceImport, sl *ServiceLatency) { } // Used to send a bad request metric when we do not have a reply subject -func (a *Account) sendBadRequestTrackingLatency(si *serviceImport, requestor *client) { +func (a *Account) sendBadRequestTrackingLatency(si *serviceImport, requestor *client, header http.Header) { sl := &ServiceLatency{ Status: 400, Error: "Bad Request", Requestor: requestor.getClientInfo(si.share), } + sl.RequestHeader = header sl.RequestStart = time.Now().Add(-sl.Requestor.RTT).UTC() a.sendLatencyResult(si, sl) } @@ -907,6 +913,7 @@ func (a *Account) sendReplyInterestLostTrackLatency(si *serviceImport) { rc := si.rc share := si.share ts := si.ts + sl.RequestHeader = si.trackingHdr a.mu.RUnlock() if rc != nil { sl.Requestor = 
rc.getClientInfo(share) @@ -921,6 +928,7 @@ func (a *Account) sendBackendErrorTrackingLatency(si *serviceImport, reason rsiR rc := si.rc share := si.share ts := si.ts + sl.RequestHeader = si.trackingHdr a.mu.RUnlock() if rc != nil { sl.Requestor = rc.getClientInfo(share) @@ -961,6 +969,7 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool sl.SystemLatency = time.Since(ts) sl.TotalLatency += sl.SystemLatency } + sl.RequestHeader = si.trackingHdr sanitizeLatencyMetric(sl) sl.Type = ServiceLatencyType @@ -1324,7 +1333,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im } hasWC := subjectHasWildcard(from) - si := &serviceImport{dest, claim, se, nil, from, to, "", 0, rt, lat, nil, nil, hasWC, false, false, false, false} + si := &serviceImport{dest, claim, se, nil, from, to, "", 0, rt, lat, nil, nil, hasWC, false, false, false, false, nil} a.imports.services[from] = si a.mu.Unlock() @@ -1417,15 +1426,116 @@ func (a *Account) addAllServiceImportSubs() { } } -// Helper to determine when to sample. -func shouldSample(l *serviceLatency) bool { - if l == nil || l.sampling <= 0 { - return false +var ( + // header where all information is encoded in one value. 
+ trcUber = textproto.CanonicalMIMEHeaderKey("Uber-Trace-Id") + trcCtx = textproto.CanonicalMIMEHeaderKey("Traceparent") + trcB3 = textproto.CanonicalMIMEHeaderKey("B3") + // openzipkin header to check + trcB3Sm = textproto.CanonicalMIMEHeaderKey("X-B3-Sampled") + trcB3Id = textproto.CanonicalMIMEHeaderKey("X-B3-TraceId") + // additional header needed to include when present + trcB3PSId = textproto.CanonicalMIMEHeaderKey("X-B3-ParentSpanId") + trcB3SId = textproto.CanonicalMIMEHeaderKey("X-B3-SpanId") + trcCtxSt = textproto.CanonicalMIMEHeaderKey("Tracestate") + trcUberCtxPrefix = textproto.CanonicalMIMEHeaderKey("Uberctx-") +) + +func newB3Header(h http.Header) http.Header { + retHdr := http.Header{} + if v, ok := h[trcB3Sm]; ok { + retHdr[trcB3Sm] = v + } + if v, ok := h[trcB3Id]; ok { + retHdr[trcB3Id] = v + } + if v, ok := h[trcB3PSId]; ok { + retHdr[trcB3PSId] = v + } + if v, ok := h[trcB3SId]; ok { + retHdr[trcB3SId] = v + } + return retHdr +} + +func newUberHeader(h http.Header, tId []string) http.Header { + retHdr := http.Header{trcUber: tId} + for k, v := range h { + if strings.HasPrefix(k, trcUberCtxPrefix) { + retHdr[k] = v + } + } + return retHdr +} + +func newTraceCtxHeader(h http.Header, tId []string) http.Header { + retHdr := http.Header{trcCtx: tId} + if v, ok := h[trcCtxSt]; ok { + retHdr[trcCtxSt] = v + } + return retHdr +} + +// Helper to determine when to sample. 
When header has a value, sampling is driven by header +func shouldSample(l *serviceLatency, c *client) (bool, http.Header) { + if l == nil { + return false, nil + } + if l.sampling < 0 { + return false, nil } if l.sampling >= 100 { - return true + return true, nil } - return rand.Int31n(100) <= int32(l.sampling) + if l.sampling > 0 && rand.Int31n(100) <= int32(l.sampling) { + return true, nil + } + h := c.parseState.getHeader() + if len(h) == 0 { + return false, nil + } + if tId := h[trcUber]; len(tId) != 0 { + // sample 479fefe9525eddb:5adb976bfc1f95c1:479fefe9525eddb:1 + tk := strings.Split(tId[0], ":") + if len(tk) == 4 && len(tk[3]) > 0 && len(tk[3]) <= 2 { + dst := [2]byte{} + src := [2]byte{'0', tk[3][0]} + if len(tk[3]) == 2 { + src[1] = tk[3][1] + } + if _, err := hex.Decode(dst[:], src[:]); err == nil && dst[0]&1 == 1 { + return true, newUberHeader(h, tId) + } + } + return false, nil + } else if sampled := h[trcB3Sm]; len(sampled) != 0 && sampled[0] == "1" { + return true, newB3Header(h) // allowed + } else if len(sampled) != 0 && sampled[0] == "0" { + return false, nil // denied + } else if _, ok := h[trcB3Id]; ok { + // sample 80f198ee56343ba864fe8b2a57d3eff7 + // presence (with X-B3-Sampled not being 0) means sampling left to recipient + return true, newB3Header(h) + } else if b3 := h[trcB3]; len(b3) != 0 { + // sample 80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90 + // sample 0 + tk := strings.Split(b3[0], "-") + if len(tk) > 2 && tk[2] == "0" { + return false, nil // denied + } else if len(tk) == 1 && tk[0] == "0" { + return false, nil // denied + } + return true, http.Header{trcB3: b3} // sampling allowed or left to recipient of header + } else if tId := h[trcCtx]; len(tId) != 0 { + // sample 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 + tk := strings.Split(tId[0], "-") + if len(tk) == 4 && len([]byte(tk[3])) == 2 && tk[3] == "01" { + return true, newTraceCtxHeader(h, tId) + } else { + return false, nil + } + } + 
return false, nil } // Used to mimic client like replies. @@ -1629,8 +1739,7 @@ func (a *Account) SetServiceExportResponseThreshold(export string, maxTime time. } // This is for internal service import responses. -func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImport) *serviceImport { - tracking := shouldSample(osi.latency) +func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImport, tracking bool, header http.Header) *serviceImport { nrr := string(osi.acc.newServiceReply(tracking)) a.mu.Lock() @@ -1638,7 +1747,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp // dest is the requestor's account. a is the service responder with the export. // Marked as internal here, that is how we distinguish. - si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, 0, rt, nil, nil, nil, false, true, false, false, osi.share} + si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, 0, rt, nil, nil, nil, false, true, false, osi.share, false, nil} if a.exports.responses == nil { a.exports.responses = make(map[string]*serviceImport) @@ -1652,6 +1761,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp if rt == Singleton && tracking { si.latency = osi.latency si.tracking = true + si.trackingHdr = header } a.mu.Unlock() diff --git a/server/accounts_test.go b/server/accounts_test.go index 4d5f730a..f6eca0fe 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -17,6 +17,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "net/http" "os" "strconv" "strings" @@ -2342,3 +2343,63 @@ func BenchmarkNewRouteReply(b *testing.B) { g.newServiceReply(false) } } + +func TestSamplingHeader(t *testing.T) { + test := func(expectSampling bool, h http.Header) { + t.Helper() + b := strings.Builder{} + b.WriteString("\r\n") // simulate status line + h.Write(&b) + b.WriteString("\r\n") + hdrString := b.String() + c := &client{parseState: 
parseState{msgBuf: []byte(hdrString), pa: pubArg{hdr: len(hdrString)}}} + sample, hdr := shouldSample(&serviceLatency{0, "foo"}, c) + if expectSampling { + if !sample { + t.Fatal("Expected to sample") + } else if hdr == nil { + t.Fatal("Expected a header") + } + for k, v := range h { + if hdr.Get(k) != v[0] { + t.Fatal("Expect header to match") + } + } + } else { + if sample { + t.Fatal("Expected not to sample") + } else if hdr != nil { + t.Fatal("Expected no header") + } + } + } + + test(false, http.Header{"Uber-Trace-Id": []string{"0:0:0:0"}}) + test(false, http.Header{"Uber-Trace-Id": []string{"0:0:0:00"}}) // one byte encoded as two hex digits + test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:1"}}) + test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:01"}}) + test(true, http.Header{"Uber-Trace-Id": []string{"0:0:0:5"}}) // debug and sample + test(true, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:5adb976bfc1f95c1:479fefe9525eddb:1"}}) + test(true, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}}) + test(false, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:5adb976bfc1f95c1:479fefe9525eddb:0"}}) + test(false, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:0"}}) + + test(true, http.Header{"X-B3-Sampled": []string{"1"}}) + test(false, http.Header{"X-B3-Sampled": []string{"0"}}) + test(true, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}) // decision left to recipient + test(false, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}, "X-B3-Sampled": []string{"0"}}) + test(true, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}, "X-B3-Sampled": []string{"1"}}) + + test(false, http.Header{"B3": []string{"0"}}) // deny only + test(false, http.Header{"B3": []string{"0-0-0-0"}}) + test(false, http.Header{"B3": []string{"0-0-0"}}) + test(true, http.Header{"B3": []string{"0-0-1-0"}}) + test(true, http.Header{"B3": 
[]string{"0-0-1"}}) + test(true, http.Header{"B3": []string{"0-0-d"}}) // debug is not a deny + test(true, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1"}}) + test(true, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}}) + test(false, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-0-05e3ac9a4f6e3b90"}}) + + test(true, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}) + test(false, http.Header{"traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00"}}) +} diff --git a/server/client.go b/server/client.go index 420f4f3d..8545fab6 100644 --- a/server/client.go +++ b/server/client.go @@ -21,6 +21,7 @@ import ( "io" "math/rand" "net" + "net/http" "regexp" "runtime" "strconv" @@ -3346,8 +3347,8 @@ func (c *client) handleGWReplyMap(msg []byte) bool { } // Used to setup the response map for a service import request that has a reply subject. -func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport) *serviceImport { - rsi := si.acc.addRespServiceImport(acc, string(c.pa.reply), si) +func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tracking bool, header http.Header) *serviceImport { + rsi := si.acc.addRespServiceImport(acc, string(c.pa.reply), si, tracking, header) if si.latency != nil { if c.rtt == 0 { // We have a service import that we are tracking but have not established RTT. @@ -3380,22 +3381,17 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // Check if there is a reply present and set up a response. // TODO(dlc) - restrict to configured service imports and not responses? 
+ tracking, headers := shouldSample(si.latency, c) if len(c.pa.reply) > 0 { - rsi = c.setupResponseServiceImport(acc, si) + rsi = c.setupResponseServiceImport(acc, si, tracking, headers) if rsi != nil { nrr = []byte(rsi.from) } - } - - // Pick correct to subject. If we matched on a wildcard use the literal publish subject. - to := si.to - if si.hasWC { - to = string(c.pa.subject) - } - - // Check to see if this was a bad request with no reply and we were supposed to be tracking. - if !si.response && si.latency != nil && len(c.pa.reply) == 0 && shouldSample(si.latency) { - si.acc.sendBadRequestTrackingLatency(si, c) + } else { + // Check to see if this was a bad request with no reply and we were supposed to be tracking. + if !si.response && si.latency != nil && tracking { + si.acc.sendBadRequestTrackingLatency(si, c, headers) + } } // Send tracking info here if we are tracking this response. @@ -3405,6 +3401,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt didSendTL = acc.sendTrackingLatency(si, c) } + // Pick correct to subject. If we matched on a wildcard use the literal publish subject. + to := si.to + if si.hasWC { + to = string(c.pa.subject) + } + // FIXME(dlc) - Do L1 cache trick like normal client? rr := si.acc.sl.Match(to) diff --git a/server/opts.go b/server/opts.go index ec6a413a..ca005785 100644 --- a/server/opts.go +++ b/server/opts.go @@ -2415,7 +2415,7 @@ func parseServiceLatency(root token, v interface{}) (l *serviceLatency, retErr e // Read sampling value. if v, ok := latency["sampling"]; ok { tk, v := unwrapValue(v, <) - + header := false var sample int64 switch vv := v.(type) { case int64: @@ -2423,6 +2423,11 @@ func parseServiceLatency(root token, v interface{}) (l *serviceLatency, retErr e sample = vv case string: // Sample is a string, like "50%". 
+ if strings.ToLower(strings.TrimSpace(vv)) == "headers" { + header = true + sample = 0 + break + } s := strings.TrimSuffix(vv, "%") n, err := strconv.Atoi(s) if err != nil { @@ -2434,9 +2439,11 @@ func parseServiceLatency(root token, v interface{}) (l *serviceLatency, retErr e return nil, &configErr{token: tk, reason: fmt.Sprintf("Expected latency sample to be a string or map/struct, got %T", v)} } - if sample < 1 || sample > 100 { - return nil, &configErr{token: tk, - reason: ErrBadSampling.Error()} + if !header { + if sample < 1 || sample > 100 { + return nil, &configErr{token: tk, + reason: ErrBadSampling.Error()} + } } sl.sampling = int8(sample) diff --git a/server/parser.go b/server/parser.go index b3ac091b..26f8088c 100644 --- a/server/parser.go +++ b/server/parser.go @@ -14,7 +14,11 @@ package server import ( + "bufio" + "bytes" "fmt" + "net/http" + "net/textproto" ) type parserState int @@ -26,6 +30,7 @@ type parseState struct { pa pubArg argBuf []byte msgBuf []byte + header http.Header // access via getHeader scratch [MAX_CONTROL_LINE_SIZE]byte } @@ -442,7 +447,7 @@ func (c *client) parse(buf []byte) error { c.traceMsg(c.msgBuf) } c.processInboundMsg(c.msgBuf) - c.argBuf, c.msgBuf = nil, nil + c.argBuf, c.msgBuf, c.header = nil, nil, nil c.drop, c.as, c.state = 0, i+1, OP_START // Drop all pub args c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject = nil, nil, nil, nil, nil @@ -1175,3 +1180,17 @@ func (c *client) clonePubArg() error { } } } + +func (ps *parseState) getHeader() http.Header { + if ps.header == nil { + if hdr := ps.pa.hdr; hdr > 0 { + reader := bufio.NewReader(bytes.NewReader(ps.msgBuf[0:hdr])) + tp := textproto.NewReader(reader) + tp.ReadLine() // skip over first line, contains version + if mimeHeader, err := tp.ReadMIMEHeader(); err == nil { + ps.header = http.Header(mimeHeader) + } + } + } + return ps.header +} diff --git a/test/service_latency_test.go b/test/service_latency_test.go index 1b94a651..bd0f0e56 100644 --- 
a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -18,8 +18,10 @@ import ( "fmt" "io/ioutil" "math/rand" + "net/http" "os" "strings" + "sync" "sync/atomic" "testing" "time" @@ -1423,3 +1425,278 @@ func TestServiceLatencyRequestorSharesConfig(t *testing.T) { json.Unmarshal(rmsg.Data, &sl2) noShareCheck(t, &sl2.Requestor) } + +func TestServiceLatencyLossTest(t *testing.T) { + // ensure that behavior with respect to requests timing out (and samples being reordered) is as expected. + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + accounts: { + SVC: { + users: [ {user: svc, password: pass} ] + exports: [ { + service: "svc.echo" + threshold: "500ms" + accounts: [CLIENT] + latency: { + sampling: headers + subject: latency.svc + } + } ] + }, + CLIENT: { + users: [{user: client, password: pass} ] + imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:true} ] + }, + SYS: { users: [{user: admin, password: pass}] } + } + system_account: SYS + `)) + defer os.Remove(conf) + srv, opts := RunServerWithConfig(conf) + defer srv.Shutdown() + + // Responder connection + ncr, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port), nats.Name("responder")) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncr.Close() + + ncl, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port), nats.Name("latency")) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncl.Close() + // Table of expected state for which message. + // This also codifies that the first message, in response to second request is ok. + // Second message, in response to first request times out. 
+ expectedState := map[int]int{1: http.StatusOK, 2: http.StatusGatewayTimeout} + msgCnt := 0 + start := time.Now().Add(250 * time.Millisecond) + + var latErr []error + // Listen for metrics + wg := sync.WaitGroup{} + wg.Add(2) + rsub, _ := ncl.Subscribe("latency.svc", func(rmsg *nats.Msg) { + defer wg.Done() + var sl server.ServiceLatency + json.Unmarshal(rmsg.Data, &sl) + msgCnt++ + if want := expectedState[msgCnt]; want != sl.Status { + latErr = append(latErr, fmt.Errorf("Expected different status for msg #%d: %d != %d", msgCnt, want, sl.Status)) + } + if msgCnt > 1 { + if start.Before(sl.RequestStart) { + latErr = append(latErr, fmt.Errorf("start times should indicate reordering %v : %v", start, sl.RequestStart)) + } + } + start = sl.RequestStart + if strings.EqualFold(sl.RequestHeader.Get("Uber-Trace-Id"), fmt.Sprintf("msg-%d", msgCnt)) { + latErr = append(latErr, fmt.Errorf("no header present")) + } + }) + defer rsub.Unsubscribe() + // Setup requestor + nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), + nats.UseOldRequestStyle(), nats.Name("requestor")) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + respCnt := int64(0) + reply := nc2.NewRespInbox() + repSub, _ := nc2.Subscribe(reply, func(msg *nats.Msg) { + atomic.AddInt64(&respCnt, 1) + }) + defer repSub.Unsubscribe() + nc2.Flush() + // use dedicated send that publishes requests using same reply subject + send := func(msg string) { + if err := nc2.PublishMsg(&nats.Msg{Subject: "SVC", Data: []byte(msg), Reply: reply, + Header: http.Header{"X-B3-Sampled": []string{"1"}}}); err != nil { + t.Fatalf("Expected a response got: %v", err) + } + } + // Setup responder that skips responding and triggers next request OR responds + sub, _ := ncr.Subscribe("svc.echo", func(msg *nats.Msg) { + if string(msg.Data) != "msg2" { + msg.Respond([]byte("response")) + } else { + wg.Add(1) + go func() { // second request (use go routine to not block in 
responders callback) + defer wg.Done() + time.Sleep(250 * time.Millisecond) + send("msg1") // will cause the first latency measurement + }() + } + }) + ncr.Flush() + ncl.Flush() + nc2.Flush() + defer sub.Unsubscribe() + // Send first request, which is expected to timeout + send("msg2") + // wait till we got enough responses + wg.Wait() + if len(latErr) > 0 { + t.Fatalf("Got errors %v", latErr) + } + if atomic.LoadInt64(&respCnt) != 1 { + t.Fatalf("Expected only one message") + } +} + +func TestServiceLatencyHeaderTriggered(t *testing.T) { + receiveAndTest := func(t *testing.T, rsub *nats.Subscription, shared bool, header http.Header, status int, srvName string) server.ServiceLatency { + t.Helper() + var sl server.ServiceLatency + rmsg, _ := rsub.NextMsg(time.Second) + if rmsg == nil { + t.Fatal("Expected message") + return sl + } + json.Unmarshal(rmsg.Data, &sl) + if sl.Status != status { + t.Fatalf("Expected different status %d != %d", status, sl.Status) + } + if status == http.StatusOK { + extendedCheck(t, &sl.Responder, "svc", "", srvName) + } + if shared { + extendedCheck(t, &sl.Requestor, "client", "", srvName) + } else { + noShareCheck(t, &sl.Requestor) + } + // header are always included + if v := sl.RequestHeader.Get("Some-Other"); v != "" { + t.Fatalf("Expected header to be gone") + } + for k, value := range header { + if v := sl.RequestHeader.Get(k); v != value[0] { + t.Fatalf("Expected header set") + } + } + return sl + } + for _, v := range []struct { + shared bool + header http.Header + }{ + {true, http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}}}, + {true, http.Header{"X-B3-Sampled": []string{"1"}}}, + {true, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}}, + {true, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}}}, + {true, http.Header{"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}}, + {false, 
http.Header{"Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}}}, + {false, http.Header{"X-B3-Sampled": []string{"1"}}}, + {false, http.Header{"X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}}}, + {false, http.Header{"B3": []string{"80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"}}}, + {false, http.Header{"Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}}, + {false, http.Header{ + "X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}, + "X-B3-ParentSpanId": []string{"05e3ac9a4f6e3b90"}, + "X-B3-SpanId": []string{"e457b5a2e4d86bd1"}, + "X-B3-Sampled": []string{"1"}, + }}, + {false, http.Header{ + "X-B3-TraceId": []string{"80f198ee56343ba864fe8b2a57d3eff7"}, + "X-B3-ParentSpanId": []string{"05e3ac9a4f6e3b90"}, + "X-B3-SpanId": []string{"e457b5a2e4d86bd1"}, + }}, + {false, http.Header{ + "Uber-Trace-Id": []string{"479fefe9525eddb:479fefe9525eddb:0:1"}, + "Uberctx-X": []string{"foo"}, + }}, + {false, http.Header{ + "Traceparent": []string{"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}, + "Tracestate": []string{"rojo=00f067aa0ba902b7,congo=t61rcWkgMzE"}, + }}, + } { + t.Run(fmt.Sprintf("%s_%t_%s", t.Name(), v.shared, v.header), func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + accounts: { + SVC: { + users: [ {user: svc, password: pass} ] + exports: [ { + service: "svc.echo" + accounts: [CLIENT] + latency: { + sampling: headers + subject: latency.svc + } + }] + }, + CLIENT: { + users: [{user: client, password: pass} ] + imports: [ {service: {account: SVC, subject: svc.echo}, to: SVC, share:%t} ] + }, + SYS: { users: [{user: admin, password: pass}] } + } + + system_account: SYS + `, v.shared))) + defer os.Remove(conf) + srv, opts := RunServerWithConfig(conf) + defer srv.Shutdown() + + // Responder + nc, err := nats.Connect(fmt.Sprintf("nats://svc:pass@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: 
%v", err) + } + defer nc.Close() + + // Listen for metrics + rsub, _ := nc.SubscribeSync("latency.svc") + defer rsub.Unsubscribe() + + // Setup responder + serviceTime := 25 * time.Millisecond + sub, _ := nc.Subscribe("svc.echo", func(msg *nats.Msg) { + time.Sleep(serviceTime) + msg.Respond([]byte("world")) + }) + nc.Flush() + defer sub.Unsubscribe() + + // Setup requestor + nc2, err := nats.Connect(fmt.Sprintf("nats://client:pass@%s:%d", opts.Host, opts.Port), nats.UseOldRequestStyle()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + // Send a request + start := time.Now() + msg := &nats.Msg{ + Subject: "SVC", + Data: []byte("1h"), + Header: v.header.Clone(), + } + msg.Header.Add("Some-Other", "value") + if _, err := nc2.RequestMsg(msg, 50*time.Millisecond); err != nil { + t.Fatalf("Expected a response") + } + sl := receiveAndTest(t, rsub, v.shared, v.header, http.StatusOK, srv.Name()) + checkServiceLatency(t, sl, start, serviceTime) + // shut down responder to test various error scenarios + sub.Unsubscribe() + nc.Flush() + // Send a request without responder + if _, err := nc2.RequestMsg(msg, 50*time.Millisecond); err == nil { + t.Fatalf("Expected no response") + } + receiveAndTest(t, rsub, v.shared, v.header, http.StatusServiceUnavailable, srv.Name()) + + // send a message without a response + msg.Reply = "" + if err := nc2.PublishMsg(msg); err != nil { + t.Fatalf("Expected no error got %v", err) + } + receiveAndTest(t, rsub, v.shared, v.header, http.StatusBadRequest, srv.Name()) + }) + } +}