Merge pull request #2137 from nats-io/csubs

Concurrent multiple durable subscribers fix
This commit is contained in:
Derek Collison
2021-04-20 20:12:35 -07:00
committed by GitHub
11 changed files with 525 additions and 182 deletions

3
go.mod
View File

@@ -3,10 +3,11 @@ module github.com/nats-io/nats-server/v2
go 1.16
require (
github.com/golang/protobuf v1.4.2 // indirect
github.com/klauspost/compress v1.11.12
github.com/minio/highwayhash v1.0.1
github.com/nats-io/jwt/v2 v2.0.1
github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8
github.com/nats-io/nats.go v1.10.1-0.20210419223411-20527524c393
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b

2
go.sum
View File

@@ -41,6 +41,8 @@ github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnC
github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc=
github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8 h1:z/0dTBxMgMfWOtmpyHrbIDKx2duzrxkUeQYJMUnRPj4=
github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8/go.mod h1:Zq9IEHy7zurF0kFbU5aLIknnFI7guh8ijHk+2v+Vf5g=
github.com/nats-io/nats.go v1.10.1-0.20210419223411-20527524c393 h1:GQxfDz4otI9mde5QqJlpyRNpa2tfURHiPy0YLf7hy4c=
github.com/nats-io/nats.go v1.10.1-0.20210419223411-20527524c393/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=

View File

@@ -122,6 +122,7 @@ type consumerAssignment struct {
// Internal
responded bool
deleted bool
pending bool
err error
}
@@ -2410,9 +2411,10 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
return
}
// Check if we have an existing consumer assignment.
if sa.consumers == nil {
sa.consumers = make(map[string]*consumerAssignment)
} else if oca := sa.consumers[ca.Name]; oca != nil {
} else if oca := sa.consumers[ca.Name]; oca != nil && !oca.pending {
// Copy over private existing state from former CA.
ca.Group.node = oca.Group.node
ca.responded = oca.responded
@@ -2517,9 +2519,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
// Send response to the metadata leader. They will forward to the user as needed.
b, _ := json.Marshal(result) // Avoids auto-processing and doing fancy json with newlines.
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, b)
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
return
}
@@ -2534,6 +2534,19 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
ocfg := o.config()
if configsEqualSansDelivery(ocfg, *ca.Config) && o.hasNoLocalInterest() {
o.updateDeliverSubject(ca.Config.DeliverSubject)
} else {
// This is essentially and update that has failed.
js.mu.Lock()
result := &consumerAssignmentResult{
Account: ca.Client.serviceAccount(),
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = jsNotFoundError(ErrJetStreamConsumerAlreadyUsed)
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
js.mu.Unlock()
return
}
}
o.setConsumerAssignment(ca)
@@ -3950,6 +3963,7 @@ func (cc *jetStreamCluster) createGroupForConsumer(sa *streamAssignment) *raftGr
return &raftGroup{Name: groupNameForConsumer(peers, sa.Config.Storage), Storage: sa.Config.Storage, Peers: peers}
}
// jsClusteredConsumerRequest is first point of entry to create a consumer with R > 1.
func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, stream string, cfg *ConsumerConfig) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
@@ -3998,7 +4012,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
oname = cfg.Durable
if ca := sa.consumers[oname]; ca != nil && !ca.deleted {
// This can be ok if delivery subject update.
if !reflect.DeepEqual(cfg, ca.Config) && !configsEqualSansDelivery(*cfg, *ca.Config) {
shouldErr := !reflect.DeepEqual(cfg, ca.Config) && !configsEqualSansDelivery(*cfg, *ca.Config) || ca.pending
if !shouldErr {
rr := acc.sl.Match(ca.Config.DeliverSubject)
shouldErr = len(rr.psubs)+len(rr.qsubs) != 0
}
if shouldErr {
resp.Error = jsError(ErrJetStreamConsumerAlreadyUsed)
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
@@ -4007,7 +4026,19 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
}
ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
cc.meta.Propose(encodeAddConsumerAssignment(ca))
eca := encodeAddConsumerAssignment(ca)
// Mark this as pending if a durable.
if isDurableConsumer(cfg) {
if sa.consumers == nil {
sa.consumers = make(map[string]*consumerAssignment)
}
ca.pending = true
sa.consumers[ca.Name] = ca
}
// Do formal proposal.
cc.meta.Propose(eca)
}
func encodeAddConsumerAssignment(ca *consumerAssignment) []byte {

View File

@@ -844,7 +844,7 @@ func TestJetStreamClusterStreamSynchedTimeStamps(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
meta, _ := m.MetaData()
meta, _ := m.Metadata()
sub.Unsubscribe()
@@ -2431,12 +2431,12 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
// We should have all the messages for first delivery delivered.
wantSeq := 101
for _, m := range fetchMsgs(t, jsub, 100, 5*time.Second) {
meta, err := m.MetaData()
meta, err := m.Metadata()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if meta.Stream != uint64(wantSeq) {
t.Fatalf("Expected stream sequence of %d, but got %d", wantSeq, meta.Stream)
if meta.Sequence.Stream != uint64(wantSeq) {
t.Fatalf("Expected stream sequence of %d, but got %d", wantSeq, meta.Sequence.Stream)
}
m.Ack()
wantSeq++
@@ -4823,11 +4823,11 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) {
if m.Header != nil {
t.Fatalf("Expected no header on the message, got: %v", m.Header)
}
meta, err := m.MetaData()
meta, err := m.Metadata()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if meta.Consumer != 1 || meta.Stream != 1 || meta.Delivered != 1 || meta.Pending != 0 {
if meta.Sequence.Consumer != 1 || meta.Sequence.Stream != 1 || meta.NumDelivered != 1 || meta.NumPending != 0 {
t.Fatalf("Bad meta: %+v", meta)
}
@@ -4839,11 +4839,11 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
meta, err = m.MetaData()
meta, err = m.Metadata()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if meta.Consumer != 2 || meta.Stream != 2 || meta.Delivered != 1 || meta.Pending != 1 {
if meta.Sequence.Consumer != 2 || meta.Sequence.Stream != 2 || meta.NumDelivered != 1 || meta.NumPending != 1 {
t.Fatalf("Bad meta: %+v", meta)
}
@@ -5516,7 +5516,7 @@ func TestJetStreamClusterMixedMode(t *testing.T) {
if len(ps.knownPeers) != 3 {
return fmt.Errorf("Expected known peers to be 3, but got %+v", ps.knownPeers)
}
if ps.clusterSize != 3 {
if ps.clusterSize < 3 {
return fmt.Errorf("Expected cluster size to be 3, but got %+v", ps)
}
return nil
@@ -5842,6 +5842,57 @@ func TestJetStreamClusterMirrorAndSourceSubLeaks(t *testing.T) {
}
}
func TestJetStreamClusterCreateConcurrentDurableConsumers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSL", 3)
defer c.shutdown()
// Client for API requests.
nc, _ := jsClientConnect(t, c.randomServer())
defer nc.Close()
js, err := nc.JetStream(nats.MaxWait(10 * time.Second))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Create origin stream, muct be R > 1
if _, err := js.AddStream(&nats.StreamConfig{Name: "ORDERS", Replicas: 3}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now try to create durables concurrently.
start := make(chan struct{})
var wg sync.WaitGroup
created := uint32(0)
errs := make(chan error, 10)
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-start
_, err := js.QueueSubscribeSync("ORDERS", "wq", nats.Durable("shared"))
if err == nil {
atomic.AddUint32(&created, 1)
} else if !strings.Contains(err.Error(), "consumer name already") {
errs <- err
}
}()
}
close(start)
wg.Wait()
if lc := atomic.LoadUint32(&created); lc != 1 {
t.Fatalf("Expected only 1 to be created, got %d", lc)
}
if len(errs) > 0 {
t.Fatalf("Failed to create some sub: %v", <-errs)
}
}
// Support functions
// Used to setup superclusters for tests.

View File

@@ -11,10 +11,10 @@ install:
- go get -u github.com/client9/misspell/cmd/misspell
before_script:
- $(exit $(go fmt ./... | wc -l))
- go vet ./...
- go vet -modfile=go_test.mod ./...
- find . -type f -name "*.go" | xargs misspell -error -locale US
- staticcheck ./...
script:
- go test -i -race ./...
- go test -v -run=TestNoRace -p=1 ./...
- go test -modfile=go_test.mod -race ./...
- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi

View File

@@ -33,7 +33,7 @@ go get github.com/nats-io/nats-server/v2
## Basic Usage
```go
import nats "github.com/nats-io/nats.go"
import "github.com/nats-io/nats.go"
// Connect to a server
nc, _ := nats.Connect(nats.DefaultURL)
@@ -278,7 +278,6 @@ nc.Publish("foo.bar.baz", []byte("Hello World"))
nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
received += 1;
})
```
## Advanced Usage
@@ -423,6 +422,85 @@ resp := &response{}
err := c.RequestWithContext(ctx, "foo", req, resp)
```
## JetStream Basic Usage
```go
import "github.com/nats-io/nats.go"
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
// Create JetStream Context
js, _ := nc.JetStream(nats.PublishAsyncMaxPending(256))
// Simple Stream Publisher
js.Publish("ORDERS.scratch", []byte("hello"))
// Simple Async Stream Publisher
for i := 0; i < 500; i++ {
js.PublishAsync("ORDERS.scratch", []byte("hello"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
fmt.Println("Did not resolve in time")
}
// Simple Async Ephemeral Consumer
js.Subscribe("ORDERS.*", func(m *nats.Msg) {
fmt.Printf("Received a JetStream message: %s\n", string(m.Data))
})
// Simple Sync Durable Consumer (optional SubOpts at the end)
sub, err := js.SubscribeSync("ORDERS.*", nats.Durable("MONITOR"), nats.MaxDeliver(3))
m, err := sub.NextMsg(timeout)
// Simple Pull Consumer
sub, err := js.PullSubscribe("ORDERS.*", "MONITOR")
msgs, err := sub.Fetch(10)
// Unsubscribe
sub.Unsubscribe()
// Drain
sub.Drain()
```
## JetStream Basic Management
```go
import "github.com/nats-io/nats.go"
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
// Create JetStream Context
js, _ := nc.JetStream()
// Create a Stream
js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"ORDERS.*"},
})
// Update a Stream
js.UpdateStream(&nats.StreamConfig{
Name: "ORDERS",
MaxBytes: 8,
})
// Create a Consumer
js.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "MONITOR",
})
// Delete Consumer
js.DeleteConsumer("ORDERS", "MONITOR")
// Delete Stream
js.DeleteStream("ORDERS")
```
## License
Unless otherwise noted, the NATS source files are distributed

View File

@@ -1,11 +1,8 @@
module github.com/nats-io/nats.go
go 1.15
go 1.16
require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)

View File

@@ -1,80 +1,11 @@
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk=
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
github.com/nats-io/jwt/v2 v2.0.1 h1:SycklijeduR742i/1Y3nRhURYM7imDzZZ3+tuAQqhQA=
github.com/nats-io/jwt/v2 v2.0.1/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d h1:Fi5DT3pdyqP280FPGdkQD+bDjfpR5orUhZ2hhVEU/JA=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI=
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk=
github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=

View File

@@ -97,11 +97,11 @@ type JetStream interface {
// PublishAsync publishes a message to JetStream and returns a PubAckFuture.
// The data should not be changed until the PubAckFuture has been processed.
PublishAsync(subj string, data []byte, opts ...PubOpt) (*PubAckFuture, error)
PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)
// PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture.
// The message should not be changed until the PubAckFuture has been processed.
PublishMsgAsync(m *Msg, opts ...PubOpt) (*PubAckFuture, error)
PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)
// PublishAsyncPending returns the number of async publishes outstanding for this context.
PublishAsyncPending() int
@@ -143,7 +143,7 @@ type js struct {
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*PubAckFuture
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
}
@@ -346,7 +346,18 @@ func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
}
// PubAckFuture is a future for a PubAck.
type PubAckFuture struct {
type PubAckFuture interface {
// Ok returns a receive only channel that can be used to get a PubAck.
Ok() <-chan *PubAck
// Err returns a receive only channel that can be used to get the error from an async publish.
Err() <-chan error
// Msg returns the message that was sent to the server.
Msg() *Msg
}
type pubAckFuture struct {
js *js
msg *Msg
pa *PubAck
@@ -356,7 +367,7 @@ type PubAckFuture struct {
doneCh chan *PubAck
}
func (paf *PubAckFuture) Ok() <-chan *PubAck {
func (paf *pubAckFuture) Ok() <-chan *PubAck {
paf.js.mu.Lock()
defer paf.js.mu.Unlock()
@@ -370,7 +381,7 @@ func (paf *PubAckFuture) Ok() <-chan *PubAck {
return paf.doneCh
}
func (paf *PubAckFuture) Err() <-chan error {
func (paf *pubAckFuture) Err() <-chan error {
paf.js.mu.Lock()
defer paf.js.mu.Unlock()
@@ -384,7 +395,7 @@ func (paf *PubAckFuture) Err() <-chan error {
return paf.errCh
}
func (paf *PubAckFuture) Msg() *Msg {
func (paf *pubAckFuture) Msg() *Msg {
paf.js.mu.RLock()
defer paf.js.mu.RUnlock()
return paf.msg
@@ -426,10 +437,10 @@ func (js *js) newAsyncReply() string {
}
// registerPAF will register for a PubAckFuture.
func (js *js) registerPAF(id string, paf *PubAckFuture) (int, int) {
func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
js.mu.Lock()
if js.pafs == nil {
js.pafs = make(map[string]*PubAckFuture)
js.pafs = make(map[string]*pubAckFuture)
}
paf.js = js
js.pafs[id] = paf
@@ -440,7 +451,7 @@ func (js *js) registerPAF(id string, paf *PubAckFuture) (int, int) {
}
// Lock should be held.
func (js *js) getPAF(id string) *PubAckFuture {
func (js *js) getPAF(id string) *pubAckFuture {
if js.pafs == nil {
return nil
}
@@ -565,11 +576,11 @@ func PublishAsyncMaxPending(max int) JSOpt {
}
// PublishAsync publishes a message to JetStream and returns a PubAckFuture
func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (*PubAckFuture, error) {
func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) {
return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
}
func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (*PubAckFuture, error) {
func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
var o pubOpts
if len(opts) > 0 {
if m.Header == nil {
@@ -610,7 +621,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (*PubAckFuture, error) {
return nil, errors.New("nats: error creating async reply handler")
}
id := m.Reply[aReplyPreLen:]
paf := &PubAckFuture{msg: m, st: time.Now()}
paf := &pubAckFuture{msg: m, st: time.Now()}
numPending, maxPending := js.registerPAF(id, paf)
if maxPending > 0 && numPending >= maxPending {
@@ -682,6 +693,7 @@ type ackOpts struct {
ctx context.Context
}
// AckOpt are the options that can be passed when acknowledge a message.
type AckOpt interface {
configureAck(opts *ackOpts) error
}
@@ -765,6 +777,8 @@ type ConsumerConfig struct {
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
}
// ConsumerInfo is the info from a JetStream consumer.
@@ -804,6 +818,19 @@ type jsSub struct {
pull bool
durable bool
attached bool
// Heartbeats and Flow Control handling from push consumers.
hbs bool
fc bool
// cmeta is holds metadata from a push consumer when HBs are enabled.
cmeta atomic.Value
}
// controlMetadata is metadata used to be able to detect sequence mismatch
// errors in push based consumers that have heartbeats enabled.
type controlMetadata struct {
meta string
}
func (jsi *jsSub) unsubscribe(drainMode bool) error {
@@ -879,6 +906,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
isPullMode := ch == nil && cb == nil
badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
hasHeartbeats := o.cfg.Heartbeat > 0
hasFC := o.cfg.FlowControl
if isPullMode && badPullAck {
return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
}
@@ -887,10 +916,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
err error
shouldCreate bool
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
)
// Find the stream mapped to the subject if not bound to a stream already.
@@ -905,7 +936,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
// With an explicit durable name, then can lookup
// the consumer to which it should be attaching to.
var info *ConsumerInfo
consumer = o.cfg.Durable
if consumer != _EMPTY_ {
// Only create in case there is no consumer already.
@@ -949,9 +979,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}
if isPullMode {
sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: true}}
sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: isPullMode}}
} else {
sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js})
sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
if err != nil {
return nil, err
}
@@ -982,7 +1012,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}
var ccSubj string
isDurable := cfg.Durable != _EMPTY_
if isDurable {
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
} else {
@@ -998,37 +1027,96 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
return nil, err
}
var info consumerResponse
err = json.Unmarshal(resp.Data, &info)
var cinfo consumerResponse
err = json.Unmarshal(resp.Data, &cinfo)
if err != nil {
sub.Unsubscribe()
return nil, err
}
if info.Error != nil {
info = cinfo.ConsumerInfo
if cinfo.Error != nil {
// Remove interest from previous subscribe since it
// may have an incorrect delivery subject.
sub.Unsubscribe()
return nil, fmt.Errorf("nats: %s", info.Error.Description)
// Multiple subscribers could compete in creating the first consumer
// that will be shared using the same durable name. If this happens, then
// do a lookup of the consumer info and resubscribe using the latest info.
if consumer != _EMPTY_ && strings.Contains(cinfo.Error.Description, `consumer already exists`) {
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != "nats: consumer not found" {
return nil, err
}
ccfg = &info.Config
// Validate that the original subject does still match.
if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
return nil, ErrSubjectMismatch
}
// Use the deliver subject from latest consumer config to attach.
if ccfg.DeliverSubject != _EMPTY_ {
sub, err = js.nc.subscribe(ccfg.DeliverSubject, queue, cb, ch, isSync,
&jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
if err != nil {
return nil, err
}
}
attached = true
} else {
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
}
}
// Hold onto these for later.
sub.jsi.stream = info.Stream
sub.jsi.consumer = info.Name
sub.jsi.deliver = info.Config.DeliverSubject
sub.jsi.durable = isDurable
} else {
sub.jsi.stream = stream
sub.jsi.consumer = consumer
sub.jsi.deliver = ccfg.DeliverSubject
stream = info.Stream
consumer = info.Name
deliver = info.Config.DeliverSubject
}
sub.jsi.stream = stream
sub.jsi.consumer = consumer
sub.jsi.durable = isDurable
sub.jsi.attached = attached
// If we are pull based go ahead and fire off the first request to populate.
if isPullMode {
sub.jsi.pull = o.pull
}
sub.jsi.deliver = deliver
return sub, nil
}
func (nc *Conn) processControlFlow(msg *Msg, s *Subscription, jsi *jsSub) {
// If it is a flow control message then have to ack.
if msg.Reply != "" {
nc.publish(msg.Reply, _EMPTY_, nil, nil)
} else if jsi.hbs {
// Process heartbeat received, get latest control metadata if present.
var ctrl *controlMetadata
cmeta := jsi.cmeta.Load()
if cmeta == nil {
return
}
ctrl = cmeta.(*controlMetadata)
tokens, err := getMetadataFields(ctrl.meta)
if err != nil {
return
}
// Consumer sequence
dseq := tokens[6]
ldseq := msg.Header.Get(lastConsumerSeqHdr)
// Detect consumer sequence mismatch and whether
// should restart the consumer.
if ldseq != dseq {
// Dispatch async error including details such as
// from where the consumer could be restarted.
sseq := parseNum(tokens[5])
ecs := &ErrConsumerSequenceMismatch{
StreamResumeSequence: uint64(sseq),
ConsumerSequence: parseNum(dseq),
LastConsumerSequence: parseNum(ldseq),
}
nc.handleConsumerSequenceMismatch(s, ecs)
}
}
}
type streamRequest struct {
Subject string `json:"subject,omitempty"`
}
@@ -1065,8 +1153,6 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) {
type subOpts struct {
// For attaching.
stream, consumer string
// For pull based consumers.
pull bool
// For manual ack
mack bool
// For creating or updating.
@@ -1209,6 +1295,22 @@ func BindStream(name string) SubOpt {
})
}
// EnableFlowControl enables flow control for a push based consumer.
func EnableFlowControl() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.FlowControl = true
return nil
})
}
// IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
func IdleHeartbeat(duration time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Heartbeat = duration
return nil
})
}
func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
@@ -1230,11 +1332,12 @@ type pullOpts struct {
ctx context.Context
}
// PullOpt are the options that can be passed when pulling a batch of messages.
type PullOpt interface {
configurePull(opts *pullOpts) error
}
// PullMaxWaiting defines the max inflight pull requests to be delivered more messages.
// PullMaxWaiting defines the max inflight pull requests.
func PullMaxWaiting(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxWaiting = n
@@ -1652,28 +1755,21 @@ func (m *Msg) InProgress(opts ...AckOpt) error {
}
// MsgMetadata is the JetStream metadata associated with received messages.
type MsgMetaData struct {
Consumer uint64
Stream uint64
Delivered uint64
Pending uint64
Timestamp time.Time
StreamName string
type MsgMetadata struct {
Sequence SequencePair
NumDelivered uint64
NumPending uint64
Timestamp time.Time
Stream string
Consumer string
}
// MetaData retrieves the metadata from a JetStream message. This method will
// return an error for non-JetStream Msgs.
func (m *Msg) MetaData() (*MsgMetaData, error) {
if _, _, err := m.checkReply(); err != nil {
return nil, err
}
func getMetadataFields(subject string) ([]string, error) {
const expectedTokens = 9
const btsep = '.'
tsa := [expectedTokens]string{}
start, tokens := 0, tsa[:0]
subject := m.Reply
for i := 0; i < len(subject); i++ {
if subject[i] == btsep {
tokens = append(tokens, subject[start:i])
@@ -1684,16 +1780,30 @@ func (m *Msg) MetaData() (*MsgMetaData, error) {
if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
return nil, ErrNotJSMessage
}
return tokens, nil
}
meta := &MsgMetaData{
Delivered: uint64(parseNum(tokens[4])),
Stream: uint64(parseNum(tokens[5])),
Consumer: uint64(parseNum(tokens[6])),
Timestamp: time.Unix(0, parseNum(tokens[7])),
Pending: uint64(parseNum(tokens[8])),
StreamName: tokens[2],
// Metadata retrieves the metadata from a JetStream message. This method will
// return an error for non-JetStream Msgs.
func (m *Msg) Metadata() (*MsgMetadata, error) {
if _, _, err := m.checkReply(); err != nil {
return nil, err
}
tokens, err := getMetadataFields(m.Reply)
if err != nil {
return nil, err
}
meta := &MsgMetadata{
NumDelivered: uint64(parseNum(tokens[4])),
NumPending: uint64(parseNum(tokens[8])),
Timestamp: time.Unix(0, parseNum(tokens[7])),
Stream: tokens[2],
Consumer: tokens[3],
}
meta.Sequence.Stream = uint64(parseNum(tokens[5]))
meta.Sequence.Consumer = uint64(parseNum(tokens[6]))
return meta, nil
}

View File

@@ -542,6 +542,23 @@ type Subscription struct {
// Msg represents a message delivered by NATS. This structure is used
// by Subscribers and PublishMsg().
//
// Types of Acknowledgements
//
// In case using JetStream, there are multiple ways to ack a Msg:
//
// // Acknowledgement that a message has been processed.
// msg.Ack()
//
// // Negatively acknowledges a message.
// msg.Nak()
//
// // Terminate a message so that it is not redelivered further.
// msg.Term()
//
// // Signal the server that the message is being worked on and reset redelivery timer.
// msg.InProgress()
//
type Msg struct {
Subject string
Reply string
@@ -2451,6 +2468,8 @@ func (nc *Conn) processMsg(data []byte) {
// Check if we have headers encoded here.
var h http.Header
var err error
var ctrl bool
var hasFC bool
if nc.ps.ma.hdr > 0 {
hbuf := msgPayload[:nc.ps.ma.hdr]
@@ -2472,6 +2491,13 @@ func (nc *Conn) processMsg(data []byte) {
sub.mu.Lock()
// Skip flow control messages in case of using a JetStream context.
jsi := sub.jsi
if jsi != nil {
ctrl = isControlMessage(m)
hasFC = jsi.fc
}
// Check if closed.
if sub.closed {
sub.mu.Unlock()
@@ -2498,23 +2524,28 @@ func (nc *Conn) processMsg(data []byte) {
// We have two modes of delivery. One is the channel, used by channel
// subscribers and syncSubscribers, the other is a linked list for async.
if sub.mch != nil {
select {
case sub.mch <- m:
default:
goto slowConsumer
}
} else {
// Push onto the async pList
if sub.pHead == nil {
sub.pHead = m
sub.pTail = m
if sub.pCond != nil {
sub.pCond.Signal()
if !ctrl {
if sub.mch != nil {
select {
case sub.mch <- m:
default:
goto slowConsumer
}
} else {
sub.pTail.next = m
sub.pTail = m
// Push onto the async pList
if sub.pHead == nil {
sub.pHead = m
sub.pTail = m
if sub.pCond != nil {
sub.pCond.Signal()
}
} else {
sub.pTail.next = m
sub.pTail = m
}
}
if hasFC {
jsi.trackSequences(m)
}
}
@@ -2522,6 +2553,13 @@ func (nc *Conn) processMsg(data []byte) {
sub.sc = false
sub.mu.Unlock()
// Handle flow control and heartbeat messages automatically
// for JetStream Push consumers.
if ctrl {
nc.processControlFlow(m, sub, jsi)
}
return
slowConsumer:
@@ -2833,14 +2871,17 @@ func NewMsg(subject string) *Msg {
}
const (
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
statusHdr = "Status"
descrHdr = "Description"
noResponders = "503"
noMessages = "404"
statusLen = 3 // e.g. 20x, 40x, 50x
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
statusHdr = "Status"
descrHdr = "Description"
lastConsumerSeqHdr = "Nats-Last-Consumer"
lastStreamSeqHdr = "Nats-Last-Stream"
noResponders = "503"
noMessages = "404"
controlMsg = "100"
statusLen = 3 // e.g. 20x, 40x, 50x
)
// decodeHeadersMsg will decode and headers.
@@ -2850,10 +2891,12 @@ func decodeHeadersMsg(data []byte) (http.Header, error) {
if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] {
return nil, ErrBadHeaderMsg
}
mh, err := tp.ReadMIMEHeader()
mh, err := readMIMEHeader(tp)
if err != nil {
return nil, ErrBadHeaderMsg
return nil, err
}
// Check if we have an inlined status.
if len(l) > hdrPreEnd {
var description string
@@ -2870,6 +2913,53 @@ func decodeHeadersMsg(data []byte) (http.Header, error) {
return http.Header(mh), nil
}
// readMIMEHeader returns a MIMEHeader that preserves the
// original case of the MIME header, based on the implementation
// of textproto.ReadMIMEHeader.
//
// https://golang.org/pkg/net/textproto/#Reader.ReadMIMEHeader
func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) {
var (
m = make(textproto.MIMEHeader)
strs []string
)
for {
kv, err := tp.ReadLine()
if len(kv) == 0 {
return m, err
}
// Process key fetching original case.
i := bytes.IndexByte([]byte(kv), ':')
if i < 0 {
return nil, ErrBadHeaderMsg
}
key := kv[:i]
if key == "" {
// Skip empty keys.
continue
}
i++
for i < len(kv) && (kv[i] == ' ' || kv[i] == '\t') {
i++
}
value := string(kv[i:])
vv := m[key]
if vv == nil && len(strs) > 0 {
// Single value header.
vv, strs = strs[:1:1], strs[1:]
vv[0] = value
m[key] = vv
} else {
// Multi value header.
m[key] = append(vv, value)
}
if err != nil {
return m, err
}
}
}
// PublishMsg publishes the Msg structure, which includes the
// Subject, an optional Reply and an optional Data field.
func (nc *Conn) PublishMsg(m *Msg) error {
@@ -3581,7 +3671,8 @@ func (s *Subscription) AutoUnsubscribe(max int) error {
// unsubscribe performs the low level unsubscribe to the server.
// Use Subscription.Unsubscribe()
func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
// Check whether it is a JetStream sub and should clean up consumers.
// For JetStream consumers, need to clean up ephemeral consumers
// or delete durable ones if called with Unsubscribe.
sub.mu.Lock()
jsi := sub.jsi
sub.mu.Unlock()
@@ -3629,6 +3720,55 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
return nil
}
// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
type ErrConsumerSequenceMismatch struct {
// StreamResumeSequence is the stream sequence from where the consumer
// should resume consuming from the stream.
StreamResumeSequence uint64
// ConsumerSequence is the sequence of the consumer that is behind.
ConsumerSequence int64
// LastConsumerSequence is the sequence of the consumer when the heartbeat
// was received.
LastConsumerSequence int64
}
func (ecs *ErrConsumerSequenceMismatch) Error() string {
return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
ecs.ConsumerSequence,
ecs.LastConsumerSequence-ecs.ConsumerSequence,
ecs.StreamResumeSequence,
)
}
// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, err) })
}
nc.mu.Unlock()
}
func isControlMessage(msg *Msg) bool {
return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
}
func (jsi *jsSub) trackSequences(msg *Msg) {
var ctrl *controlMetadata
if cmeta := jsi.cmeta.Load(); cmeta == nil {
ctrl = &controlMetadata{}
} else {
ctrl = cmeta.(*controlMetadata)
}
ctrl.meta = msg.Reply
jsi.cmeta.Store(ctrl)
}
// NextMsg will return the next message available to a synchronous subscriber
// or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription),
// the connection is closed (ErrConnectionClosed), or the timeout is reached (ErrTimeout).

4
vendor/modules.txt vendored
View File

@@ -1,3 +1,5 @@
# github.com/golang/protobuf v1.4.2
## explicit
# github.com/klauspost/compress v1.11.12
## explicit
github.com/klauspost/compress/s2
@@ -7,7 +9,7 @@ github.com/minio/highwayhash
# github.com/nats-io/jwt/v2 v2.0.1
## explicit
github.com/nats-io/jwt/v2
# github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8
# github.com/nats-io/nats.go v1.10.1-0.20210419223411-20527524c393
## explicit
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin