mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
@@ -33,8 +33,8 @@ If you are interested in contributing to NATS, read about our...
|
||||
[Fossa-Image]: https://app.fossa.io/api/projects/git%2Bgithub.com%2Fnats-io%2Fnats-server.svg?type=shield
|
||||
[Build-Status-Url]: https://travis-ci.com/github/nats-io/nats-server
|
||||
[Build-Status-Image]: https://travis-ci.com/nats-io/nats-server.svg?branch=main
|
||||
[Release-Url]: https://github.com/nats-io/nats-server/releases/tag/v2.3.4
|
||||
[Release-image]: https://img.shields.io/badge/release-v2.3.4-1eb0fc.svg
|
||||
[Release-Url]: https://github.com/nats-io/nats-server/releases/tag/v2.4.0
|
||||
[Release-image]: https://img.shields.io/badge/release-v2.4.0-1eb0fc.svg
|
||||
[Coverage-Url]: https://coveralls.io/r/nats-io/nats-server?branch=main
|
||||
[Coverage-image]: https://coveralls.io/repos/github/nats-io/nats-server/badge.svg?branch=main
|
||||
[ReportCard-Url]: https://goreportcard.com/report/nats-io/nats-server
|
||||
|
||||
2
go.mod
2
go.mod
@@ -7,7 +7,7 @@ require (
|
||||
github.com/klauspost/compress v1.13.4
|
||||
github.com/minio/highwayhash v1.0.1
|
||||
github.com/nats-io/jwt/v2 v2.0.3
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0
|
||||
github.com/nats-io/nats.go v1.12.0
|
||||
github.com/nats-io/nkeys v0.3.0
|
||||
github.com/nats-io/nuid v1.0.1
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
|
||||
|
||||
19
go.sum
19
go.sum
@@ -5,12 +5,11 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
|
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
|
||||
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
|
||||
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk=
|
||||
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
|
||||
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
|
||||
@@ -19,20 +18,8 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
|
||||
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
|
||||
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
|
||||
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 h1:9GqilBhZaR3xYis0JgMlJjNw933WIobdjKhilXm+Vls=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41 h1:GUUkiOgD00OMr4foruBN6YG1be3lFnHl0LJIoEs8cQg=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210810010129-d1955c8653ca h1:9MdvV5kneekw/B/TebU4Om19T3eOUBRB3yP1c82yKSk=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210810010129-d1955c8653ca/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19 h1:9WQzXoYc37xBQ9YoQSSc1aoMJCvzX5OmirlivU0GEFU=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9 h1:aJYmbbVrq6rsFGAvQnAvoChjkjUOJGqVBdQ47vbEWD4=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210817171728-9d3a000c8a66 h1:J3LNTmD/AUgjKJjZK2IEsGl2GD1znemMOq64ZKu83ok=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210817171728-9d3a000c8a66/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0 h1:lffRgFiHXqxwf8lYNSXXeOZdOgAIOabGwOSwdttqCn0=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats.go v1.12.0 h1:n0oZzK2aIZDMKuEiMKJ9qkCUgVY5vTAAksSXtLlz5Xc=
|
||||
github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
|
||||
@@ -41,7 +41,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.3.5-beta.2"
|
||||
VERSION = "2.4.0"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
2
vendor/github.com/nats-io/nats.go/README.md
generated
vendored
2
vendor/github.com/nats-io/nats.go/README.md
generated
vendored
@@ -21,7 +21,7 @@ When using or transitioning to Go modules support:
|
||||
```bash
|
||||
# Go client latest or explicit version
|
||||
go get github.com/nats-io/nats.go/@latest
|
||||
go get github.com/nats-io/nats.go/@v1.11.0
|
||||
go get github.com/nats-io/nats.go/@v1.12.0
|
||||
|
||||
# For latest NATS Server, add /v2 at the end
|
||||
go get github.com/nats-io/nats-server/v2
|
||||
|
||||
2
vendor/github.com/nats-io/nats.go/go_test.mod
generated
vendored
2
vendor/github.com/nats-io/nats.go/go_test.mod
generated
vendored
@@ -4,7 +4,7 @@ go 1.16
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.4.2
|
||||
github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178
|
||||
github.com/nats-io/nats-server/v2 v2.3.5-0.20210825221009-41a253dabb43
|
||||
github.com/nats-io/nkeys v0.3.0
|
||||
github.com/nats-io/nuid v1.0.1
|
||||
google.golang.org/protobuf v1.23.0
|
||||
|
||||
12
vendor/github.com/nats-io/nats.go/go_test.sum
generated
vendored
12
vendor/github.com/nats-io/nats.go/go_test.sum
generated
vendored
@@ -5,21 +5,23 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
|
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
|
||||
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
|
||||
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk=
|
||||
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
|
||||
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
|
||||
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
|
||||
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
|
||||
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
|
||||
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
|
||||
github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178 h1:6/bt9zMGA1D/i3ROeq8GjF8Tig5BVFh4V3gI+qpoWIs=
|
||||
github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178/go.mod h1:7mTh0KSxKc63xAVop97cFCIGRkWCv6HoX9lMXRSNOhU=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats-server/v2 v2.3.5-0.20210825221009-41a253dabb43 h1:Sbb4QxNsccsPERg0C7uQX7/xgOCOTMIvDH9Ytb5MXsU=
|
||||
github.com/nats-io/nats-server/v2 v2.3.5-0.20210825221009-41a253dabb43/go.mod h1:jgHRB+EfZisUr6j50/g7Gcah7AR8qtk3as42DJmESCk=
|
||||
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
|
||||
133
vendor/github.com/nats-io/nats.go/js.go
generated
vendored
133
vendor/github.com/nats-io/nats.go/js.go
generated
vendored
@@ -813,8 +813,8 @@ type ConsumerInfo struct {
|
||||
Name string `json:"name"`
|
||||
Created time.Time `json:"created"`
|
||||
Config ConsumerConfig `json:"config"`
|
||||
Delivered SequencePair `json:"delivered"`
|
||||
AckFloor SequencePair `json:"ack_floor"`
|
||||
Delivered SequenceInfo `json:"delivered"`
|
||||
AckFloor SequenceInfo `json:"ack_floor"`
|
||||
NumAckPending int `json:"num_ack_pending"`
|
||||
NumRedelivered int `json:"num_redelivered"`
|
||||
NumWaiting int `json:"num_waiting"`
|
||||
@@ -823,13 +823,19 @@ type ConsumerInfo struct {
|
||||
PushBound bool `json:"push_bound,omitempty"`
|
||||
}
|
||||
|
||||
// SequencePair includes the consumer and stream sequence info from a JetStream consumer.
|
||||
type SequencePair struct {
|
||||
// SequenceInfo has both the consumer and the stream sequence and last activity.
|
||||
type SequenceInfo struct {
|
||||
Consumer uint64 `json:"consumer_seq"`
|
||||
Stream uint64 `json:"stream_seq"`
|
||||
Last *time.Time `json:"last_active,omitempty"`
|
||||
}
|
||||
|
||||
// SequencePair includes the consumer and stream sequence info from a JetStream consumer.
|
||||
type SequencePair struct {
|
||||
Consumer uint64 `json:"consumer_seq"`
|
||||
Stream uint64 `json:"stream_seq"`
|
||||
}
|
||||
|
||||
// nextRequest is for getting next messages for pull based consumers from JetStream.
|
||||
type nextRequest struct {
|
||||
Expires time.Duration `json:"expires,omitempty"`
|
||||
@@ -967,7 +973,7 @@ func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription
|
||||
return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable)))
|
||||
}
|
||||
|
||||
func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (string, error) {
|
||||
func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) {
|
||||
ccfg := &info.Config
|
||||
|
||||
// Make sure this new subject matches or is a subset.
|
||||
@@ -984,7 +990,7 @@ func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (s
|
||||
|
||||
// If pull mode, nothing else to check here.
|
||||
if isPullMode {
|
||||
return _EMPTY_, nil
|
||||
return _EMPTY_, checkConfig(ccfg, userCfg)
|
||||
}
|
||||
|
||||
// At this point, we know the user wants push mode, and the JS consumer is
|
||||
@@ -1013,14 +1019,80 @@ func processConsInfo(info *ConsumerInfo, isPullMode bool, subj, queue string) (s
|
||||
queue, dg)
|
||||
}
|
||||
}
|
||||
if err := checkConfig(ccfg, userCfg); err != nil {
|
||||
return _EMPTY_, err
|
||||
}
|
||||
return ccfg.DeliverSubject, nil
|
||||
}
|
||||
|
||||
func checkConfig(s, u *ConsumerConfig) error {
|
||||
makeErr := func(fieldName string, usrVal, srvVal interface{}) error {
|
||||
return fmt.Errorf("configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal)
|
||||
}
|
||||
|
||||
if u.Durable != _EMPTY_ && u.Durable != s.Durable {
|
||||
return makeErr("durable", u.Durable, s.Durable)
|
||||
}
|
||||
if u.Description != _EMPTY_ && u.Description != s.Description {
|
||||
return makeErr("description", u.Description, s.Description)
|
||||
}
|
||||
if u.DeliverPolicy != deliverPolicyNotSet && u.DeliverPolicy != s.DeliverPolicy {
|
||||
return makeErr("deliver policy", u.DeliverPolicy, s.DeliverPolicy)
|
||||
}
|
||||
if u.OptStartSeq > 0 && u.OptStartSeq != s.OptStartSeq {
|
||||
return makeErr("optional start sequence", u.OptStartSeq, s.OptStartSeq)
|
||||
}
|
||||
if u.OptStartTime != nil && !u.OptStartTime.IsZero() && u.OptStartTime != s.OptStartTime {
|
||||
return makeErr("optional start time", u.OptStartTime, s.OptStartTime)
|
||||
}
|
||||
if u.AckPolicy != ackPolicyNotSet && u.AckPolicy != s.AckPolicy {
|
||||
return makeErr("ack policy", u.AckPolicy, s.AckPolicy)
|
||||
}
|
||||
if u.AckWait > 0 && u.AckWait != s.AckWait {
|
||||
return makeErr("ack wait", u.AckWait, s.AckWait)
|
||||
}
|
||||
if u.MaxDeliver > 0 && u.MaxDeliver != s.MaxDeliver {
|
||||
return makeErr("max deliver", u.MaxDeliver, s.MaxDeliver)
|
||||
}
|
||||
if u.ReplayPolicy != replayPolicyNotSet && u.ReplayPolicy != s.ReplayPolicy {
|
||||
return makeErr("replay policy", u.ReplayPolicy, s.ReplayPolicy)
|
||||
}
|
||||
if u.RateLimit > 0 && u.RateLimit != s.RateLimit {
|
||||
return makeErr("rate limit", u.RateLimit, s.RateLimit)
|
||||
}
|
||||
if u.SampleFrequency != _EMPTY_ && u.SampleFrequency != s.SampleFrequency {
|
||||
return makeErr("sample frequency", u.SampleFrequency, s.SampleFrequency)
|
||||
}
|
||||
if u.MaxWaiting > 0 && u.MaxWaiting != s.MaxWaiting {
|
||||
return makeErr("max waiting", u.MaxWaiting, s.MaxWaiting)
|
||||
}
|
||||
if u.MaxAckPending > 0 && u.MaxAckPending != s.MaxAckPending {
|
||||
return makeErr("max ack pending", u.MaxAckPending, s.MaxAckPending)
|
||||
}
|
||||
// For flow control, we want to fail if the user explicit wanted it, but
|
||||
// it is not set in the existing consumer. If it is not asked by the user,
|
||||
// the library still handles it and so no reason to fail.
|
||||
if u.FlowControl && !s.FlowControl {
|
||||
return makeErr("flow control", u.FlowControl, s.FlowControl)
|
||||
}
|
||||
if u.Heartbeat > 0 && u.Heartbeat != s.Heartbeat {
|
||||
return makeErr("heartbeat", u.Heartbeat, s.Heartbeat)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) {
|
||||
cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet}
|
||||
cfg := ConsumerConfig{
|
||||
DeliverPolicy: deliverPolicyNotSet,
|
||||
AckPolicy: ackPolicyNotSet,
|
||||
ReplayPolicy: replayPolicyNotSet,
|
||||
}
|
||||
o := subOpts{cfg: &cfg}
|
||||
if len(opts) > 0 {
|
||||
for _, opt := range opts {
|
||||
if opt == nil {
|
||||
continue
|
||||
}
|
||||
if err := opt.configureSubscribe(&o); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1153,7 +1225,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
|
||||
|
||||
switch {
|
||||
case info != nil:
|
||||
deliver, err = processConsInfo(info, isPullMode, subj, queue)
|
||||
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1183,10 +1255,19 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
|
||||
cfg.DeliverGroup = queue
|
||||
}
|
||||
|
||||
// If not set default to ack explicit.
|
||||
// If not set, default to deliver all
|
||||
if cfg.DeliverPolicy == deliverPolicyNotSet {
|
||||
cfg.DeliverPolicy = DeliverAllPolicy
|
||||
}
|
||||
// If not set, default to ack explicit.
|
||||
if cfg.AckPolicy == ackPolicyNotSet {
|
||||
cfg.AckPolicy = AckExplicitPolicy
|
||||
}
|
||||
// If not set, default to instant
|
||||
if cfg.ReplayPolicy == replayPolicyNotSet {
|
||||
cfg.ReplayPolicy = ReplayInstantPolicy
|
||||
}
|
||||
|
||||
// If we have acks at all and the MaxAckPending is not set go ahead
|
||||
// and set to the internal max.
|
||||
// TODO(dlc) - We should be able to update this if client updates PendingLimits.
|
||||
@@ -1297,7 +1378,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
deliver, err = processConsInfo(info, isPullMode, subj, queue)
|
||||
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1706,6 +1787,14 @@ func ManualAck() SubOpt {
|
||||
})
|
||||
}
|
||||
|
||||
// Description will set the description for the created consumer.
|
||||
func Description(description string) SubOpt {
|
||||
return subOptFn(func(opts *subOpts) error {
|
||||
opts.cfg.Description = description
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Durable defines the consumer name for JetStream durable subscribers.
|
||||
func Durable(consumer string) SubOpt {
|
||||
return subOptFn(func(opts *subOpts) error {
|
||||
@@ -1830,6 +1919,14 @@ func ReplayOriginal() SubOpt {
|
||||
})
|
||||
}
|
||||
|
||||
// ReplayInstant replays the messages as fast as possible.
|
||||
func ReplayInstant() SubOpt {
|
||||
return subOptFn(func(opts *subOpts) error {
|
||||
opts.cfg.ReplayPolicy = ReplayInstantPolicy
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// RateLimit is the Bits per sec rate limit applied to a push consumer.
|
||||
func RateLimit(n uint64) SubOpt {
|
||||
return subOptFn(func(opts *subOpts) error {
|
||||
@@ -2325,11 +2422,13 @@ func getMetadataFields(subject string) ([]string, error) {
|
||||
// New subject would be:
|
||||
// $JS.ACK.<domain>.<account hash>.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<tm>.<pending>.<a token with a random value>
|
||||
//
|
||||
// v1 has 9 tokens, v2 has 12.
|
||||
// v1 has 9 tokens, v2 has 12, but we must not be strict on the 12th since
|
||||
// it may be removed in the future. Also, the library has no use for it.
|
||||
// The point is that a v2 ACK subject is valid if it has at least 11 tokens.
|
||||
//
|
||||
l := len(tokens)
|
||||
// If lower than 9 or more than 9 but less than 12, report an error
|
||||
if l < v1TokenCounts || (l > v1TokenCounts && l < v2TokenCounts) {
|
||||
// If lower than 9 or more than 9 but less than 11, report an error
|
||||
if l < v1TokenCounts || (l > v1TokenCounts && l < v2TokenCounts-1) {
|
||||
return nil, ErrNotJSMessage
|
||||
}
|
||||
if tokens[0] != "$JS" || tokens[1] != "ACK" {
|
||||
@@ -2412,7 +2511,7 @@ const (
|
||||
// AckExplicitPolicy requires ack or nack for all messages.
|
||||
AckExplicitPolicy
|
||||
|
||||
// For setting
|
||||
// For configuration mismatch check
|
||||
ackPolicyNotSet = 99
|
||||
)
|
||||
|
||||
@@ -2472,6 +2571,9 @@ const (
|
||||
|
||||
// ReplayOriginalPolicy will maintain the same timing as the messages were received.
|
||||
ReplayOriginalPolicy
|
||||
|
||||
// For configuration mismatch check
|
||||
replayPolicyNotSet = 99
|
||||
)
|
||||
|
||||
func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
|
||||
@@ -2532,6 +2634,9 @@ const (
|
||||
// DeliverLastPerSubjectPolicy will start the consumer with the last message
|
||||
// for all subjects received.
|
||||
DeliverLastPerSubjectPolicy
|
||||
|
||||
// For configuration mismatch check
|
||||
deliverPolicyNotSet = 99
|
||||
)
|
||||
|
||||
func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
|
||||
|
||||
2
vendor/github.com/nats-io/nats.go/nats.go
generated
vendored
2
vendor/github.com/nats-io/nats.go/nats.go
generated
vendored
@@ -46,7 +46,7 @@ import (
|
||||
|
||||
// Default Constants
|
||||
const (
|
||||
Version = "1.11.0"
|
||||
Version = "1.12.0"
|
||||
DefaultURL = "nats://127.0.0.1:4222"
|
||||
DefaultPort = 4222
|
||||
DefaultMaxReconnect = 60
|
||||
|
||||
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@@ -9,7 +9,7 @@ github.com/minio/highwayhash
|
||||
# github.com/nats-io/jwt/v2 v2.0.3
|
||||
## explicit
|
||||
github.com/nats-io/jwt/v2
|
||||
# github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0
|
||||
# github.com/nats-io/nats.go v1.12.0
|
||||
## explicit
|
||||
github.com/nats-io/nats.go
|
||||
github.com/nats-io/nats.go/encoders/builtin
|
||||
|
||||
Reference in New Issue
Block a user