From 518ff9be145cee1ff412f814f9d5720d1bdba978 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 20 Apr 2021 19:50:24 -0700 Subject: [PATCH] Concurrent multiple durable subscribers would cause unpredictable behaviors. Upgraded to current Go client. Signed-off-by: Derek Collison --- go.mod | 3 +- go.sum | 2 + server/jetstream_cluster.go | 43 +++- server/jetstream_cluster_test.go | 69 +++++- vendor/github.com/nats-io/nats.go/.travis.yml | 6 +- vendor/github.com/nats-io/nats.go/README.md | 82 ++++++- vendor/github.com/nats-io/nats.go/go.mod | 5 +- vendor/github.com/nats-io/nats.go/go.sum | 69 ------ vendor/github.com/nats-io/nats.go/js.go | 232 +++++++++++++----- vendor/github.com/nats-io/nats.go/nats.go | 192 +++++++++++++-- vendor/modules.txt | 4 +- 11 files changed, 525 insertions(+), 182 deletions(-) diff --git a/go.mod b/go.mod index 9416450b..ff9c0d72 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,11 @@ module github.com/nats-io/nats-server/v2 go 1.16 require ( + github.com/golang/protobuf v1.4.2 // indirect github.com/klauspost/compress v1.11.12 github.com/minio/highwayhash v1.0.1 github.com/nats-io/jwt/v2 v2.0.1 - github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8 + github.com/nats-io/nats.go v1.10.1-0.20210419223411-20527524c393 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b diff --git a/go.sum b/go.sum index bed1a270..824a251f 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,8 @@ github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnC github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc= github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8 h1:z/0dTBxMgMfWOtmpyHrbIDKx2duzrxkUeQYJMUnRPj4= github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8/go.mod h1:Zq9IEHy7zurF0kFbU5aLIknnFI7guh8ijHk+2v+Vf5g= +github.com/nats-io/nats.go v1.10.1-0.20210419223411-20527524c393 h1:GQxfDz4otI9mde5QqJlpyRNpa2tfURHiPy0YLf7hy4c= +github.com/nats-io/nats.go v1.10.1-0.20210419223411-20527524c393/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index dbf3f114..1f768500 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -122,6 +122,7 @@ type consumerAssignment struct { // Internal responded bool deleted bool + pending bool err error } @@ -2410,9 +2411,10 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { return } + // Check if we have an existing consumer assignment. if sa.consumers == nil { sa.consumers = make(map[string]*consumerAssignment) - } else if oca := sa.consumers[ca.Name]; oca != nil { + } else if oca := sa.consumers[ca.Name]; oca != nil && !oca.pending { // Copy over private existing state from former CA. ca.Group.node = oca.Group.node ca.responded = oca.responded @@ -2517,9 +2519,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, } result.Response.Error = jsNotFoundError(ErrJetStreamStreamNotFound) - // Send response to the metadata leader. They will forward to the user as needed. - b, _ := json.Marshal(result) // Avoids auto-processing and doing fancy json with newlines. - s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, b) + s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) js.mu.Unlock() return } @@ -2534,6 +2534,19 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { ocfg := o.config() if configsEqualSansDelivery(ocfg, *ca.Config) && o.hasNoLocalInterest() { o.updateDeliverSubject(ca.Config.DeliverSubject) + } else { + // This is essentially and update that has failed. + js.mu.Lock() + result := &consumerAssignmentResult{ + Account: ca.Client.serviceAccount(), + Stream: ca.Stream, + Consumer: ca.Name, + Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, + } + result.Response.Error = jsNotFoundError(ErrJetStreamConsumerAlreadyUsed) + s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) + js.mu.Unlock() + return } } o.setConsumerAssignment(ca) @@ -3950,6 +3963,7 @@ func (cc *jetStreamCluster) createGroupForConsumer(sa *streamAssignment) *raftGr return &raftGroup{Name: groupNameForConsumer(peers, sa.Config.Storage), Storage: sa.Config.Storage, Peers: peers} } +// jsClusteredConsumerRequest is first point of entry to create a consumer with R > 1. func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, stream string, cfg *ConsumerConfig) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { @@ -3998,7 +4012,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec oname = cfg.Durable if ca := sa.consumers[oname]; ca != nil && !ca.deleted { // This can be ok if delivery subject update. - if !reflect.DeepEqual(cfg, ca.Config) && !configsEqualSansDelivery(*cfg, *ca.Config) { + shouldErr := !reflect.DeepEqual(cfg, ca.Config) && !configsEqualSansDelivery(*cfg, *ca.Config) || ca.pending + if !shouldErr { + rr := acc.sl.Match(ca.Config.DeliverSubject) + shouldErr = len(rr.psubs)+len(rr.qsubs) != 0 + } + if shouldErr { resp.Error = jsError(ErrJetStreamConsumerAlreadyUsed) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return @@ -4007,7 +4026,19 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()} - cc.meta.Propose(encodeAddConsumerAssignment(ca)) + eca := encodeAddConsumerAssignment(ca) + + // Mark this as pending if a durable. + if isDurableConsumer(cfg) { + if sa.consumers == nil { + sa.consumers = make(map[string]*consumerAssignment) + } + ca.pending = true + sa.consumers[ca.Name] = ca + } + + // Do formal proposal. + cc.meta.Propose(eca) } func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 8c715768..190a4db7 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -844,7 +844,7 @@ func TestJetStreamClusterStreamSynchedTimeStamps(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - meta, _ := m.MetaData() + meta, _ := m.Metadata() sub.Unsubscribe() @@ -2431,12 +2431,12 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { // We should have all the messages for first delivery delivered. wantSeq := 101 for _, m := range fetchMsgs(t, jsub, 100, 5*time.Second) { - meta, err := m.MetaData() + meta, err := m.Metadata() if err != nil { t.Fatalf("Unexpected error: %v", err) } - if meta.Stream != uint64(wantSeq) { - t.Fatalf("Expected stream sequence of %d, but got %d", wantSeq, meta.Stream) + if meta.Sequence.Stream != uint64(wantSeq) { + t.Fatalf("Expected stream sequence of %d, but got %d", wantSeq, meta.Sequence.Stream) } m.Ack() wantSeq++ @@ -4823,11 +4823,11 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) { if m.Header != nil { t.Fatalf("Expected no header on the message, got: %v", m.Header) } - meta, err := m.MetaData() + meta, err := m.Metadata() if err != nil { t.Fatalf("Unexpected error: %v", err) } - if meta.Consumer != 1 || meta.Stream != 1 || meta.Delivered != 1 || meta.Pending != 0 { + if meta.Sequence.Consumer != 1 || meta.Sequence.Stream != 1 || meta.NumDelivered != 1 || meta.NumPending != 0 { t.Fatalf("Bad meta: %+v", meta) } @@ -4839,11 +4839,11 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - meta, err = m.MetaData() + meta, err = m.Metadata() if err != nil { t.Fatalf("Unexpected error: %v", err) } - if meta.Consumer != 2 || meta.Stream != 2 || meta.Delivered != 1 || meta.Pending != 1 { + if meta.Sequence.Consumer != 2 || meta.Sequence.Stream != 2 || meta.NumDelivered != 1 || meta.NumPending != 1 { t.Fatalf("Bad meta: %+v", meta) } @@ -5516,7 +5516,7 @@ func TestJetStreamClusterMixedMode(t *testing.T) { if len(ps.knownPeers) != 3 { return fmt.Errorf("Expected known peers to be 3, but got %+v", ps.knownPeers) } - if ps.clusterSize != 3 { + if ps.clusterSize < 3 { return fmt.Errorf("Expected cluster size to be 3, but got %+v", ps) } return nil @@ -5842,6 +5842,57 @@ func TestJetStreamClusterMirrorAndSourceSubLeaks(t *testing.T) { } } +func TestJetStreamClusterCreateConcurrentDurableConsumers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "MSL", 3) + defer c.shutdown() + + // Client for API requests. + nc, _ := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + js, err := nc.JetStream(nats.MaxWait(10 * time.Second)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create origin stream, muct be R > 1 + if _, err := js.AddStream(&nats.StreamConfig{Name: "ORDERS", Replicas: 3}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Now try to create durables concurrently. + start := make(chan struct{}) + var wg sync.WaitGroup + created := uint32(0) + errs := make(chan error, 10) + + for i := 0; i < 10; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + <-start + _, err := js.QueueSubscribeSync("ORDERS", "wq", nats.Durable("shared")) + if err == nil { + atomic.AddUint32(&created, 1) + } else if !strings.Contains(err.Error(), "consumer name already") { + errs <- err + } + }() + } + + close(start) + wg.Wait() + + if lc := atomic.LoadUint32(&created); lc != 1 { + t.Fatalf("Expected only 1 to be created, got %d", lc) + } + + if len(errs) > 0 { + t.Fatalf("Failed to create some sub: %v", <-errs) + } +} + // Support functions // Used to setup superclusters for tests. diff --git a/vendor/github.com/nats-io/nats.go/.travis.yml b/vendor/github.com/nats-io/nats.go/.travis.yml index bcdbf44f..f43acfe0 100644 --- a/vendor/github.com/nats-io/nats.go/.travis.yml +++ b/vendor/github.com/nats-io/nats.go/.travis.yml @@ -11,10 +11,10 @@ install: - go get -u github.com/client9/misspell/cmd/misspell before_script: - $(exit $(go fmt ./... | wc -l)) -- go vet ./... +- go vet -modfile=go_test.mod ./... - find . -type f -name "*.go" | xargs misspell -error -locale US - staticcheck ./... script: -- go test -i -race ./... -- go test -v -run=TestNoRace -p=1 ./... +- go test -modfile=go_test.mod -race ./... +- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... - if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index 17746568..d86ba668 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -33,7 +33,7 @@ go get github.com/nats-io/nats-server/v2 ## Basic Usage ```go -import nats "github.com/nats-io/nats.go" +import "github.com/nats-io/nats.go" // Connect to a server nc, _ := nats.Connect(nats.DefaultURL) @@ -278,7 +278,6 @@ nc.Publish("foo.bar.baz", []byte("Hello World")) nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) { received += 1; }) - ``` ## Advanced Usage @@ -423,6 +422,85 @@ resp := &response{} err := c.RequestWithContext(ctx, "foo", req, resp) ``` +## JetStream Basic Usage + +```go +import "github.com/nats-io/nats.go" + +// Connect to NATS +nc, _ := nats.Connect(nats.DefaultURL) + +// Create JetStream Context +js, _ := nc.JetStream(nats.PublishAsyncMaxPending(256)) + +// Simple Stream Publisher +js.Publish("ORDERS.scratch", []byte("hello")) + +// Simple Async Stream Publisher +for i := 0; i < 500; i++ { + js.PublishAsync("ORDERS.scratch", []byte("hello")) +} +select { +case <-js.PublishAsyncComplete(): +case <-time.After(5 * time.Second): + fmt.Println("Did not resolve in time") +} + +// Simple Async Ephemeral Consumer +js.Subscribe("ORDERS.*", func(m *nats.Msg) { + fmt.Printf("Received a JetStream message: %s\n", string(m.Data)) +}) + +// Simple Sync Durable Consumer (optional SubOpts at the end) +sub, err := js.SubscribeSync("ORDERS.*", nats.Durable("MONITOR"), nats.MaxDeliver(3)) +m, err := sub.NextMsg(timeout) + +// Simple Pull Consumer +sub, err := js.PullSubscribe("ORDERS.*", "MONITOR") +msgs, err := sub.Fetch(10) + +// Unsubscribe +sub.Unsubscribe() + +// Drain +sub.Drain() +``` + +## JetStream Basic Management + +```go +import "github.com/nats-io/nats.go" + +// Connect to NATS +nc, _ := nats.Connect(nats.DefaultURL) + +// Create JetStream Context +js, _ := nc.JetStream() + +// Create a Stream +js.AddStream(&nats.StreamConfig{ + Name: "ORDERS", + Subjects: []string{"ORDERS.*"}, +}) + +// Update a Stream +js.UpdateStream(&nats.StreamConfig{ + Name: "ORDERS", + MaxBytes: 8, +}) + +// Create a Consumer +js.AddConsumer("ORDERS", &nats.ConsumerConfig{ + Durable: "MONITOR", +}) + +// Delete Consumer +js.DeleteConsumer("ORDERS", "MONITOR") + +// Delete Stream +js.DeleteStream("ORDERS") +``` + ## License Unless otherwise noted, the NATS source files are distributed diff --git a/vendor/github.com/nats-io/nats.go/go.mod b/vendor/github.com/nats-io/nats.go/go.mod index 21f393d5..63faedf4 100644 --- a/vendor/github.com/nats-io/nats.go/go.mod +++ b/vendor/github.com/nats-io/nats.go/go.mod @@ -1,11 +1,8 @@ module github.com/nats-io/nats.go -go 1.15 +go 1.16 require ( - github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 - google.golang.org/protobuf v1.23.0 ) diff --git a/vendor/github.com/nats-io/nats.go/go.sum b/vendor/github.com/nats-io/nats.go/go.sum index 49ca66f9..2138ffb5 100644 --- a/vendor/github.com/nats-io/nats.go/go.sum +++ b/vendor/github.com/nats-io/nats.go/go.sum @@ -1,80 +1,11 @@ -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= -github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk= -github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc= -github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= -github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= -github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= -github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= -github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= -github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= -github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= -github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ= -github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ= -github.com/nats-io/jwt/v2 v2.0.1 h1:SycklijeduR742i/1Y3nRhURYM7imDzZZ3+tuAQqhQA= -github.com/nats-io/jwt/v2 v2.0.1/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU= -github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4= -github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw= -github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY= -github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0= -github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA= -github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d h1:Fi5DT3pdyqP280FPGdkQD+bDjfpR5orUhZ2hhVEU/JA= -github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc= -github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= -github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= -github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= -github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI= -github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI= -github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk= -github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc= -github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= -github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= -github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= -golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index ae2f204f..626bd819 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -97,11 +97,11 @@ type JetStream interface { // PublishAsync publishes a message to JetStream and returns a PubAckFuture. // The data should not be changed until the PubAckFuture has been processed. - PublishAsync(subj string, data []byte, opts ...PubOpt) (*PubAckFuture, error) + PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) // PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture. // The message should not be changed until the PubAckFuture has been processed. - PublishMsgAsync(m *Msg, opts ...PubOpt) (*PubAckFuture, error) + PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) // PublishAsyncPending returns the number of async publishes outstanding for this context. PublishAsyncPending() int @@ -143,7 +143,7 @@ type js struct { mu sync.RWMutex rpre string rsub *Subscription - pafs map[string]*PubAckFuture + pafs map[string]*pubAckFuture stc chan struct{} dch chan struct{} } @@ -346,7 +346,18 @@ func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) } // PubAckFuture is a future for a PubAck. -type PubAckFuture struct { +type PubAckFuture interface { + // Ok returns a receive only channel that can be used to get a PubAck. + Ok() <-chan *PubAck + + // Err returns a receive only channel that can be used to get the error from an async publish. + Err() <-chan error + + // Msg returns the message that was sent to the server. + Msg() *Msg +} + +type pubAckFuture struct { js *js msg *Msg pa *PubAck @@ -356,7 +367,7 @@ type PubAckFuture struct { doneCh chan *PubAck } -func (paf *PubAckFuture) Ok() <-chan *PubAck { +func (paf *pubAckFuture) Ok() <-chan *PubAck { paf.js.mu.Lock() defer paf.js.mu.Unlock() @@ -370,7 +381,7 @@ func (paf *PubAckFuture) Ok() <-chan *PubAck { return paf.doneCh } -func (paf *PubAckFuture) Err() <-chan error { +func (paf *pubAckFuture) Err() <-chan error { paf.js.mu.Lock() defer paf.js.mu.Unlock() @@ -384,7 +395,7 @@ func (paf *PubAckFuture) Err() <-chan error { return paf.errCh } -func (paf *PubAckFuture) Msg() *Msg { +func (paf *pubAckFuture) Msg() *Msg { paf.js.mu.RLock() defer paf.js.mu.RUnlock() return paf.msg @@ -426,10 +437,10 @@ func (js *js) newAsyncReply() string { } // registerPAF will register for a PubAckFuture. -func (js *js) registerPAF(id string, paf *PubAckFuture) (int, int) { +func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) { js.mu.Lock() if js.pafs == nil { - js.pafs = make(map[string]*PubAckFuture) + js.pafs = make(map[string]*pubAckFuture) } paf.js = js js.pafs[id] = paf @@ -440,7 +451,7 @@ func (js *js) registerPAF(id string, paf *PubAckFuture) (int, int) { } // Lock should be held. -func (js *js) getPAF(id string) *PubAckFuture { +func (js *js) getPAF(id string) *pubAckFuture { if js.pafs == nil { return nil } @@ -565,11 +576,11 @@ func PublishAsyncMaxPending(max int) JSOpt { } // PublishAsync publishes a message to JetStream and returns a PubAckFuture -func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (*PubAckFuture, error) { +func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) { return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...) } -func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (*PubAckFuture, error) { +func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { var o pubOpts if len(opts) > 0 { if m.Header == nil { @@ -610,7 +621,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (*PubAckFuture, error) { return nil, errors.New("nats: error creating async reply handler") } id := m.Reply[aReplyPreLen:] - paf := &PubAckFuture{msg: m, st: time.Now()} + paf := &pubAckFuture{msg: m, st: time.Now()} numPending, maxPending := js.registerPAF(id, paf) if maxPending > 0 && numPending >= maxPending { @@ -682,6 +693,7 @@ type ackOpts struct { ctx context.Context } +// AckOpt are the options that can be passed when acknowledge a message. type AckOpt interface { configureAck(opts *ackOpts) error } @@ -765,6 +777,8 @@ type ConsumerConfig struct { SampleFrequency string `json:"sample_freq,omitempty"` MaxWaiting int `json:"max_waiting,omitempty"` MaxAckPending int `json:"max_ack_pending,omitempty"` + FlowControl bool `json:"flow_control,omitempty"` + Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` } // ConsumerInfo is the info from a JetStream consumer. @@ -804,6 +818,19 @@ type jsSub struct { pull bool durable bool attached bool + + // Heartbeats and Flow Control handling from push consumers. + hbs bool + fc bool + + // cmeta is holds metadata from a push consumer when HBs are enabled. + cmeta atomic.Value +} + +// controlMetadata is metadata used to be able to detect sequence mismatch +// errors in push based consumers that have heartbeats enabled. +type controlMetadata struct { + meta string } func (jsi *jsSub) unsubscribe(drainMode bool) error { @@ -879,6 +906,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync isPullMode := ch == nil && cb == nil badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy + hasHeartbeats := o.cfg.Heartbeat > 0 + hasFC := o.cfg.FlowControl if isPullMode && badPullAck { return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) } @@ -887,10 +916,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync err error shouldCreate bool ccfg *ConsumerConfig + info *ConsumerInfo deliver string attached bool stream = o.stream consumer = o.consumer + isDurable = o.cfg.Durable != _EMPTY_ ) // Find the stream mapped to the subject if not bound to a stream already. @@ -905,7 +936,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // With an explicit durable name, then can lookup // the consumer to which it should be attaching to. - var info *ConsumerInfo consumer = o.cfg.Durable if consumer != _EMPTY_ { // Only create in case there is no consumer already. @@ -949,9 +979,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } if isPullMode { - sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: true}} + sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: isPullMode}} } else { - sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js}) + sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js, hbs: hasHeartbeats, fc: hasFC}) if err != nil { return nil, err } @@ -982,7 +1012,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } var ccSubj string - isDurable := cfg.Durable != _EMPTY_ if isDurable { ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable) } else { @@ -998,37 +1027,96 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync return nil, err } - var info consumerResponse - err = json.Unmarshal(resp.Data, &info) + var cinfo consumerResponse + err = json.Unmarshal(resp.Data, &cinfo) if err != nil { sub.Unsubscribe() return nil, err } - if info.Error != nil { + info = cinfo.ConsumerInfo + if cinfo.Error != nil { + // Remove interest from previous subscribe since it + // may have an incorrect delivery subject. sub.Unsubscribe() - return nil, fmt.Errorf("nats: %s", info.Error.Description) + + // Multiple subscribers could compete in creating the first consumer + // that will be shared using the same durable name. If this happens, then + // do a lookup of the consumer info and resubscribe using the latest info. + if consumer != _EMPTY_ && strings.Contains(cinfo.Error.Description, `consumer already exists`) { + info, err = js.ConsumerInfo(stream, consumer) + if err != nil && err.Error() != "nats: consumer not found" { + return nil, err + } + ccfg = &info.Config + + // Validate that the original subject does still match. + if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { + return nil, ErrSubjectMismatch + } + + // Use the deliver subject from latest consumer config to attach. + if ccfg.DeliverSubject != _EMPTY_ { + sub, err = js.nc.subscribe(ccfg.DeliverSubject, queue, cb, ch, isSync, + &jsSub{js: js, hbs: hasHeartbeats, fc: hasFC}) + if err != nil { + return nil, err + } + } + attached = true + } else { + return nil, fmt.Errorf("nats: %s", cinfo.Error.Description) + } } - - // Hold onto these for later. - sub.jsi.stream = info.Stream - sub.jsi.consumer = info.Name - sub.jsi.deliver = info.Config.DeliverSubject - sub.jsi.durable = isDurable - } else { - sub.jsi.stream = stream - sub.jsi.consumer = consumer - sub.jsi.deliver = ccfg.DeliverSubject + stream = info.Stream + consumer = info.Name + deliver = info.Config.DeliverSubject } + sub.jsi.stream = stream + sub.jsi.consumer = consumer + sub.jsi.durable = isDurable sub.jsi.attached = attached - - // If we are pull based go ahead and fire off the first request to populate. - if isPullMode { - sub.jsi.pull = o.pull - } + sub.jsi.deliver = deliver return sub, nil } +func (nc *Conn) processControlFlow(msg *Msg, s *Subscription, jsi *jsSub) { + // If it is a flow control message then have to ack. + if msg.Reply != "" { + nc.publish(msg.Reply, _EMPTY_, nil, nil) + } else if jsi.hbs { + // Process heartbeat received, get latest control metadata if present. + var ctrl *controlMetadata + cmeta := jsi.cmeta.Load() + if cmeta == nil { + return + } + + ctrl = cmeta.(*controlMetadata) + tokens, err := getMetadataFields(ctrl.meta) + if err != nil { + return + } + // Consumer sequence + dseq := tokens[6] + ldseq := msg.Header.Get(lastConsumerSeqHdr) + + // Detect consumer sequence mismatch and whether + // should restart the consumer. + if ldseq != dseq { + // Dispatch async error including details such as + // from where the consumer could be restarted. + sseq := parseNum(tokens[5]) + ecs := &ErrConsumerSequenceMismatch{ + StreamResumeSequence: uint64(sseq), + ConsumerSequence: parseNum(dseq), + LastConsumerSequence: parseNum(ldseq), + } + nc.handleConsumerSequenceMismatch(s, ecs) + } + } +} + type streamRequest struct { Subject string `json:"subject,omitempty"` } @@ -1065,8 +1153,6 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) { type subOpts struct { // For attaching. stream, consumer string - // For pull based consumers. - pull bool // For manual ack mack bool // For creating or updating. @@ -1209,6 +1295,22 @@ func BindStream(name string) SubOpt { }) } +// EnableFlowControl enables flow control for a push based consumer. +func EnableFlowControl() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.FlowControl = true + return nil + }) +} + +// IdleHeartbeat enables push based consumers to have idle heartbeats delivered. +func IdleHeartbeat(duration time.Duration) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.Heartbeat = duration + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -1230,11 +1332,12 @@ type pullOpts struct { ctx context.Context } +// PullOpt are the options that can be passed when pulling a batch of messages. type PullOpt interface { configurePull(opts *pullOpts) error } -// PullMaxWaiting defines the max inflight pull requests to be delivered more messages. +// PullMaxWaiting defines the max inflight pull requests. func PullMaxWaiting(n int) SubOpt { return subOptFn(func(opts *subOpts) error { opts.cfg.MaxWaiting = n @@ -1652,28 +1755,21 @@ func (m *Msg) InProgress(opts ...AckOpt) error { } // MsgMetadata is the JetStream metadata associated with received messages. -type MsgMetaData struct { - Consumer uint64 - Stream uint64 - Delivered uint64 - Pending uint64 - Timestamp time.Time - StreamName string +type MsgMetadata struct { + Sequence SequencePair + NumDelivered uint64 + NumPending uint64 + Timestamp time.Time + Stream string + Consumer string } -// MetaData retrieves the metadata from a JetStream message. This method will -// return an error for non-JetStream Msgs. -func (m *Msg) MetaData() (*MsgMetaData, error) { - if _, _, err := m.checkReply(); err != nil { - return nil, err - } - +func getMetadataFields(subject string) ([]string, error) { const expectedTokens = 9 const btsep = '.' tsa := [expectedTokens]string{} start, tokens := 0, tsa[:0] - subject := m.Reply for i := 0; i < len(subject); i++ { if subject[i] == btsep { tokens = append(tokens, subject[start:i]) @@ -1684,16 +1780,30 @@ func (m *Msg) MetaData() (*MsgMetaData, error) { if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" { return nil, ErrNotJSMessage } + return tokens, nil +} - meta := &MsgMetaData{ - Delivered: uint64(parseNum(tokens[4])), - Stream: uint64(parseNum(tokens[5])), - Consumer: uint64(parseNum(tokens[6])), - Timestamp: time.Unix(0, parseNum(tokens[7])), - Pending: uint64(parseNum(tokens[8])), - StreamName: tokens[2], +// Metadata retrieves the metadata from a JetStream message. This method will +// return an error for non-JetStream Msgs. +func (m *Msg) Metadata() (*MsgMetadata, error) { + if _, _, err := m.checkReply(); err != nil { + return nil, err } + tokens, err := getMetadataFields(m.Reply) + if err != nil { + return nil, err + } + + meta := &MsgMetadata{ + NumDelivered: uint64(parseNum(tokens[4])), + NumPending: uint64(parseNum(tokens[8])), + Timestamp: time.Unix(0, parseNum(tokens[7])), + Stream: tokens[2], + Consumer: tokens[3], + } + meta.Sequence.Stream = uint64(parseNum(tokens[5])) + meta.Sequence.Consumer = uint64(parseNum(tokens[6])) return meta, nil } diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index eabd9adf..3f025261 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -542,6 +542,23 @@ type Subscription struct { // Msg represents a message delivered by NATS. This structure is used // by Subscribers and PublishMsg(). +// +// Types of Acknowledgements +// +// In case using JetStream, there are multiple ways to ack a Msg: +// +// // Acknowledgement that a message has been processed. +// msg.Ack() +// +// // Negatively acknowledges a message. +// msg.Nak() +// +// // Terminate a message so that it is not redelivered further. +// msg.Term() +// +// // Signal the server that the message is being worked on and reset redelivery timer. +// msg.InProgress() +// type Msg struct { Subject string Reply string @@ -2451,6 +2468,8 @@ func (nc *Conn) processMsg(data []byte) { // Check if we have headers encoded here. var h http.Header var err error + var ctrl bool + var hasFC bool if nc.ps.ma.hdr > 0 { hbuf := msgPayload[:nc.ps.ma.hdr] @@ -2472,6 +2491,13 @@ func (nc *Conn) processMsg(data []byte) { sub.mu.Lock() + // Skip flow control messages in case of using a JetStream context. + jsi := sub.jsi + if jsi != nil { + ctrl = isControlMessage(m) + hasFC = jsi.fc + } + // Check if closed. if sub.closed { sub.mu.Unlock() @@ -2498,23 +2524,28 @@ func (nc *Conn) processMsg(data []byte) { // We have two modes of delivery. One is the channel, used by channel // subscribers and syncSubscribers, the other is a linked list for async. - if sub.mch != nil { - select { - case sub.mch <- m: - default: - goto slowConsumer - } - } else { - // Push onto the async pList - if sub.pHead == nil { - sub.pHead = m - sub.pTail = m - if sub.pCond != nil { - sub.pCond.Signal() + if !ctrl { + if sub.mch != nil { + select { + case sub.mch <- m: + default: + goto slowConsumer } } else { - sub.pTail.next = m - sub.pTail = m + // Push onto the async pList + if sub.pHead == nil { + sub.pHead = m + sub.pTail = m + if sub.pCond != nil { + sub.pCond.Signal() + } + } else { + sub.pTail.next = m + sub.pTail = m + } + } + if hasFC { + jsi.trackSequences(m) } } @@ -2522,6 +2553,13 @@ func (nc *Conn) processMsg(data []byte) { sub.sc = false sub.mu.Unlock() + + // Handle flow control and heartbeat messages automatically + // for JetStream Push consumers. + if ctrl { + nc.processControlFlow(m, sub, jsi) + } + return slowConsumer: @@ -2833,14 +2871,17 @@ func NewMsg(subject string) *Msg { } const ( - hdrLine = "NATS/1.0\r\n" - crlf = "\r\n" - hdrPreEnd = len(hdrLine) - len(crlf) - statusHdr = "Status" - descrHdr = "Description" - noResponders = "503" - noMessages = "404" - statusLen = 3 // e.g. 20x, 40x, 50x + hdrLine = "NATS/1.0\r\n" + crlf = "\r\n" + hdrPreEnd = len(hdrLine) - len(crlf) + statusHdr = "Status" + descrHdr = "Description" + lastConsumerSeqHdr = "Nats-Last-Consumer" + lastStreamSeqHdr = "Nats-Last-Stream" + noResponders = "503" + noMessages = "404" + controlMsg = "100" + statusLen = 3 // e.g. 20x, 40x, 50x ) // decodeHeadersMsg will decode and headers. @@ -2850,10 +2891,12 @@ func decodeHeadersMsg(data []byte) (http.Header, error) { if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] { return nil, ErrBadHeaderMsg } - mh, err := tp.ReadMIMEHeader() + + mh, err := readMIMEHeader(tp) if err != nil { - return nil, ErrBadHeaderMsg + return nil, err } + // Check if we have an inlined status. if len(l) > hdrPreEnd { var description string @@ -2870,6 +2913,53 @@ func decodeHeadersMsg(data []byte) (http.Header, error) { return http.Header(mh), nil } +// readMIMEHeader returns a MIMEHeader that preserves the +// original case of the MIME header, based on the implementation +// of textproto.ReadMIMEHeader. +// +// https://golang.org/pkg/net/textproto/#Reader.ReadMIMEHeader +func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) { + var ( + m = make(textproto.MIMEHeader) + strs []string + ) + for { + kv, err := tp.ReadLine() + if len(kv) == 0 { + return m, err + } + + // Process key fetching original case. + i := bytes.IndexByte([]byte(kv), ':') + if i < 0 { + return nil, ErrBadHeaderMsg + } + key := kv[:i] + if key == "" { + // Skip empty keys. + continue + } + i++ + for i < len(kv) && (kv[i] == ' ' || kv[i] == '\t') { + i++ + } + value := string(kv[i:]) + vv := m[key] + if vv == nil && len(strs) > 0 { + // Single value header. + vv, strs = strs[:1:1], strs[1:] + vv[0] = value + m[key] = vv + } else { + // Multi value header. + m[key] = append(vv, value) + } + if err != nil { + return m, err + } + } +} + // PublishMsg publishes the Msg structure, which includes the // Subject, an optional Reply and an optional Data field. func (nc *Conn) PublishMsg(m *Msg) error { @@ -3581,7 +3671,8 @@ func (s *Subscription) AutoUnsubscribe(max int) error { // unsubscribe performs the low level unsubscribe to the server. // Use Subscription.Unsubscribe() func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { - // Check whether it is a JetStream sub and should clean up consumers. + // For JetStream consumers, need to clean up ephemeral consumers + // or delete durable ones if called with Unsubscribe. sub.mu.Lock() jsi := sub.jsi sub.mu.Unlock() @@ -3629,6 +3720,55 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { return nil } +// ErrConsumerSequenceMismatch represents an error from a consumer +// that received a Heartbeat including sequence different to the +// one expected from the view of the client. +type ErrConsumerSequenceMismatch struct { + // StreamResumeSequence is the stream sequence from where the consumer + // should resume consuming from the stream. + StreamResumeSequence uint64 + + // ConsumerSequence is the sequence of the consumer that is behind. + ConsumerSequence int64 + + // LastConsumerSequence is the sequence of the consumer when the heartbeat + // was received. + LastConsumerSequence int64 +} + +func (ecs *ErrConsumerSequenceMismatch) Error() string { + return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d", + ecs.ConsumerSequence, + ecs.LastConsumerSequence-ecs.ConsumerSequence, + ecs.StreamResumeSequence, + ) +} + +// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer. +func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) { + nc.mu.Lock() + errCB := nc.Opts.AsyncErrorCB + if errCB != nil { + nc.ach.push(func() { errCB(nc, sub, err) }) + } + nc.mu.Unlock() +} + +func isControlMessage(msg *Msg) bool { + return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg +} + +func (jsi *jsSub) trackSequences(msg *Msg) { + var ctrl *controlMetadata + if cmeta := jsi.cmeta.Load(); cmeta == nil { + ctrl = &controlMetadata{} + } else { + ctrl = cmeta.(*controlMetadata) + } + ctrl.meta = msg.Reply + jsi.cmeta.Store(ctrl) +} + // NextMsg will return the next message available to a synchronous subscriber // or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription), // the connection is closed (ErrConnectionClosed), or the timeout is reached (ErrTimeout). diff --git a/vendor/modules.txt b/vendor/modules.txt index 26c4e76a..f4dbbff6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,3 +1,5 @@ +# github.com/golang/protobuf v1.4.2 +## explicit # github.com/klauspost/compress v1.11.12 ## explicit github.com/klauspost/compress/s2 @@ -7,7 +9,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.0.1 ## explicit github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8 +# github.com/nats-io/nats.go v1.10.1-0.20210419223411-20527524c393 ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin