More robust waiting queue for pull mode consumers

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-10-19 19:38:28 -07:00
parent 923335e52d
commit 610d2d21b7
17 changed files with 1240 additions and 115 deletions

View File

@@ -1,7 +1,7 @@
language: go
go:
- 1.15.x
- 1.14.x
- 1.13.x
env:
- GO111MODULE=off
go_import_path: github.com/nats-io/nats.go
@@ -20,4 +20,4 @@ before_script:
script:
- go test -i -race ./...
- go test -v -run=TestNoRace -p=1 ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.14 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.15 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi

View File

@@ -2,9 +2,7 @@
Maintainership is on a per project basis.
### Core-maintainers
### Maintainers
- Derek Collison <derek@nats.io> [@derekcollison](https://github.com/derekcollison)
- Ivan Kozlovic <ivan@nats.io> [@kozlovic](https://github.com/kozlovic)
### Maintainers
- Waldemar Quevedo <wally@nats.io> [@wallyqs](https://github.com/wallyqs)
- Waldemar Quevedo <wally@nats.io> [@wallyqs](https://github.com/wallyqs)

View File

@@ -3,7 +3,8 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
[![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fnats-io%2Fgo-nats.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fnats-io%2Fgo-nats?ref=badge_shield)
[![Go Report Card](https://goreportcard.com/badge/github.com/nats-io/nats.go)](https://goreportcard.com/report/github.com/nats-io/nats.go) [![Build Status](https://travis-ci.org/nats-io/nats.go.svg?branch=master)](http://travis-ci.org/nats-io/nats.go) [![GoDoc](https://godoc.org/github.com/nats-io/nats.go?status.svg)](http://godoc.org/github.com/nats-io/nats.go) [![Coverage Status](https://coveralls.io/repos/nats-io/nats.go/badge.svg?branch=master)](https://coveralls.io/r/nats-io/nats.go?branch=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/nats-io/nats.go)](https://goreportcard.com/report/github.com/nats-io/nats.go) [![Build Status](https://travis-ci.org/nats-io/nats.go.svg?branch=master)](http://travis-ci.org/nats-io/nats.go) [![GoDoc](https://img.shields.io/badge/GoDoc-reference-007d9c)](https://pkg.go.dev/github.com/nats-io/nats.go)
[![Coverage Status](https://coveralls.io/repos/nats-io/nats.go/badge.svg?branch=master)](https://coveralls.io/r/nats-io/nats.go?branch=master)
## Installation
@@ -284,6 +285,21 @@ nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
```go
// Normally, the library will return an error when trying to connect and
// there is no server running. The RetryOnFailedConnect option will set
// the connection in reconnecting state if it failed to connect right away.
nc, err := nats.Connect(nats.DefaultURL,
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(10),
nats.ReconnectWait(time.Second),
nats.ReconnectHandler(func(_ *nats.Conn) {
// Note that this will be invoked for the first asynchronous connect.
}))
if err != nil {
// Should not return an error even if it can't connect, but you still
// need to check in case there are some configuration errors.
}
// Flush connection to server, returns when all messages have been processed.
nc.Flush()
fmt.Println("All clear!")

View File

@@ -58,34 +58,37 @@ func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data [
return nil, ctx.Err()
}
nc.mu.Lock()
var m *Msg
var err error
// If user wants the old style.
if nc.Opts.UseOldRequestStyle {
nc.mu.Unlock()
return nc.oldRequestWithContext(ctx, subj, hdr, data)
}
mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
if err != nil {
return nil, err
}
var ok bool
var msg *Msg
select {
case msg, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
if nc.useOldRequestStyle() {
m, err = nc.oldRequestWithContext(ctx, subj, hdr, data)
} else {
mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
if err != nil {
return nil, err
}
case <-ctx.Done():
nc.mu.Lock()
delete(nc.respMap, token)
nc.mu.Unlock()
return nil, ctx.Err()
}
return msg, nil
var ok bool
select {
case m, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
}
case <-ctx.Done():
nc.mu.Lock()
delete(nc.respMap, token)
nc.mu.Unlock()
return nil, ctx.Err()
}
}
// Check for no responder status.
if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
m, err = nil, ErrNoResponders
}
return m, err
}
// oldRequestWithContext utilizes inbox and subscription per request.

View File

@@ -130,7 +130,7 @@ func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, t
// Handler is a specific callback used for Subscribe. It is generalized to
// an interface{}, but we will discover its format and arguments at runtime
// and perform the correct callback, including de-marshaling JSON strings
// and perform the correct callback, including de-marshaling encoded data
// back into the appropriate struct based on the signature of the Handler.
//
// Handlers are expected to have one of four signatures.

View File

@@ -4,7 +4,7 @@ go 1.14
require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad
github.com/nats-io/nkeys v0.2.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0

View File

@@ -7,17 +7,23 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02 h1:WloZv3SCb55D/rOHYy1rWBXLrj3BYc9zw8VIq6X54lI=
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71 h1:nexMtKbOeM+w3vGQMNF0BEt+2xZDmVCtYXql2Ym+RWg=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad h1:oRb9MIi1Y4N5cTZWciqH68aVNt1e+o4N2uRnjVzv/UE=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM=
@@ -35,6 +41,9 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

366
vendor/github.com/nats-io/nats.go/jetstream.go generated vendored Normal file
View File

@@ -0,0 +1,366 @@
// Copyright 2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nats
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strconv"
"time"
)
// JetStreamMsgMetaData is metadata related to a JetStream originated message
type JetStreamMsgMetaData struct {
Stream string
Consumer string
Parsed bool
Delivered int
StreamSeq int
ConsumerSeq int
TimeStamp time.Time
}
func (m *Msg) JetStreamMetaData() (*JetStreamMsgMetaData, error) {
var err error
if m.jsMeta != nil && m.jsMeta.Parsed {
return m.jsMeta, nil
}
m.jsMeta, err = m.parseJSMsgMetadata()
return m.jsMeta, err
}
func (m *Msg) parseJSMsgMetadata() (*JetStreamMsgMetaData, error) {
if m.jsMeta != nil {
return m.jsMeta, nil
}
if len(m.Reply) == 0 {
return nil, ErrNotJSMessage
}
meta := &JetStreamMsgMetaData{}
tsa := [32]string{}
parts := tsa[:0]
start := 0
btsep := byte('.')
for i := 0; i < len(m.Reply); i++ {
if m.Reply[i] == btsep {
parts = append(parts, m.Reply[start:i])
start = i + 1
}
}
parts = append(parts, m.Reply[start:])
if len(parts) != 8 || parts[0] != "$JS" || parts[1] != "ACK" {
return nil, ErrNotJSMessage
}
var err error
meta.Stream = parts[2]
meta.Consumer = parts[3]
meta.Delivered, err = strconv.Atoi(parts[4])
if err != nil {
return nil, ErrNotJSMessage
}
meta.StreamSeq, err = strconv.Atoi(parts[5])
if err != nil {
return nil, ErrNotJSMessage
}
meta.ConsumerSeq, err = strconv.Atoi(parts[6])
if err != nil {
return nil, ErrNotJSMessage
}
tsi, err := strconv.Atoi(parts[7])
if err != nil {
return nil, ErrNotJSMessage
}
meta.TimeStamp = time.Unix(0, int64(tsi))
meta.Parsed = true
return meta, nil
}
const jsStreamUnspecified = "not.set"
type jsOpts struct {
timeout time.Duration
ctx context.Context
ackstr string
consumer *ConsumerConfig
streamName string
}
func newJsOpts() *jsOpts {
return &jsOpts{ackstr: jsStreamUnspecified, consumer: &ConsumerConfig{}}
}
func (j *jsOpts) context(dftl time.Duration) (context.Context, context.CancelFunc) {
if j.ctx != nil {
return context.WithCancel(j.ctx)
}
if j.timeout == 0 {
j.timeout = dftl
}
return context.WithTimeout(context.Background(), j.timeout)
}
// AckOption configures the various JetStream message acknowledgement helpers
type AckOption func(opts *jsOpts) error
// PublishOption configures publishing messages
type PublishOption func(opts *jsOpts) error
// SubscribeOption configures JetStream consumer behavior
type SubscribeOption func(opts *jsOpts) error
// Consumer creates a JetStream Consumer on a Stream
func Consumer(stream string, cfg ConsumerConfig) SubscribeOption {
return func(jopts *jsOpts) error {
jopts.consumer = &cfg
jopts.streamName = stream
return nil
}
}
// PublishExpectsStream waits for an ack after publishing and ensure it's from a specific stream, empty arguments waits for any valid acknowledgement
func PublishExpectsStream(stream ...string) PublishOption {
return func(opts *jsOpts) error {
switch len(stream) {
case 0:
opts.ackstr = ""
case 1:
opts.ackstr = stream[0]
if !isValidJSName(opts.ackstr) {
return ErrInvalidStreamName
}
default:
return ErrMultiStreamUnsupported
}
return nil
}
}
// PublishStreamTimeout sets the period of time to wait for JetStream to acknowledge receipt, defaults to JetStreamTimeout option
func PublishStreamTimeout(t time.Duration) PublishOption {
return func(opts *jsOpts) error {
opts.timeout = t
return nil
}
}
// PublishCtx sets an interrupt context for waiting on a stream to reply
func PublishCtx(ctx context.Context) PublishOption {
return func(opts *jsOpts) error {
opts.ctx = ctx
return nil
}
}
// AckWaitDuration waits for confirmation from the JetStream server
func AckWaitDuration(d time.Duration) AckOption {
return func(opts *jsOpts) error {
opts.timeout = d
return nil
}
}
func (m *Msg) jsAck(body []byte, opts ...AckOption) error {
if m.Reply == "" {
return ErrMsgNoReply
}
if m == nil || m.Sub == nil {
return ErrMsgNotBound
}
m.Sub.mu.Lock()
nc := m.Sub.conn
m.Sub.mu.Unlock()
var err error
var aopts *jsOpts
if len(opts) > 0 {
aopts = newJsOpts()
for _, f := range opts {
if err = f(aopts); err != nil {
return err
}
}
}
if aopts == nil || aopts.timeout == 0 {
return m.Respond(body)
}
_, err = nc.Request(m.Reply, body, aopts.timeout)
return err
}
// Ack acknowledges a JetStream messages received from a Consumer, indicating the message
// should not be received again later
func (m *Msg) Ack(opts ...AckOption) error {
return m.jsAck(AckAck, opts...)
}
// Nak acknowledges a JetStream message received from a Consumer, indicating that the message
// is not completely processed and should be sent again later
func (m *Msg) Nak(opts ...AckOption) error {
return m.jsAck(AckNak, opts...)
}
// AckProgress acknowledges a Jetstream message received from a Consumer, indicating that work is
// ongoing and further processing time is required equal to the configured AckWait of the Consumer
func (m *Msg) AckProgress(opts ...AckOption) error {
return m.jsAck(AckProgress, opts...)
}
// AckNext performs an Ack() and request that the next message be sent to subject ib
func (m *Msg) AckNext(ib string) error {
return m.RespondMsg(&Msg{Subject: m.Reply, Reply: ib, Data: AckNext})
}
// AckAndFetch performs an AckNext() and returns the next message from the stream
func (m *Msg) AckAndFetch(opts ...AckOption) (*Msg, error) {
if m.Reply == "" {
return nil, ErrMsgNoReply
}
if m == nil || m.Sub == nil {
return nil, ErrMsgNotBound
}
m.Sub.mu.Lock()
nc := m.Sub.conn
m.Sub.mu.Unlock()
var err error
aopts := newJsOpts()
for _, f := range opts {
if err = f(aopts); err != nil {
return nil, err
}
}
ctx, cancel := aopts.context(nc.Opts.JetStreamTimeout)
defer cancel()
sub, err := nc.SubscribeSync(NewInbox())
if err != nil {
return nil, err
}
sub.AutoUnsubscribe(1)
defer sub.Unsubscribe()
err = m.RespondMsg(&Msg{Reply: sub.Subject, Data: AckNext, Subject: m.Reply})
if err != nil {
return nil, err
}
nc.Flush()
return sub.NextMsgWithContext(ctx)
}
// AckTerm acknowledges a message received from JetStream indicating the message will not be processed
// and should not be sent to another consumer
func (m *Msg) AckTerm(opts ...AckOption) error {
return m.jsAck(AckTerm, opts...)
}
// JetStreamPublishAck metadata received from JetStream when publishing messages
type JetStreamPublishAck struct {
Stream string `json:"stream"`
Sequence int `json:"seq"`
}
// ParsePublishAck parses the publish acknowledgement sent by JetStream
func ParsePublishAck(m []byte) (*JetStreamPublishAck, error) {
if bytes.HasPrefix([]byte("-ERR"), m) {
if len(m) > 7 {
return nil, fmt.Errorf(string(m[6 : len(m)-1]))
}
return nil, fmt.Errorf(string(m))
}
if !bytes.HasPrefix(m, []byte("+OK {")) {
return nil, fmt.Errorf("invalid JetStream Ack: %v", string(m))
}
ack := &JetStreamPublishAck{}
err := json.Unmarshal(m[3:], ack)
return ack, err
}
func (nc *Conn) jsPublish(subj string, data []byte, opts []PublishOption) error {
var err error
var aopts *jsOpts
if len(opts) > 0 {
aopts = newJsOpts()
for _, f := range opts {
if err = f(aopts); err != nil {
return err
}
}
}
if aopts == nil || aopts.timeout == 0 && aopts.ctx == nil && aopts.ackstr == jsStreamUnspecified {
return nc.publish(subj, _EMPTY_, nil, data)
}
ctx, cancel := aopts.context(nc.Opts.JetStreamTimeout)
defer cancel()
resp, err := nc.RequestWithContext(ctx, subj, data)
if err != nil {
return err
}
ack, err := ParsePublishAck(resp.Data)
if err != nil {
return err
}
if ack.Stream == "" || ack.Sequence == 0 {
return ErrInvalidJSAck
}
if aopts.ackstr == jsStreamUnspecified || aopts.ackstr == "" {
return nil
}
if ack.Stream == aopts.ackstr {
return nil
}
return fmt.Errorf("received ack from stream %q", ack.Stream)
}

268
vendor/github.com/nats-io/nats.go/jetstream_consumer.go generated vendored Normal file
View File

@@ -0,0 +1,268 @@
// Copyright 2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nats
import (
"encoding/json"
"fmt"
"strings"
"time"
)
func (nc *Conn) createOrUpdateConsumer(opts *jsOpts, delivery string) (*ConsumerInfo, error) {
if opts.streamName == "" {
return nil, ErrStreamNameRequired
}
if opts.consumer == nil {
return nil, ErrConsumerConfigRequired
}
crj, err := json.Marshal(&jSApiConsumerCreateRequest{
Stream: opts.streamName,
Config: consumerConfig{DeliverSubject: delivery, ConsumerConfig: opts.consumer},
})
if err != nil {
return nil, err
}
ctx, cancel := opts.context(nc.Opts.JetStreamTimeout)
defer cancel()
var subj string
switch len(opts.consumer.Durable) {
case 0:
subj = fmt.Sprintf(jSApiConsumerCreateT, opts.streamName)
default:
subj = fmt.Sprintf(jSApiDurableCreateT, opts.streamName, opts.consumer.Durable)
}
resp, err := nc.RequestWithContext(ctx, subj, crj)
if err != nil {
return nil, err
}
cresp := &jSApiConsumerCreateResponse{}
err = json.Unmarshal(resp.Data, cresp)
if err != nil {
return nil, err
}
if cresp.Error != nil {
return nil, cresp.Error
}
return cresp.ConsumerInfo, nil
}
const (
jSApiConsumerCreateT = "$JS.API.CONSUMER.CREATE.%s"
jSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
)
type apiError struct {
Code int `json:"code"`
Description string `json:"description,omitempty"`
}
// Error implements error
func (e apiError) Error() string {
switch {
case e.Description == "" && e.Code == 0:
return "unknown JetStream Error"
case e.Description == "" && e.Code > 0:
return fmt.Sprintf("unknown JetStream %d Error", e.Code)
default:
return e.Description
}
}
type jSApiResponse struct {
Type string `json:"type"`
Error *apiError `json:"error,omitempty"`
}
// io.nats.jetstream.api.v1.consumer_create_request
type jSApiConsumerCreateRequest struct {
Stream string `json:"stream_name"`
Config consumerConfig `json:"config"`
}
// io.nats.jetstream.api.v1.consumer_create_response
type jSApiConsumerCreateResponse struct {
jSApiResponse
*ConsumerInfo
}
type AckPolicy int
const (
AckNone AckPolicy = iota
AckAll
AckExplicit
)
func (p *AckPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("none"):
*p = AckNone
case jsonString("all"):
*p = AckAll
case jsonString("explicit"):
*p = AckExplicit
default:
return fmt.Errorf("can not unmarshal %q", data)
}
return nil
}
func (p AckPolicy) MarshalJSON() ([]byte, error) {
switch p {
case AckNone:
return json.Marshal("none")
case AckAll:
return json.Marshal("all")
case AckExplicit:
return json.Marshal("explicit")
default:
return nil, fmt.Errorf("unknown acknowlegement policy %v", p)
}
}
type ReplayPolicy int
const (
ReplayInstant ReplayPolicy = iota
ReplayOriginal
)
func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("instant"):
*p = ReplayInstant
case jsonString("original"):
*p = ReplayOriginal
default:
return fmt.Errorf("can not unmarshal %q", data)
}
return nil
}
func (p ReplayPolicy) MarshalJSON() ([]byte, error) {
switch p {
case ReplayOriginal:
return json.Marshal("original")
case ReplayInstant:
return json.Marshal("instant")
default:
return nil, fmt.Errorf("unknown replay policy %v", p)
}
}
var (
AckAck = []byte("+ACK")
AckNak = []byte("-NAK")
AckProgress = []byte("+WPI")
AckNext = []byte("+NXT")
AckTerm = []byte("+TERM")
)
type DeliverPolicy int
const (
DeliverAll DeliverPolicy = iota
DeliverLast
DeliverNew
DeliverByStartSequence
DeliverByStartTime
)
func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("all"), jsonString("undefined"):
*p = DeliverAll
case jsonString("last"):
*p = DeliverLast
case jsonString("new"):
*p = DeliverNew
case jsonString("by_start_sequence"):
*p = DeliverByStartSequence
case jsonString("by_start_time"):
*p = DeliverByStartTime
}
return nil
}
func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
switch p {
case DeliverAll:
return json.Marshal("all")
case DeliverLast:
return json.Marshal("last")
case DeliverNew:
return json.Marshal("new")
case DeliverByStartSequence:
return json.Marshal("by_start_sequence")
case DeliverByStartTime:
return json.Marshal("by_start_time")
default:
return nil, fmt.Errorf("unknown deliver policy %v", p)
}
}
// ConsumerConfig is the configuration for a JetStream consumes
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
SampleFrequency string `json:"sample_freq,omitempty"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"`
}
type consumerConfig struct {
DeliverSubject string `json:"deliver_subject,omitempty"`
*ConsumerConfig
}
type SequencePair struct {
ConsumerSeq uint64 `json:"consumer_seq"`
StreamSeq uint64 `json:"stream_seq"`
}
type ConsumerInfo struct {
Stream string `json:"stream_name"`
Name string `json:"name"`
Config ConsumerConfig `json:"config"`
Created time.Time `json:"created"`
Delivered SequencePair `json:"delivered"`
AckFloor SequencePair `json:"ack_floor"`
NumPending int `json:"num_pending"`
NumRedelivered int `json:"num_redelivered"`
}
func jsonString(s string) string {
return "\"" + s + "\""
}
func isValidJSName(n string) bool {
return !(n == "" || strings.ContainsAny(n, ">*. "))
}

View File

@@ -54,6 +54,7 @@ const (
DefaultReconnectJitter = 100 * time.Millisecond
DefaultReconnectJitterTLS = time.Second
DefaultTimeout = 2 * time.Second
DefaultJetStreamTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192 // 8k
@@ -123,6 +124,14 @@ var (
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
ErrNoContextOrTimeout = errors.New("nats: no context or timeout given")
ErrNotJSMessage = errors.New("nats: not a JetStream message")
ErrInvalidStreamName = errors.New("nats: invalid stream name")
ErrInvalidJSAck = errors.New("nats: invalid JetStream publish acknowledgement")
ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
ErrStreamNameRequired = errors.New("nats: Stream name is required")
ErrConsumerConfigRequired = errors.New("nats: Consumer configuration is required")
)
func init() {
@@ -138,6 +147,7 @@ func GetDefaultOptions() Options {
ReconnectJitter: DefaultReconnectJitter,
ReconnectJitterTLS: DefaultReconnectJitterTLS,
Timeout: DefaultTimeout,
JetStreamTimeout: DefaultJetStreamTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
@@ -292,6 +302,9 @@ type Options struct {
// Timeout sets the timeout for a Dial operation on a connection.
Timeout time.Duration
// JetStreamTimeout set the default timeout for the JetStream API
JetStreamTimeout time.Duration
// DrainTimeout sets the timeout for a Drain Operation to complete.
DrainTimeout time.Duration
@@ -391,6 +404,15 @@ type Options struct {
// gradually disconnect all its connections before shuting down. This is
// often used in deployments when upgrading NATS Servers.
LameDuckModeHandler ConnHandler
// RetryOnFailedConnect sets the connection in reconnecting state right
// away if it can't connect to a server in the initial set. The
// MaxReconnect and ReconnectWait options are used for this process,
// similarly to when an established connection is disconnected.
// If a ReconnectHandler is set, it will be invoked when the connection
// is established, and if a ClosedHandler is set, it will be invoked if
// it fails to connect (after exhausting the MaxReconnect attempts).
RetryOnFailedConnect bool
}
const (
@@ -472,6 +494,10 @@ type Subscription struct {
// only be processed by one member of the group.
Queue string
// ConsumerConfig is the configuration for the JetStream consumer if one was created
// or updated using the subscription options
ConsumerConfig *ConsumerConfig
delivered uint64
max uint64
conn *Conn
@@ -508,6 +534,7 @@ type Msg struct {
Sub *Subscription
next *Msg
barrier *barrierInfo
jsMeta *JetStreamMsgMetaData
}
func (m *Msg) headerBytes() ([]byte, error) {
@@ -589,21 +616,22 @@ const (
)
type connectInfo struct {
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
UserJWT string `json:"jwt,omitempty"`
Nkey string `json:"nkey,omitempty"`
Signature string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
Token string `json:"auth_token,omitempty"`
TLS bool `json:"tls_required"`
Name string `json:"name"`
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Echo bool `json:"echo"`
Headers bool `json:"headers"`
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
UserJWT string `json:"jwt,omitempty"`
Nkey string `json:"nkey,omitempty"`
Signature string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
Token string `json:"auth_token,omitempty"`
TLS bool `json:"tls_required"`
Name string `json:"name"`
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Echo bool `json:"echo"`
Headers bool `json:"headers"`
NoResponders bool `json:"no_responders"`
}
// MsgHandler is a callback function that processes messages delivered to
@@ -791,6 +819,14 @@ func Timeout(t time.Duration) Option {
}
}
// JetStreamTimeout is an Option to set the timeout for access to the JetStream API
func JetStreamTimeout(t time.Duration) Option {
return func(o *Options) error {
o.JetStreamTimeout = t
return nil
}
}
// FlusherTimeout is an Option to set the write (and flush) timeout on a connection.
func FlusherTimeout(t time.Duration) Option {
return func(o *Options) error {
@@ -998,6 +1034,16 @@ func LameDuckModeHandler(cb ConnHandler) Option {
}
}
// RetryOnFailedConnect sets the connection in reconnecting state right away
// if it can't connect to a server in the initial set.
// See RetryOnFailedConnect option for more details.
func RetryOnFailedConnect(retry bool) Option {
return func(o *Options) error {
o.RetryOnFailedConnect = retry
return nil
}
}
// Handler processing
// SetDisconnectHandler will set the disconnect event handler.
@@ -1576,7 +1622,9 @@ func (nc *Conn) connect() error {
nc.mu.Unlock()
nc.close(DISCONNECTED, false, err)
nc.mu.Lock()
nc.current = nil
// Do not reset nc.current here since it would prevent
// RetryOnFailedConnect to work should this be the last server
// to try before starting doReconnect().
}
} else {
// Cancel out default connection refused, will trigger the
@@ -1586,11 +1634,27 @@ func (nc *Conn) connect() error {
}
}
}
nc.initc = false
if returnedErr == nil && nc.status != CONNECTED {
returnedErr = ErrNoServers
}
if returnedErr == nil {
nc.initc = false
} else if nc.Opts.RetryOnFailedConnect {
nc.setup()
nc.status = RECONNECTING
nc.pending = new(bytes.Buffer)
if nc.bw == nil {
nc.bw = nc.newBuffer()
}
nc.bw.Reset(nc.pending)
go nc.doReconnect(ErrNoServers)
returnedErr = nil
} else {
nc.current = nil
}
return returnedErr
}
@@ -1711,8 +1775,10 @@ func (nc *Conn) connectProto() (string, error) {
token = nc.Opts.TokenHandler()
}
// If our server does not support headers then we can't do them or no responders.
hdrs := nc.info.Headers
cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, true}
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs}
b, err := json.Marshal(cinfo)
if err != nil {
@@ -1908,10 +1974,12 @@ func (nc *Conn) doReconnect(err error) {
nc.err = nil
// Perform appropriate callback if needed for a disconnect.
// DisconnectedErrCB has priority over deprecated DisconnectedCB
if nc.Opts.DisconnectedErrCB != nil {
nc.ach.push(func() { nc.Opts.DisconnectedErrCB(nc, err) })
} else if nc.Opts.DisconnectedCB != nil {
nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
if !nc.initc {
if nc.Opts.DisconnectedErrCB != nil {
nc.ach.push(func() { nc.Opts.DisconnectedErrCB(nc, err) })
} else if nc.Opts.DisconnectedCB != nil {
nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
}
}
// This is used to wait on go routines exit if we start them in the loop
@@ -2052,6 +2120,10 @@ func (nc *Conn) doReconnect(err error) {
// This is where we are truly connected.
nc.status = CONNECTED
// If we are here with a retry on failed connect, indicate that the
// initial connect is now complete.
nc.initc = false
// Queue up the reconnect callback.
if nc.Opts.ReconnectedCB != nil {
nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) })
@@ -2528,7 +2600,7 @@ func (nc *Conn) processInfo(info string) error {
// did not include themselves in the async INFO protocol.
// If empty, do not remove the implicit servers from the pool.
if len(ncInfo.ConnectURLs) == 0 {
if ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
}
return nil
@@ -2591,7 +2663,7 @@ func (nc *Conn) processInfo(info string) error {
nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
}
}
if ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
}
return nil
@@ -2676,7 +2748,11 @@ func (nc *Conn) kickFlusher() {
// Publish publishes the data argument to the given subject. The data
// argument is left untouched and needs to be correctly interpreted on
// the receiver.
func (nc *Conn) Publish(subj string, data []byte) error {
func (nc *Conn) Publish(subj string, data []byte, opts ...PublishOption) error {
if len(opts) > 0 {
return nc.jsPublish(subj, data, opts)
}
return nc.publish(subj, _EMPTY_, nil, data)
}
@@ -2689,21 +2765,28 @@ func NewMsg(subject string) *Msg {
}
const (
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
statusHdr = "Status"
noResponders = "503"
)
// decodeHeadersMsg will decode and headers.
func decodeHeadersMsg(data []byte) (http.Header, error) {
tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data)))
if l, err := tp.ReadLine(); err != nil || l != hdrLine[:hdrPreEnd] {
l, err := tp.ReadLine()
if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] {
return nil, ErrBadHeaderMsg
}
mh, err := tp.ReadMIMEHeader()
if err != nil {
return nil, ErrBadHeaderMsg
}
// Check if we have an inlined status.
if len(l) > hdrPreEnd {
mh.Add(statusHdr, strings.TrimLeft(l[hdrPreEnd:], " "))
}
return http.Header(mh), nil
}
@@ -2765,7 +2848,8 @@ func (nc *Conn) publish(subj, reply string, hdr, data []byte) error {
// Proactively reject payloads over the threshold set by server.
msgSize := int64(len(data) + len(hdr))
if msgSize > nc.info.MaxPayload {
// Skip this check if we are not yet connected (RetryOnFailedConnect)
if !nc.initc && msgSize > nc.info.MaxPayload {
nc.mu.Unlock()
return ErrMaxPayload
}
@@ -2904,6 +2988,7 @@ func (nc *Conn) respHandler(m *Msg) {
// Helper to setup and send new request style requests. Return the chan to receive the response.
func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) {
nc.mu.Lock()
// Do setup for the new style if needed.
if nc.respMap == nil {
nc.initNewResp()
@@ -2944,7 +3029,6 @@ func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) {
if !nc.info.Headers {
return nil, ErrHeadersNotSupported
}
hdr, err = msg.headerBytes()
if err != nil {
return nil, err
@@ -2960,18 +3044,32 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg,
return nc.request(subj, nil, data, timeout)
}
func (nc *Conn) useOldRequestStyle() bool {
nc.mu.RLock()
r := nc.Opts.UseOldRequestStyle
nc.mu.RUnlock()
return r
}
func (nc *Conn) request(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
nc.mu.Lock()
if nc.Opts.UseOldRequestStyle {
nc.mu.Unlock()
return nc.oldRequest(subj, hdr, data, timeout)
var m *Msg
var err error
if nc.useOldRequestStyle() {
m, err = nc.oldRequest(subj, hdr, data, timeout)
} else {
m, err = nc.newRequest(subj, hdr, data, timeout)
}
return nc.newRequest(subj, hdr, data, timeout)
// Check for no responder status.
if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
m, err = nil, ErrNoResponders
}
return m, err
}
func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
@@ -3095,15 +3193,15 @@ func (nc *Conn) respToken(respInbox string) string {
// Subscribe will express interest in the given subject. The subject
// can have wildcards (partial:*, full:>). Messages will be delivered
// to the associated MsgHandler.
func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, cb, nil, false)
func (nc *Conn) Subscribe(subj string, cb MsgHandler, opts ...SubscribeOption) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, cb, nil, false, opts...)
}
// ChanSubscribe will express interest in the given subject and place
// all messages received on the channel.
// You should not close the channel until sub.Unsubscribe() has been called.
func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, nil, ch, false)
func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg, opts ...SubscribeOption) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, nil, ch, false, opts...)
}
// ChanQueueSubscribe will express interest in the given subject.
@@ -3112,18 +3210,18 @@ func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
// which will be placed on the channel.
// You should not close the channel until sub.Unsubscribe() has been called.
// Note: This is the same than QueueSubscribeSyncWithChan.
func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, group, nil, ch, false)
func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg, opts ...SubscribeOption) (*Subscription, error) {
return nc.subscribe(subj, group, nil, ch, false, opts...)
}
// SubscribeSync will express interest on the given subject. Messages will
// be received synchronously using Subscription.NextMsg().
func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
func (nc *Conn) SubscribeSync(subj string, opts ...SubscribeOption) (*Subscription, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
mch := make(chan *Msg, nc.Opts.SubChanLen)
s, e := nc.subscribe(subj, _EMPTY_, nil, mch, true)
s, e := nc.subscribe(subj, _EMPTY_, nil, mch, true, opts...)
return s, e
}
@@ -3131,17 +3229,17 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
// All subscribers with the same queue name will form the queue group and
// only one member of the group will be selected to receive any given
// message asynchronously.
func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) {
return nc.subscribe(subj, queue, cb, nil, false)
func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubscribeOption) (*Subscription, error) {
return nc.subscribe(subj, queue, cb, nil, false, opts...)
}
// QueueSubscribeSync creates a synchronous queue subscriber on the given
// subject. All subscribers with the same queue name will form the queue
// group and only one member of the group will be selected to receive any
// given message synchronously using Subscription.NextMsg().
func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
func (nc *Conn) QueueSubscribeSync(subj, queue string, opts ...SubscribeOption) (*Subscription, error) {
mch := make(chan *Msg, nc.Opts.SubChanLen)
s, e := nc.subscribe(subj, queue, nil, mch, true)
s, e := nc.subscribe(subj, queue, nil, mch, true, opts...)
return s, e
}
@@ -3151,8 +3249,8 @@ func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
// which will be placed on the channel.
// You should not close the channel until sub.Unsubscribe() has been called.
// Note: This is the same than ChanQueueSubscribe.
func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, queue, nil, ch, false)
func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg, opts ...SubscribeOption) (*Subscription, error) {
return nc.subscribe(subj, queue, nil, ch, false, opts...)
}
// badSubject will do quick test on whether a subject is acceptable.
@@ -3176,17 +3274,47 @@ func badQueue(qname string) bool {
}
// subscribe is the internal subscribe function that indicates interest in a subject.
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool) (*Subscription, error) {
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts ...SubscribeOption) (*Subscription, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
var aopts *jsOpts
if len(opts) > 0 {
aopts = newJsOpts()
for _, f := range opts {
if err := f(aopts); err != nil {
return nil, err
}
}
if subj == "" {
subj = NewInbox()
}
}
nc.mu.Lock()
s, err := nc.subscribeLocked(subj, queue, cb, ch, isSync)
s, err := nc.subscribeLocked(subj, queue, cb, ch, isSync, opts...)
nc.mu.Unlock()
return s, err
if err != nil {
return nil, err
}
// here so that interest exist already when doing ephemerals
if aopts != nil {
nfo, err := nc.createOrUpdateConsumer(aopts, subj)
if err != nil {
s.Unsubscribe()
return nil, fmt.Errorf("nats: JetStream consumer creation failed: %s", err)
}
s.ConsumerConfig = &nfo.Config
}
return s, nil
}
func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool) (*Subscription, error) {
func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts ...SubscribeOption) (*Subscription, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
@@ -4238,6 +4366,13 @@ func (nc *Conn) MaxPayload() int64 {
return nc.info.MaxPayload
}
// HeadersSupported will return if the server supports headers
func (nc *Conn) HeadersSupported() bool {
nc.mu.RLock()
defer nc.mu.RUnlock()
return nc.info.Headers
}
// AuthRequired will return if the connected server requires authorization.
func (nc *Conn) AuthRequired() bool {
nc.mu.RLock()