From 610d2d21b7e14ebbaf079a17301b9f2e7944f0ec Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 19 Oct 2020 19:38:28 -0700 Subject: [PATCH] More robust waiting queue for pull mode consumers Signed-off-by: Derek Collison --- go.mod | 4 +- go.sum | 6 + server/consumer.go | 174 +++++++-- server/jetstream_api.go | 3 + server/stream.go | 1 + test/jetstream_test.go | 192 ++++++++- vendor/github.com/nats-io/nats.go/.travis.yml | 4 +- .../github.com/nats-io/nats.go/MAINTAINERS.md | 6 +- vendor/github.com/nats-io/nats.go/README.md | 18 +- vendor/github.com/nats-io/nats.go/context.go | 53 +-- vendor/github.com/nats-io/nats.go/enc.go | 2 +- vendor/github.com/nats-io/nats.go/go.mod | 2 +- vendor/github.com/nats-io/nats.go/go.sum | 9 + .../github.com/nats-io/nats.go/jetstream.go | 366 ++++++++++++++++++ .../nats-io/nats.go/jetstream_consumer.go | 268 +++++++++++++ vendor/github.com/nats-io/nats.go/nats.go | 243 +++++++++--- vendor/modules.txt | 4 +- 17 files changed, 1240 insertions(+), 115 deletions(-) create mode 100644 vendor/github.com/nats-io/nats.go/jetstream.go create mode 100644 vendor/github.com/nats-io/nats.go/jetstream_consumer.go diff --git a/go.mod b/go.mod index 3744c01f..f59a2c2a 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module github.com/nats-io/nats-server/v2 require ( github.com/minio/highwayhash v1.0.0 github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c - github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a + github.com/nats-io/nats.go v1.10.1-0.20201013114232-5a33ce07522f github.com/nats-io/nkeys v0.2.0 github.com/nats-io/nuid v1.0.1 - golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 + golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 ) diff --git a/go.sum b/go.sum index a41b8040..c953c6a6 100644 --- a/go.sum +++ b/go.sum @@ -14,14 +14,18 @@ github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5 github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= +github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c h1:Hc1D9ChlsCMVwCxJ6QT5xqfk2zJ4XNea+LtdfaYhd20= github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU= github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4= +github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a h1:gzSKZOBlu/DpbuPbt34paXCOvA6+E+lVfU2BmomQ9HA= github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= +github.com/nats-io/nats.go v1.10.1-0.20201013114232-5a33ce07522f h1:ip3GUJTsw7ybH4qIRtv/GjNzMyHmsJnxXzRmsbqYh8M= +github.com/nats-io/nats.go v1.10.1-0.20201013114232-5a33ce07522f/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM= @@ -32,6 +36,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/server/consumer.go b/server/consumer.go index 2dde289d..6095ab0c 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -18,6 +18,7 @@ import ( "crypto/rand" "crypto/sha256" "encoding/json" + "errors" "fmt" mrand "math/rand" "reflect" @@ -40,6 +41,7 @@ type ConsumerInfo struct { AckFloor SequencePair `json:"ack_floor"` NumPending int `json:"num_pending"` NumRedelivered int `json:"num_redelivered"` + NumWaiting int `json:"num_waiting"` } type ConsumerConfig struct { @@ -55,6 +57,7 @@ type ConsumerConfig struct { ReplayPolicy ReplayPolicy `json:"replay_policy"` RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec SampleFrequency string `json:"sample_freq,omitempty"` + MaxWaiting int `json:"max_waiting,omitempty"` } type CreateConsumerRequest struct { @@ -178,7 +181,7 @@ type Consumer struct { rdq []uint64 rdc map[uint64]uint64 maxdc uint64 - waiting []string + waiting *waitQueue config ConsumerConfig store ConsumerStore active bool @@ -218,6 +221,9 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { if mset.deliveryFormsCycle(config.DeliverSubject) { return nil, fmt.Errorf("consumer deliver subject forms a cycle") } + if config.MaxWaiting != 0 { + return nil, fmt.Errorf("consumer in push mode can not set max waiting") + } } else { // Pull mode / work queue mode require explicit ack. if config.AckPolicy != AckExplicit { @@ -231,6 +237,13 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { if config.RateLimit > 0 { return nil, fmt.Errorf("consumer in pull mode can not have rate limit set") } + if config.MaxWaiting < 0 { + return nil, fmt.Errorf("consumer max waiting needs to be positive") + } + // Set to default if not specified. + if config.MaxWaiting == 0 { + config.MaxWaiting = JSWaitQueueDefaultMax + } } // Setup proper default for ack wait if we are in explicit ack mode. @@ -380,6 +393,9 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { return nil, fmt.Errorf("consumer name is too long, maximum allowed is %d", JSMaxNameLen) } o.name = config.Durable + if o.isPullMode() { + o.waiting = newWaitQueue(config.MaxWaiting) + } } else { for { o.name = createConsumerName() @@ -885,6 +901,10 @@ func (o *Consumer) Info() *ConsumerInfo { NumPending: len(o.pending), NumRedelivered: len(o.rdc), } + // If we are a pull mode consumer, report on number of waiting requests. + if o.isPullMode() { + info.NumWaiting = o.waiting.len() + } o.mu.Unlock() return info } @@ -1042,10 +1062,97 @@ func batchSizeFromMsg(msg []byte) int { return bs } +// Represents a request that is on the internal waiting queue +type waitingRequest struct { + client *client + reply string + n int // For batching + expires time.Time +} + +// waiting queue for requests that are waiting for new messages to arrive. +type waitQueue struct { + rp, wp int + reqs []*waitingRequest +} + +// Create a new ring buffer with at most max items. +func newWaitQueue(max int) *waitQueue { + return &waitQueue{rp: -1, reqs: make([]*waitingRequest, max)} +} + +var ( + errWaitQueueFull = errors.New("wait queue is full") + errWaitQueueNil = errors.New("wait queue is nil") +) + +// Adds in a new request. +func (wq *waitQueue) add(req *waitingRequest) error { + if wq == nil { + return errWaitQueueNil + } + if wq.isFull() { + return errWaitQueueFull + } + wq.reqs[wq.wp] = req + // TODO(dlc) - Could make pow2 and get rid of mod. + wq.wp = (wq.wp + 1) % cap(wq.reqs) + + // Adjust read pointer if we were empty. + if wq.rp < 0 { + wq.rp = 0 + } + + return nil +} + +func (wq *waitQueue) isFull() bool { + return wq.rp == wq.wp +} + +func (wq *waitQueue) len() int { + if wq == nil || wq.rp < 0 { + return 0 + } + if wq.rp < wq.wp { + return wq.wp - wq.rp + } + return cap(wq.reqs) - wq.rp + wq.wp +} + +// Peek will return the next request waiting or nil if empty. +func (wq *waitQueue) peek() *waitingRequest { + if wq == nil { + return nil + } + var wr *waitingRequest + if wq.rp >= 0 { + wr = wq.reqs[wq.rp] + } + return wr +} + +// pop will return the next request and move the read cursor. +func (wq *waitQueue) pop() *waitingRequest { + wr := wq.peek() + if wr != nil { + wr.n-- + if wr.n <= 0 { + wq.reqs[wq.rp] = nil + wq.rp = (wq.rp + 1) % cap(wq.reqs) + // Check if we are empty. + if wq.rp == wq.wp { + wq.rp, wq.wp = -1, 0 + } + } + } + return wr +} + // processNextMsgReq will process a request for the next message available. A nil message payload means deliver // a single message. If the payload is a number parseable with Atoi(), then we will send a batch of messages without // requiring another request to this endpoint, or an ACK. -func (o *Consumer) processNextMsgReq(_ *subscription, _ *client, _, reply string, msg []byte) { +func (o *Consumer) processNextMsgReq(_ *subscription, c *client, _, reply string, msg []byte) { // Check payload here to see if they sent in batch size. batchSize := batchSizeFromMsg(msg) @@ -1055,23 +1162,41 @@ func (o *Consumer) processNextMsgReq(_ *subscription, _ *client, _, reply string o.mu.Unlock() return } - shouldSignal := false + + if o.waiting.isFull() { + // If our waiting queue is full return an empty response with the proper header. + // FIXME(dlc) - Should we do advisory here as well? + sendq := mset.sendq + o.mu.Unlock() + hdr := []byte("NATS/1.0 500 WaitQueue Exceeded\r\n\r\n") + pmsg := &jsPubMsg{reply, reply, _EMPTY_, hdr, nil, nil, 0} + // Send message. + sendq <- pmsg + return + } + + // In case we have to queue up this request. This is all on stack pre-allocated. + wr := waitingRequest{client: c, reply: reply, n: batchSize} + + // If we are in replay mode, defer to processReplay for delivery. + if o.replay { + o.waiting.add(&wr) + o.mu.Unlock() + mset.signalConsumers() + return + } for i := 0; i < batchSize; i++ { - // If we are in replay mode, defer to processReplay for delivery. - if o.replay { - o.waiting = append(o.waiting, reply) - shouldSignal = true - } else if subj, hdr, msg, seq, dc, ts, err := o.getNextMsg(); err == nil { + if subj, hdr, msg, seq, dc, ts, err := o.getNextMsg(); err == nil { o.deliverMsg(reply, subj, hdr, msg, seq, dc, ts) + // Need to discount this from the total n for the request. + wr.n-- } else { - o.waiting = append(o.waiting, reply) + o.waiting.add(&wr) + break } } o.mu.Unlock() - if shouldSignal { - mset.signalConsumers() - } } // Increase the delivery count for this message. @@ -1163,15 +1288,20 @@ func (o *Consumer) getNextMsg() (subj string, hdr, msg []byte, seq uint64, dcoun // Will check to make sure those waiting still have registered interest. func (o *Consumer) checkWaitingForInterest() bool { - for len(o.waiting) > 0 { - rr := o.acc.sl.Match(o.waiting[0]) + now := time.Now() + for wr := o.waiting.peek(); wr != nil; wr = o.waiting.peek() { + if !wr.expires.IsZero() && now.After(wr.expires) { + o.waiting.pop() + continue + } + rr := o.acc.sl.Match(wr.reply) if len(rr.psubs)+len(rr.qsubs) > 0 { break } // No more interest so go ahead and remove this one from our list. - o.waiting = append(o.waiting[:0], o.waiting[1:]...) + o.waiting.pop() } - return len(o.waiting) > 0 + return o.waiting.len() > 0 } func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { @@ -1235,9 +1365,8 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { } } - if len(o.waiting) > 0 { - dsubj = o.waiting[0] - o.waiting = append(o.waiting[:0], o.waiting[1:]...) + if wr := o.waiting.pop(); wr != nil { + dsubj = wr.reply } else { dsubj = o.dsubj } @@ -1326,10 +1455,10 @@ func (o *Consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, t o.mu.Unlock() return true } + var dsubj string - if len(o.waiting) > 0 { - dsubj = o.waiting[0] - o.waiting = append(o.waiting[:0], o.waiting[1:]...) + if wr := o.waiting.pop(); wr != nil { + dsubj = wr.reply } else { dsubj = o.dsubj } @@ -1725,6 +1854,7 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error { stopAndClearTimer(&o.ptmr) stopAndClearTimer(&o.dtmr) delivery := o.config.DeliverSubject + o.waiting = nil o.mu.Unlock() if delivery != "" { diff --git a/server/jetstream_api.go b/server/jetstream_api.go index c15ec57e..ff200447 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -366,6 +366,9 @@ type JSApiMsgGetResponse struct { const JSApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response" +// JSWaitQueueDefaultMax is the default max number of outstanding requests for pull consumers. +const JSWaitQueueDefaultMax = 1024 + // JSApiConsumerCreateResponse. type JSApiConsumerCreateResponse struct { ApiResponse diff --git a/server/stream.go b/server/stream.go index 0361ee57..a2b2d288 100644 --- a/server/stream.go +++ b/server/stream.go @@ -945,6 +945,7 @@ func (mset *Stream) internalSendLoop() { c.pa.hdb = nil msg = append(pm.msg, _CRLF_...) } + didDeliver := c.processInboundClientMsg(msg) c.pa.szb = nil c.flushClients(0) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 0a0aa4d4..af7a59f0 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -1364,7 +1364,7 @@ func TestJetStreamBasicWorkQueue(t *testing.T) { go func() { time.Sleep(nextDelay) - nc.Request(sendSubj, []byte("Hello World!"), 100*time.Millisecond) + sendStreamMsg(t, nc, sendSubj, "Hello World!") }() start := time.Now() @@ -1388,6 +1388,176 @@ func TestJetStreamBasicWorkQueue(t *testing.T) { } } +func TestJetStreamWorkQueueMaxWaiting(t *testing.T) { + cases := []struct { + name string + mconfig *server.StreamConfig + }{ + {"MemoryStore", &server.StreamConfig{Name: "MY_MSG_SET", Storage: server.MemoryStorage, Subjects: []string{"foo", "bar"}}}, + {"FileStore", &server.StreamConfig{Name: "MY_MSG_SET", Storage: server.FileStorage, Subjects: []string{"foo", "bar"}}}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mset, err := s.GlobalAccount().AddStream(c.mconfig) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + // Make sure these cases fail + cfg := &server.ConsumerConfig{Durable: "foo", AckPolicy: server.AckExplicit, MaxWaiting: 10, DeliverSubject: "_INBOX.22"} + if _, err := mset.AddConsumer(cfg); err == nil { + t.Fatalf("Expected an error with MaxWaiting set on non-pull based consumer") + } + cfg = &server.ConsumerConfig{Durable: "foo", AckPolicy: server.AckExplicit, MaxWaiting: -1} + if _, err := mset.AddConsumer(cfg); err == nil { + t.Fatalf("Expected an error with MaxWaiting being negative") + } + + // Create basic work queue mode consumer. + wcfg := workerModeConfig("MAXWQ") + o, err := mset.AddConsumer(wcfg) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + // Make sure we set default correctly. + if cfg := o.Config(); cfg.MaxWaiting != server.JSWaitQueueDefaultMax { + t.Fatalf("Expected default max waiting to have been set to %d, got %d", server.JSWaitQueueDefaultMax, cfg.MaxWaiting) + } + + expectWaiting := func(expected int) { + t.Helper() + checkFor(t, time.Second, 25*time.Millisecond, func() error { + if oi := o.Info(); oi.NumWaiting != expected { + return fmt.Errorf("Expected %d waiting, got %d", expected, oi.NumWaiting) + } + return nil + }) + } + + nc := clientConnectWithOldRequest(t, s) + defer nc.Close() + + // Like muxed new INBOX. + sub, _ := nc.SubscribeSync("req.*") + defer sub.Unsubscribe() + nc.Flush() + + getSubj := o.RequestNextMsgSubject() + // Queue up JSWaitQueueDefaultMax requests. + for i := 0; i < server.JSWaitQueueDefaultMax; i++ { + nc.PublishRequest(getSubj, fmt.Sprintf("req.%d", i), nil) + } + expectWaiting(server.JSWaitQueueDefaultMax) + + // Now we should get an immediate response since we are full and the system can not disqualify anyone. + nm, err := nc.Request(getSubj, nil, 100*time.Millisecond) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // FIXME(dlc) - Update Go client to parse description too. + if nm.Header.Get("Status") == "" { + t.Fatalf("Expected a non-empty status code") + } + sendStreamMsg(t, nc, "foo", "Hello World!") + sendStreamMsg(t, nc, "bar", "Hello World!") + expectWaiting(server.JSWaitQueueDefaultMax - 2) + }) + } +} + +func TestJetStreamWorkQueueWrapWaiting(t *testing.T) { + cases := []struct { + name string + mconfig *server.StreamConfig + }{ + {"MemoryStore", &server.StreamConfig{Name: "MY_MSG_SET", Storage: server.MemoryStorage, Subjects: []string{"foo", "bar"}}}, + {"FileStore", &server.StreamConfig{Name: "MY_MSG_SET", Storage: server.FileStorage, Subjects: []string{"foo", "bar"}}}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mset, err := s.GlobalAccount().AddStream(c.mconfig) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + maxWaiting := 8 + wcfg := workerModeConfig("WRAP") + wcfg.MaxWaiting = maxWaiting + + o, err := mset.AddConsumer(wcfg) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + getSubj := o.RequestNextMsgSubject() + + expectWaiting := func(expected int) { + t.Helper() + checkFor(t, time.Second, 25*time.Millisecond, func() error { + if oi := o.Info(); oi.NumWaiting != expected { + return fmt.Errorf("Expected %d waiting, got %d", expected, oi.NumWaiting) + } + return nil + }) + } + + nc := clientConnectToServer(t, s) + defer nc.Close() + + sub, _ := nc.SubscribeSync("req.*") + defer sub.Unsubscribe() + nc.Flush() + + // Fill up waiting. + for i := 0; i < maxWaiting; i++ { + nc.PublishRequest(getSubj, fmt.Sprintf("req.%d", i), nil) + } + expectWaiting(maxWaiting) + + // Now use 1/2 of the waiting. + for i := 0; i < maxWaiting/2; i++ { + sendStreamMsg(t, nc, "foo", "Hello World!") + } + expectWaiting(maxWaiting / 2) + + // Now add in two (2) more pull requests. + for i := maxWaiting; i < maxWaiting+2; i++ { + nc.PublishRequest(getSubj, fmt.Sprintf("req.%d", i), nil) + } + expectWaiting(maxWaiting/2 + 2) + + // Now use second 1/2 of the waiting and the 2 extra. + for i := 0; i < maxWaiting/2+2; i++ { + sendStreamMsg(t, nc, "bar", "Hello World!") + } + expectWaiting(0) + + if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != maxWaiting+2 { + t.Fatalf("Expected sub to have %d pending, got %d", maxWaiting+2, nmsgs) + } + }) + } +} + func TestJetStreamSubjectFiltering(t *testing.T) { cases := []struct { name string @@ -1791,6 +1961,26 @@ func TestJetStreamWorkQueueRequestBatch(t *testing.T) { } return nil }) + + // Now queue up the request without messages and add them after. + sub, _ = nc.SubscribeSync(nats.NewInbox()) + defer sub.Unsubscribe() + mset.Purge() + + nc.PublishRequest(o.RequestNextMsgSubject(), sub.Subject, []byte(strconv.Itoa(batchSize))) + nc.Flush() // Make sure its registered. + + for i := 0; i < toSend; i++ { + sendStreamMsg(t, nc, sendSubj, "Hello World!") + } + + // We should receive batchSize with no acks or additional requests. + checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error { + if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != batchSize { + return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, batchSize) + } + return nil + }) }) } } diff --git a/vendor/github.com/nats-io/nats.go/.travis.yml b/vendor/github.com/nats-io/nats.go/.travis.yml index 3b00ae19..dcbac5be 100644 --- a/vendor/github.com/nats-io/nats.go/.travis.yml +++ b/vendor/github.com/nats-io/nats.go/.travis.yml @@ -1,7 +1,7 @@ language: go go: +- 1.15.x - 1.14.x -- 1.13.x env: - GO111MODULE=off go_import_path: github.com/nats-io/nats.go @@ -20,4 +20,4 @@ before_script: script: - go test -i -race ./... - go test -v -run=TestNoRace -p=1 ./... -- if [[ "$TRAVIS_GO_VERSION" =~ 1.14 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi +- if [[ "$TRAVIS_GO_VERSION" =~ 1.15 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi diff --git a/vendor/github.com/nats-io/nats.go/MAINTAINERS.md b/vendor/github.com/nats-io/nats.go/MAINTAINERS.md index 323faa8e..23214655 100644 --- a/vendor/github.com/nats-io/nats.go/MAINTAINERS.md +++ b/vendor/github.com/nats-io/nats.go/MAINTAINERS.md @@ -2,9 +2,7 @@ Maintainership is on a per project basis. -### Core-maintainers +### Maintainers - Derek Collison [@derekcollison](https://github.com/derekcollison) - Ivan Kozlovic [@kozlovic](https://github.com/kozlovic) - -### Maintainers - - Waldemar Quevedo [@wallyqs](https://github.com/wallyqs) \ No newline at end of file + - Waldemar Quevedo [@wallyqs](https://github.com/wallyqs) diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index 5eb7c968..8b6e6f08 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -3,7 +3,8 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io [![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0) [![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fnats-io%2Fgo-nats.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fnats-io%2Fgo-nats?ref=badge_shield) -[![Go Report Card](https://goreportcard.com/badge/github.com/nats-io/nats.go)](https://goreportcard.com/report/github.com/nats-io/nats.go) [![Build Status](https://travis-ci.org/nats-io/nats.go.svg?branch=master)](http://travis-ci.org/nats-io/nats.go) [![GoDoc](https://godoc.org/github.com/nats-io/nats.go?status.svg)](http://godoc.org/github.com/nats-io/nats.go) [![Coverage Status](https://coveralls.io/repos/nats-io/nats.go/badge.svg?branch=master)](https://coveralls.io/r/nats-io/nats.go?branch=master) +[![Go Report Card](https://goreportcard.com/badge/github.com/nats-io/nats.go)](https://goreportcard.com/report/github.com/nats-io/nats.go) [![Build Status](https://travis-ci.org/nats-io/nats.go.svg?branch=master)](http://travis-ci.org/nats-io/nats.go) [![GoDoc](https://img.shields.io/badge/GoDoc-reference-007d9c)](https://pkg.go.dev/github.com/nats-io/nats.go) + [![Coverage Status](https://coveralls.io/repos/nats-io/nats.go/badge.svg?branch=master)](https://coveralls.io/r/nats-io/nats.go?branch=master) ## Installation @@ -284,6 +285,21 @@ nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) { ```go +// Normally, the library will return an error when trying to connect and +// there is no server running. The RetryOnFailedConnect option will set +// the connection in reconnecting state if it failed to connect right away. +nc, err := nats.Connect(nats.DefaultURL, + nats.RetryOnFailedConnect(true), + nats.MaxReconnects(10), + nats.ReconnectWait(time.Second), + nats.ReconnectHandler(func(_ *nats.Conn) { + // Note that this will be invoked for the first asynchronous connect. + })) +if err != nil { + // Should not return an error even if it can't connect, but you still + // need to check in case there are some configuration errors. +} + // Flush connection to server, returns when all messages have been processed. nc.Flush() fmt.Println("All clear!") diff --git a/vendor/github.com/nats-io/nats.go/context.go b/vendor/github.com/nats-io/nats.go/context.go index 769f88a0..4aa3bf0e 100644 --- a/vendor/github.com/nats-io/nats.go/context.go +++ b/vendor/github.com/nats-io/nats.go/context.go @@ -58,34 +58,37 @@ func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data [ return nil, ctx.Err() } - nc.mu.Lock() + var m *Msg + var err error + // If user wants the old style. - if nc.Opts.UseOldRequestStyle { - nc.mu.Unlock() - return nc.oldRequestWithContext(ctx, subj, hdr, data) - } - - mch, token, err := nc.createNewRequestAndSend(subj, hdr, data) - if err != nil { - return nil, err - } - - var ok bool - var msg *Msg - - select { - case msg, ok = <-mch: - if !ok { - return nil, ErrConnectionClosed + if nc.useOldRequestStyle() { + m, err = nc.oldRequestWithContext(ctx, subj, hdr, data) + } else { + mch, token, err := nc.createNewRequestAndSend(subj, hdr, data) + if err != nil { + return nil, err } - case <-ctx.Done(): - nc.mu.Lock() - delete(nc.respMap, token) - nc.mu.Unlock() - return nil, ctx.Err() - } - return msg, nil + var ok bool + + select { + case m, ok = <-mch: + if !ok { + return nil, ErrConnectionClosed + } + case <-ctx.Done(): + nc.mu.Lock() + delete(nc.respMap, token) + nc.mu.Unlock() + return nil, ctx.Err() + } + } + // Check for no responder status. + if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders { + m, err = nil, ErrNoResponders + } + return m, err } // oldRequestWithContext utilizes inbox and subscription per request. diff --git a/vendor/github.com/nats-io/nats.go/enc.go b/vendor/github.com/nats-io/nats.go/enc.go index ee832f69..38149c98 100644 --- a/vendor/github.com/nats-io/nats.go/enc.go +++ b/vendor/github.com/nats-io/nats.go/enc.go @@ -130,7 +130,7 @@ func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, t // Handler is a specific callback used for Subscribe. It is generalized to // an interface{}, but we will discover its format and arguments at runtime -// and perform the correct callback, including de-marshaling JSON strings +// and perform the correct callback, including de-marshaling encoded data // back into the appropriate struct based on the signature of the Handler. // // Handlers are expected to have one of four signatures. diff --git a/vendor/github.com/nats-io/nats.go/go.mod b/vendor/github.com/nats-io/nats.go/go.mod index 59ee43d3..383f2230 100644 --- a/vendor/github.com/nats-io/nats.go/go.mod +++ b/vendor/github.com/nats-io/nats.go/go.mod @@ -4,7 +4,7 @@ go 1.14 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71 + github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad github.com/nats-io/nkeys v0.2.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/vendor/github.com/nats-io/nats.go/go.sum b/vendor/github.com/nats-io/nats.go/go.sum index f31d0af3..1419245b 100644 --- a/vendor/github.com/nats-io/nats.go/go.sum +++ b/vendor/github.com/nats-io/nats.go/go.sum @@ -7,17 +7,23 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA= github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= +github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02 h1:WloZv3SCb55D/rOHYy1rWBXLrj3BYc9zw8VIq6X54lI= +github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU= github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71 h1:nexMtKbOeM+w3vGQMNF0BEt+2xZDmVCtYXql2Ym+RWg= github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4= +github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad h1:oRb9MIi1Y4N5cTZWciqH68aVNt1e+o4N2uRnjVzv/UE= +github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= +github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM= @@ -35,6 +41,9 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU= golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/vendor/github.com/nats-io/nats.go/jetstream.go b/vendor/github.com/nats-io/nats.go/jetstream.go new file mode 100644 index 00000000..4d2691ca --- /dev/null +++ b/vendor/github.com/nats-io/nats.go/jetstream.go @@ -0,0 +1,366 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nats + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "strconv" + "time" +) + +// JetStreamMsgMetaData is metadata related to a JetStream originated message +type JetStreamMsgMetaData struct { + Stream string + Consumer string + Parsed bool + Delivered int + StreamSeq int + ConsumerSeq int + TimeStamp time.Time +} + +func (m *Msg) JetStreamMetaData() (*JetStreamMsgMetaData, error) { + var err error + + if m.jsMeta != nil && m.jsMeta.Parsed { + return m.jsMeta, nil + } + + m.jsMeta, err = m.parseJSMsgMetadata() + + return m.jsMeta, err +} + +func (m *Msg) parseJSMsgMetadata() (*JetStreamMsgMetaData, error) { + if m.jsMeta != nil { + return m.jsMeta, nil + } + + if len(m.Reply) == 0 { + return nil, ErrNotJSMessage + } + + meta := &JetStreamMsgMetaData{} + + tsa := [32]string{} + parts := tsa[:0] + start := 0 + btsep := byte('.') + for i := 0; i < len(m.Reply); i++ { + if m.Reply[i] == btsep { + parts = append(parts, m.Reply[start:i]) + start = i + 1 + } + } + parts = append(parts, m.Reply[start:]) + + if len(parts) != 8 || parts[0] != "$JS" || parts[1] != "ACK" { + return nil, ErrNotJSMessage + } + + var err error + + meta.Stream = parts[2] + meta.Consumer = parts[3] + meta.Delivered, err = strconv.Atoi(parts[4]) + if err != nil { + return nil, ErrNotJSMessage + } + + meta.StreamSeq, err = strconv.Atoi(parts[5]) + if err != nil { + return nil, ErrNotJSMessage + } + + meta.ConsumerSeq, err = strconv.Atoi(parts[6]) + if err != nil { + return nil, ErrNotJSMessage + } + + tsi, err := strconv.Atoi(parts[7]) + if err != nil { + return nil, ErrNotJSMessage + } + meta.TimeStamp = time.Unix(0, int64(tsi)) + + meta.Parsed = true + + return meta, nil +} + +const jsStreamUnspecified = "not.set" + +type jsOpts struct { + timeout time.Duration + ctx context.Context + + ackstr string + consumer *ConsumerConfig + streamName string +} + +func newJsOpts() *jsOpts { + return &jsOpts{ackstr: jsStreamUnspecified, consumer: &ConsumerConfig{}} +} + +func (j *jsOpts) context(dftl time.Duration) (context.Context, context.CancelFunc) { + if j.ctx != nil { + return context.WithCancel(j.ctx) + } + + if j.timeout == 0 { + j.timeout = dftl + } + + return context.WithTimeout(context.Background(), j.timeout) +} + +// AckOption configures the various JetStream message acknowledgement helpers +type AckOption func(opts *jsOpts) error + +// PublishOption configures publishing messages +type PublishOption func(opts *jsOpts) error + +// SubscribeOption configures JetStream consumer behavior +type SubscribeOption func(opts *jsOpts) error + +// Consumer creates a JetStream Consumer on a Stream +func Consumer(stream string, cfg ConsumerConfig) SubscribeOption { + return func(jopts *jsOpts) error { + jopts.consumer = &cfg + jopts.streamName = stream + return nil + } +} + +// PublishExpectsStream waits for an ack after publishing and ensure it's from a specific stream, empty arguments waits for any valid acknowledgement +func PublishExpectsStream(stream ...string) PublishOption { + return func(opts *jsOpts) error { + switch len(stream) { + case 0: + opts.ackstr = "" + case 1: + opts.ackstr = stream[0] + if !isValidJSName(opts.ackstr) { + return ErrInvalidStreamName + } + default: + return ErrMultiStreamUnsupported + } + + return nil + } +} + +// PublishStreamTimeout sets the period of time to wait for JetStream to acknowledge receipt, defaults to JetStreamTimeout option +func PublishStreamTimeout(t time.Duration) PublishOption { + return func(opts *jsOpts) error { + opts.timeout = t + return nil + } +} + +// PublishCtx sets an interrupt context for waiting on a stream to reply +func PublishCtx(ctx context.Context) PublishOption { + return func(opts *jsOpts) error { + opts.ctx = ctx + return nil + } +} + +// AckWaitDuration waits for confirmation from the JetStream server +func AckWaitDuration(d time.Duration) AckOption { + return func(opts *jsOpts) error { + opts.timeout = d + return nil + } +} + +func (m *Msg) jsAck(body []byte, opts ...AckOption) error { + if m.Reply == "" { + return ErrMsgNoReply + } + + if m == nil || m.Sub == nil { + return ErrMsgNotBound + } + + m.Sub.mu.Lock() + nc := m.Sub.conn + m.Sub.mu.Unlock() + + var err error + var aopts *jsOpts + + if len(opts) > 0 { + aopts = newJsOpts() + for _, f := range opts { + if err = f(aopts); err != nil { + return err + } + } + } + + if aopts == nil || aopts.timeout == 0 { + return m.Respond(body) + } + + _, err = nc.Request(m.Reply, body, aopts.timeout) + + return err +} + +// Ack acknowledges a JetStream messages received from a Consumer, indicating the message +// should not be received again later +func (m *Msg) Ack(opts ...AckOption) error { + return m.jsAck(AckAck, opts...) +} + +// Nak acknowledges a JetStream message received from a Consumer, indicating that the message +// is not completely processed and should be sent again later +func (m *Msg) Nak(opts ...AckOption) error { + return m.jsAck(AckNak, opts...) +} + +// AckProgress acknowledges a Jetstream message received from a Consumer, indicating that work is +// ongoing and further processing time is required equal to the configured AckWait of the Consumer +func (m *Msg) AckProgress(opts ...AckOption) error { + return m.jsAck(AckProgress, opts...) +} + +// AckNext performs an Ack() and request that the next message be sent to subject ib +func (m *Msg) AckNext(ib string) error { + return m.RespondMsg(&Msg{Subject: m.Reply, Reply: ib, Data: AckNext}) +} + +// AckAndFetch performs an AckNext() and returns the next message from the stream +func (m *Msg) AckAndFetch(opts ...AckOption) (*Msg, error) { + if m.Reply == "" { + return nil, ErrMsgNoReply + } + + if m == nil || m.Sub == nil { + return nil, ErrMsgNotBound + } + + m.Sub.mu.Lock() + nc := m.Sub.conn + m.Sub.mu.Unlock() + + var err error + + aopts := newJsOpts() + for _, f := range opts { + if err = f(aopts); err != nil { + return nil, err + } + } + + ctx, cancel := aopts.context(nc.Opts.JetStreamTimeout) + defer cancel() + + sub, err := nc.SubscribeSync(NewInbox()) + if err != nil { + return nil, err + } + sub.AutoUnsubscribe(1) + defer sub.Unsubscribe() + + err = m.RespondMsg(&Msg{Reply: sub.Subject, Data: AckNext, Subject: m.Reply}) + if err != nil { + return nil, err + } + nc.Flush() + + return sub.NextMsgWithContext(ctx) +} + +// AckTerm acknowledges a message received from JetStream indicating the message will not be processed +// and should not be sent to another consumer +func (m *Msg) AckTerm(opts ...AckOption) error { + return m.jsAck(AckTerm, opts...) +} + +// JetStreamPublishAck metadata received from JetStream when publishing messages +type JetStreamPublishAck struct { + Stream string `json:"stream"` + Sequence int `json:"seq"` +} + +// ParsePublishAck parses the publish acknowledgement sent by JetStream +func ParsePublishAck(m []byte) (*JetStreamPublishAck, error) { + if bytes.HasPrefix([]byte("-ERR"), m) { + if len(m) > 7 { + return nil, fmt.Errorf(string(m[6 : len(m)-1])) + } + + return nil, fmt.Errorf(string(m)) + } + + if !bytes.HasPrefix(m, []byte("+OK {")) { + return nil, fmt.Errorf("invalid JetStream Ack: %v", string(m)) + } + + ack := &JetStreamPublishAck{} + err := json.Unmarshal(m[3:], ack) + return ack, err +} + +func (nc *Conn) jsPublish(subj string, data []byte, opts []PublishOption) error { + var err error + var aopts *jsOpts + + if len(opts) > 0 { + aopts = newJsOpts() + for _, f := range opts { + if err = f(aopts); err != nil { + return err + } + } + } + + if aopts == nil || aopts.timeout == 0 && aopts.ctx == nil && aopts.ackstr == jsStreamUnspecified { + return nc.publish(subj, _EMPTY_, nil, data) + } + + ctx, cancel := aopts.context(nc.Opts.JetStreamTimeout) + defer cancel() + + resp, err := nc.RequestWithContext(ctx, subj, data) + if err != nil { + return err + } + + ack, err := ParsePublishAck(resp.Data) + if err != nil { + return err + } + + if ack.Stream == "" || ack.Sequence == 0 { + return ErrInvalidJSAck + } + + if aopts.ackstr == jsStreamUnspecified || aopts.ackstr == "" { + return nil + } + + if ack.Stream == aopts.ackstr { + return nil + } + + return fmt.Errorf("received ack from stream %q", ack.Stream) +} diff --git a/vendor/github.com/nats-io/nats.go/jetstream_consumer.go b/vendor/github.com/nats-io/nats.go/jetstream_consumer.go new file mode 100644 index 00000000..9c598594 --- /dev/null +++ b/vendor/github.com/nats-io/nats.go/jetstream_consumer.go @@ -0,0 +1,268 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nats + +import ( + "encoding/json" + "fmt" + "strings" + "time" +) + +func (nc *Conn) createOrUpdateConsumer(opts *jsOpts, delivery string) (*ConsumerInfo, error) { + if opts.streamName == "" { + return nil, ErrStreamNameRequired + } + if opts.consumer == nil { + return nil, ErrConsumerConfigRequired + } + + crj, err := json.Marshal(&jSApiConsumerCreateRequest{ + Stream: opts.streamName, + Config: consumerConfig{DeliverSubject: delivery, ConsumerConfig: opts.consumer}, + }) + if err != nil { + return nil, err + } + + ctx, cancel := opts.context(nc.Opts.JetStreamTimeout) + defer cancel() + + var subj string + switch len(opts.consumer.Durable) { + case 0: + subj = fmt.Sprintf(jSApiConsumerCreateT, opts.streamName) + default: + subj = fmt.Sprintf(jSApiDurableCreateT, opts.streamName, opts.consumer.Durable) + } + + resp, err := nc.RequestWithContext(ctx, subj, crj) + if err != nil { + return nil, err + } + + cresp := &jSApiConsumerCreateResponse{} + err = json.Unmarshal(resp.Data, cresp) + if err != nil { + return nil, err + } + + if cresp.Error != nil { + return nil, cresp.Error + } + + return cresp.ConsumerInfo, nil +} + +const ( + jSApiConsumerCreateT = "$JS.API.CONSUMER.CREATE.%s" + jSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s" +) + +type apiError struct { + Code int `json:"code"` + Description string `json:"description,omitempty"` +} + +// Error implements error +func (e apiError) Error() string { + switch { + case e.Description == "" && e.Code == 0: + return "unknown JetStream Error" + case e.Description == "" && e.Code > 0: + return fmt.Sprintf("unknown JetStream %d Error", e.Code) + default: + return e.Description + } +} + +type jSApiResponse struct { + Type string `json:"type"` + Error *apiError `json:"error,omitempty"` +} + +// io.nats.jetstream.api.v1.consumer_create_request +type jSApiConsumerCreateRequest struct { + Stream string `json:"stream_name"` + Config consumerConfig `json:"config"` +} + +// io.nats.jetstream.api.v1.consumer_create_response +type jSApiConsumerCreateResponse struct { + jSApiResponse + *ConsumerInfo +} + +type AckPolicy int + +const ( + AckNone AckPolicy = iota + AckAll + AckExplicit +) + +func (p *AckPolicy) UnmarshalJSON(data []byte) error { + switch string(data) { + case jsonString("none"): + *p = AckNone + case jsonString("all"): + *p = AckAll + case jsonString("explicit"): + *p = AckExplicit + default: + return fmt.Errorf("can not unmarshal %q", data) + } + + return nil +} + +func (p AckPolicy) MarshalJSON() ([]byte, error) { + switch p { + case AckNone: + return json.Marshal("none") + case AckAll: + return json.Marshal("all") + case AckExplicit: + return json.Marshal("explicit") + default: + return nil, fmt.Errorf("unknown acknowlegement policy %v", p) + } +} + +type ReplayPolicy int + +const ( + ReplayInstant ReplayPolicy = iota + ReplayOriginal +) + +func (p *ReplayPolicy) UnmarshalJSON(data []byte) error { + switch string(data) { + case jsonString("instant"): + *p = ReplayInstant + case jsonString("original"): + *p = ReplayOriginal + default: + return fmt.Errorf("can not unmarshal %q", data) + } + + return nil +} + +func (p ReplayPolicy) MarshalJSON() ([]byte, error) { + switch p { + case ReplayOriginal: + return json.Marshal("original") + case ReplayInstant: + return json.Marshal("instant") + default: + return nil, fmt.Errorf("unknown replay policy %v", p) + } +} + +var ( + AckAck = []byte("+ACK") + AckNak = []byte("-NAK") + AckProgress = []byte("+WPI") + AckNext = []byte("+NXT") + AckTerm = []byte("+TERM") +) + +type DeliverPolicy int + +const ( + DeliverAll DeliverPolicy = iota + DeliverLast + DeliverNew + DeliverByStartSequence + DeliverByStartTime +) + +func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { + switch string(data) { + case jsonString("all"), jsonString("undefined"): + *p = DeliverAll + case jsonString("last"): + *p = DeliverLast + case jsonString("new"): + *p = DeliverNew + case jsonString("by_start_sequence"): + *p = DeliverByStartSequence + case jsonString("by_start_time"): + *p = DeliverByStartTime + } + + return nil +} + +func (p DeliverPolicy) MarshalJSON() ([]byte, error) { + switch p { + case DeliverAll: + return json.Marshal("all") + case DeliverLast: + return json.Marshal("last") + case DeliverNew: + return json.Marshal("new") + case DeliverByStartSequence: + return json.Marshal("by_start_sequence") + case DeliverByStartTime: + return json.Marshal("by_start_time") + default: + return nil, fmt.Errorf("unknown deliver policy %v", p) + } +} + +// ConsumerConfig is the configuration for a JetStream consumes +type ConsumerConfig struct { + Durable string `json:"durable_name,omitempty"` + DeliverPolicy DeliverPolicy `json:"deliver_policy"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + AckPolicy AckPolicy `json:"ack_policy"` + AckWait time.Duration `json:"ack_wait,omitempty"` + MaxDeliver int `json:"max_deliver,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + ReplayPolicy ReplayPolicy `json:"replay_policy"` + SampleFrequency string `json:"sample_freq,omitempty"` + RateLimit uint64 `json:"rate_limit_bps,omitempty"` +} + +type consumerConfig struct { + DeliverSubject string `json:"deliver_subject,omitempty"` + *ConsumerConfig +} + +type SequencePair struct { + ConsumerSeq uint64 `json:"consumer_seq"` + StreamSeq uint64 `json:"stream_seq"` +} + +type ConsumerInfo struct { + Stream string `json:"stream_name"` + Name string `json:"name"` + Config ConsumerConfig `json:"config"` + Created time.Time `json:"created"` + Delivered SequencePair `json:"delivered"` + AckFloor SequencePair `json:"ack_floor"` + NumPending int `json:"num_pending"` + NumRedelivered int `json:"num_redelivered"` +} + +func jsonString(s string) string { + return "\"" + s + "\"" +} + +func isValidJSName(n string) bool { + return !(n == "" || strings.ContainsAny(n, ">*. ")) +} diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index daa56fdb..c5db0b41 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -54,6 +54,7 @@ const ( DefaultReconnectJitter = 100 * time.Millisecond DefaultReconnectJitterTLS = time.Second DefaultTimeout = 2 * time.Second + DefaultJetStreamTimeout = 2 * time.Second DefaultPingInterval = 2 * time.Minute DefaultMaxPingOut = 2 DefaultMaxChanLen = 8192 // 8k @@ -123,6 +124,14 @@ var ( ErrDisconnected = errors.New("nats: server is disconnected") ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") ErrBadHeaderMsg = errors.New("nats: message could not decode headers") + ErrNoResponders = errors.New("nats: no responders available for request") + ErrNoContextOrTimeout = errors.New("nats: no context or timeout given") + ErrNotJSMessage = errors.New("nats: not a JetStream message") + ErrInvalidStreamName = errors.New("nats: invalid stream name") + ErrInvalidJSAck = errors.New("nats: invalid JetStream publish acknowledgement") + ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") + ErrStreamNameRequired = errors.New("nats: Stream name is required") + ErrConsumerConfigRequired = errors.New("nats: Consumer configuration is required") ) func init() { @@ -138,6 +147,7 @@ func GetDefaultOptions() Options { ReconnectJitter: DefaultReconnectJitter, ReconnectJitterTLS: DefaultReconnectJitterTLS, Timeout: DefaultTimeout, + JetStreamTimeout: DefaultJetStreamTimeout, PingInterval: DefaultPingInterval, MaxPingsOut: DefaultMaxPingOut, SubChanLen: DefaultMaxChanLen, @@ -292,6 +302,9 @@ type Options struct { // Timeout sets the timeout for a Dial operation on a connection. Timeout time.Duration + // JetStreamTimeout set the default timeout for the JetStream API + JetStreamTimeout time.Duration + // DrainTimeout sets the timeout for a Drain Operation to complete. DrainTimeout time.Duration @@ -391,6 +404,15 @@ type Options struct { // gradually disconnect all its connections before shuting down. This is // often used in deployments when upgrading NATS Servers. LameDuckModeHandler ConnHandler + + // RetryOnFailedConnect sets the connection in reconnecting state right + // away if it can't connect to a server in the initial set. The + // MaxReconnect and ReconnectWait options are used for this process, + // similarly to when an established connection is disconnected. + // If a ReconnectHandler is set, it will be invoked when the connection + // is established, and if a ClosedHandler is set, it will be invoked if + // it fails to connect (after exhausting the MaxReconnect attempts). + RetryOnFailedConnect bool } const ( @@ -472,6 +494,10 @@ type Subscription struct { // only be processed by one member of the group. Queue string + // ConsumerConfig is the configuration for the JetStream consumer if one was created + // or updated using the subscription options + ConsumerConfig *ConsumerConfig + delivered uint64 max uint64 conn *Conn @@ -508,6 +534,7 @@ type Msg struct { Sub *Subscription next *Msg barrier *barrierInfo + jsMeta *JetStreamMsgMetaData } func (m *Msg) headerBytes() ([]byte, error) { @@ -589,21 +616,22 @@ const ( ) type connectInfo struct { - Verbose bool `json:"verbose"` - Pedantic bool `json:"pedantic"` - UserJWT string `json:"jwt,omitempty"` - Nkey string `json:"nkey,omitempty"` - Signature string `json:"sig,omitempty"` - User string `json:"user,omitempty"` - Pass string `json:"pass,omitempty"` - Token string `json:"auth_token,omitempty"` - TLS bool `json:"tls_required"` - Name string `json:"name"` - Lang string `json:"lang"` - Version string `json:"version"` - Protocol int `json:"protocol"` - Echo bool `json:"echo"` - Headers bool `json:"headers"` + Verbose bool `json:"verbose"` + Pedantic bool `json:"pedantic"` + UserJWT string `json:"jwt,omitempty"` + Nkey string `json:"nkey,omitempty"` + Signature string `json:"sig,omitempty"` + User string `json:"user,omitempty"` + Pass string `json:"pass,omitempty"` + Token string `json:"auth_token,omitempty"` + TLS bool `json:"tls_required"` + Name string `json:"name"` + Lang string `json:"lang"` + Version string `json:"version"` + Protocol int `json:"protocol"` + Echo bool `json:"echo"` + Headers bool `json:"headers"` + NoResponders bool `json:"no_responders"` } // MsgHandler is a callback function that processes messages delivered to @@ -791,6 +819,14 @@ func Timeout(t time.Duration) Option { } } +// JetStreamTimeout is an Option to set the timeout for access to the JetStream API +func JetStreamTimeout(t time.Duration) Option { + return func(o *Options) error { + o.JetStreamTimeout = t + return nil + } +} + // FlusherTimeout is an Option to set the write (and flush) timeout on a connection. func FlusherTimeout(t time.Duration) Option { return func(o *Options) error { @@ -998,6 +1034,16 @@ func LameDuckModeHandler(cb ConnHandler) Option { } } +// RetryOnFailedConnect sets the connection in reconnecting state right away +// if it can't connect to a server in the initial set. +// See RetryOnFailedConnect option for more details. +func RetryOnFailedConnect(retry bool) Option { + return func(o *Options) error { + o.RetryOnFailedConnect = retry + return nil + } +} + // Handler processing // SetDisconnectHandler will set the disconnect event handler. @@ -1576,7 +1622,9 @@ func (nc *Conn) connect() error { nc.mu.Unlock() nc.close(DISCONNECTED, false, err) nc.mu.Lock() - nc.current = nil + // Do not reset nc.current here since it would prevent + // RetryOnFailedConnect to work should this be the last server + // to try before starting doReconnect(). } } else { // Cancel out default connection refused, will trigger the @@ -1586,11 +1634,27 @@ func (nc *Conn) connect() error { } } } - nc.initc = false + if returnedErr == nil && nc.status != CONNECTED { returnedErr = ErrNoServers } + if returnedErr == nil { + nc.initc = false + } else if nc.Opts.RetryOnFailedConnect { + nc.setup() + nc.status = RECONNECTING + nc.pending = new(bytes.Buffer) + if nc.bw == nil { + nc.bw = nc.newBuffer() + } + nc.bw.Reset(nc.pending) + go nc.doReconnect(ErrNoServers) + returnedErr = nil + } else { + nc.current = nil + } + return returnedErr } @@ -1711,8 +1775,10 @@ func (nc *Conn) connectProto() (string, error) { token = nc.Opts.TokenHandler() } + // If our server does not support headers then we can't do them or no responders. + hdrs := nc.info.Headers cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token, - o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, true} + o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs} b, err := json.Marshal(cinfo) if err != nil { @@ -1908,10 +1974,12 @@ func (nc *Conn) doReconnect(err error) { nc.err = nil // Perform appropriate callback if needed for a disconnect. // DisconnectedErrCB has priority over deprecated DisconnectedCB - if nc.Opts.DisconnectedErrCB != nil { - nc.ach.push(func() { nc.Opts.DisconnectedErrCB(nc, err) }) - } else if nc.Opts.DisconnectedCB != nil { - nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) }) + if !nc.initc { + if nc.Opts.DisconnectedErrCB != nil { + nc.ach.push(func() { nc.Opts.DisconnectedErrCB(nc, err) }) + } else if nc.Opts.DisconnectedCB != nil { + nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) }) + } } // This is used to wait on go routines exit if we start them in the loop @@ -2052,6 +2120,10 @@ func (nc *Conn) doReconnect(err error) { // This is where we are truly connected. nc.status = CONNECTED + // If we are here with a retry on failed connect, indicate that the + // initial connect is now complete. + nc.initc = false + // Queue up the reconnect callback. if nc.Opts.ReconnectedCB != nil { nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) }) @@ -2528,7 +2600,7 @@ func (nc *Conn) processInfo(info string) error { // did not include themselves in the async INFO protocol. // If empty, do not remove the implicit servers from the pool. if len(ncInfo.ConnectURLs) == 0 { - if ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil { + if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil { nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) }) } return nil @@ -2591,7 +2663,7 @@ func (nc *Conn) processInfo(info string) error { nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) }) } } - if ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil { + if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil { nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) }) } return nil @@ -2676,7 +2748,11 @@ func (nc *Conn) kickFlusher() { // Publish publishes the data argument to the given subject. The data // argument is left untouched and needs to be correctly interpreted on // the receiver. -func (nc *Conn) Publish(subj string, data []byte) error { +func (nc *Conn) Publish(subj string, data []byte, opts ...PublishOption) error { + if len(opts) > 0 { + return nc.jsPublish(subj, data, opts) + } + return nc.publish(subj, _EMPTY_, nil, data) } @@ -2689,21 +2765,28 @@ func NewMsg(subject string) *Msg { } const ( - hdrLine = "NATS/1.0\r\n" - crlf = "\r\n" - hdrPreEnd = len(hdrLine) - len(crlf) + hdrLine = "NATS/1.0\r\n" + crlf = "\r\n" + hdrPreEnd = len(hdrLine) - len(crlf) + statusHdr = "Status" + noResponders = "503" ) // decodeHeadersMsg will decode and headers. func decodeHeadersMsg(data []byte) (http.Header, error) { tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data))) - if l, err := tp.ReadLine(); err != nil || l != hdrLine[:hdrPreEnd] { + l, err := tp.ReadLine() + if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] { return nil, ErrBadHeaderMsg } mh, err := tp.ReadMIMEHeader() if err != nil { return nil, ErrBadHeaderMsg } + // Check if we have an inlined status. + if len(l) > hdrPreEnd { + mh.Add(statusHdr, strings.TrimLeft(l[hdrPreEnd:], " ")) + } return http.Header(mh), nil } @@ -2765,7 +2848,8 @@ func (nc *Conn) publish(subj, reply string, hdr, data []byte) error { // Proactively reject payloads over the threshold set by server. msgSize := int64(len(data) + len(hdr)) - if msgSize > nc.info.MaxPayload { + // Skip this check if we are not yet connected (RetryOnFailedConnect) + if !nc.initc && msgSize > nc.info.MaxPayload { nc.mu.Unlock() return ErrMaxPayload } @@ -2904,6 +2988,7 @@ func (nc *Conn) respHandler(m *Msg) { // Helper to setup and send new request style requests. Return the chan to receive the response. func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) { + nc.mu.Lock() // Do setup for the new style if needed. if nc.respMap == nil { nc.initNewResp() @@ -2944,7 +3029,6 @@ func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) { if !nc.info.Headers { return nil, ErrHeadersNotSupported } - hdr, err = msg.headerBytes() if err != nil { return nil, err @@ -2960,18 +3044,32 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, return nc.request(subj, nil, data, timeout) } +func (nc *Conn) useOldRequestStyle() bool { + nc.mu.RLock() + r := nc.Opts.UseOldRequestStyle + nc.mu.RUnlock() + return r +} + func (nc *Conn) request(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) { if nc == nil { return nil, ErrInvalidConnection } - nc.mu.Lock() - if nc.Opts.UseOldRequestStyle { - nc.mu.Unlock() - return nc.oldRequest(subj, hdr, data, timeout) + var m *Msg + var err error + + if nc.useOldRequestStyle() { + m, err = nc.oldRequest(subj, hdr, data, timeout) + } else { + m, err = nc.newRequest(subj, hdr, data, timeout) } - return nc.newRequest(subj, hdr, data, timeout) + // Check for no responder status. + if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders { + m, err = nil, ErrNoResponders + } + return m, err } func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) { @@ -3095,15 +3193,15 @@ func (nc *Conn) respToken(respInbox string) string { // Subscribe will express interest in the given subject. The subject // can have wildcards (partial:*, full:>). Messages will be delivered // to the associated MsgHandler. -func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) { - return nc.subscribe(subj, _EMPTY_, cb, nil, false) +func (nc *Conn) Subscribe(subj string, cb MsgHandler, opts ...SubscribeOption) (*Subscription, error) { + return nc.subscribe(subj, _EMPTY_, cb, nil, false, opts...) } // ChanSubscribe will express interest in the given subject and place // all messages received on the channel. // You should not close the channel until sub.Unsubscribe() has been called. -func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) { - return nc.subscribe(subj, _EMPTY_, nil, ch, false) +func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg, opts ...SubscribeOption) (*Subscription, error) { + return nc.subscribe(subj, _EMPTY_, nil, ch, false, opts...) } // ChanQueueSubscribe will express interest in the given subject. @@ -3112,18 +3210,18 @@ func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) // which will be placed on the channel. // You should not close the channel until sub.Unsubscribe() has been called. // Note: This is the same than QueueSubscribeSyncWithChan. -func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) { - return nc.subscribe(subj, group, nil, ch, false) +func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg, opts ...SubscribeOption) (*Subscription, error) { + return nc.subscribe(subj, group, nil, ch, false, opts...) } // SubscribeSync will express interest on the given subject. Messages will // be received synchronously using Subscription.NextMsg(). -func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) { +func (nc *Conn) SubscribeSync(subj string, opts ...SubscribeOption) (*Subscription, error) { if nc == nil { return nil, ErrInvalidConnection } mch := make(chan *Msg, nc.Opts.SubChanLen) - s, e := nc.subscribe(subj, _EMPTY_, nil, mch, true) + s, e := nc.subscribe(subj, _EMPTY_, nil, mch, true, opts...) return s, e } @@ -3131,17 +3229,17 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) { // All subscribers with the same queue name will form the queue group and // only one member of the group will be selected to receive any given // message asynchronously. -func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) { - return nc.subscribe(subj, queue, cb, nil, false) +func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubscribeOption) (*Subscription, error) { + return nc.subscribe(subj, queue, cb, nil, false, opts...) } // QueueSubscribeSync creates a synchronous queue subscriber on the given // subject. All subscribers with the same queue name will form the queue // group and only one member of the group will be selected to receive any // given message synchronously using Subscription.NextMsg(). -func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) { +func (nc *Conn) QueueSubscribeSync(subj, queue string, opts ...SubscribeOption) (*Subscription, error) { mch := make(chan *Msg, nc.Opts.SubChanLen) - s, e := nc.subscribe(subj, queue, nil, mch, true) + s, e := nc.subscribe(subj, queue, nil, mch, true, opts...) return s, e } @@ -3151,8 +3249,8 @@ func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) { // which will be placed on the channel. // You should not close the channel until sub.Unsubscribe() has been called. // Note: This is the same than ChanQueueSubscribe. -func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) { - return nc.subscribe(subj, queue, nil, ch, false) +func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg, opts ...SubscribeOption) (*Subscription, error) { + return nc.subscribe(subj, queue, nil, ch, false, opts...) } // badSubject will do quick test on whether a subject is acceptable. @@ -3176,17 +3274,47 @@ func badQueue(qname string) bool { } // subscribe is the internal subscribe function that indicates interest in a subject. -func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool) (*Subscription, error) { +func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts ...SubscribeOption) (*Subscription, error) { if nc == nil { return nil, ErrInvalidConnection } + + var aopts *jsOpts + if len(opts) > 0 { + aopts = newJsOpts() + for _, f := range opts { + if err := f(aopts); err != nil { + return nil, err + } + } + + if subj == "" { + subj = NewInbox() + } + } + nc.mu.Lock() - s, err := nc.subscribeLocked(subj, queue, cb, ch, isSync) + s, err := nc.subscribeLocked(subj, queue, cb, ch, isSync, opts...) nc.mu.Unlock() - return s, err + if err != nil { + return nil, err + } + + // here so that interest exist already when doing ephemerals + if aopts != nil { + nfo, err := nc.createOrUpdateConsumer(aopts, subj) + if err != nil { + s.Unsubscribe() + return nil, fmt.Errorf("nats: JetStream consumer creation failed: %s", err) + } + + s.ConsumerConfig = &nfo.Config + } + + return s, nil } -func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool) (*Subscription, error) { +func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts ...SubscribeOption) (*Subscription, error) { if nc == nil { return nil, ErrInvalidConnection } @@ -4238,6 +4366,13 @@ func (nc *Conn) MaxPayload() int64 { return nc.info.MaxPayload } +// HeadersSupported will return if the server supports headers +func (nc *Conn) HeadersSupported() bool { + nc.mu.RLock() + defer nc.mu.RUnlock() + return nc.info.Headers +} + // AuthRequired will return if the connected server requires authorization. func (nc *Conn) AuthRequired() bool { nc.mu.RLock() diff --git a/vendor/modules.txt b/vendor/modules.txt index 0a54e824..11583200 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -2,7 +2,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a +# github.com/nats-io/nats.go v1.10.1-0.20201013114232-5a33ce07522f github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin github.com/nats-io/nats.go/util @@ -10,7 +10,7 @@ github.com/nats-io/nats.go/util github.com/nats-io/nkeys # github.com/nats-io/nuid v1.0.1 github.com/nats-io/nuid -# golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 +# golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 golang.org/x/crypto/bcrypt golang.org/x/crypto/blowfish golang.org/x/crypto/ed25519