Update Go client

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-08-13 15:08:55 -07:00
parent 5ec5020a8b
commit b8d059e179
5 changed files with 18 additions and 13 deletions

2
go.mod
View File

@@ -7,7 +7,7 @@ require (
github.com/klauspost/compress v1.11.12
github.com/minio/highwayhash v1.0.1
github.com/nats-io/jwt/v2 v2.0.3
github.com/nats-io/nats.go v1.11.1-0.20210810010129-d1955c8653ca
github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e

2
go.sum
View File

@@ -22,6 +22,8 @@ github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41 h1:GUUkiOgD00OM
github.com/nats-io/nats.go v1.11.1-0.20210803204434-91bdffe39f41/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.11.1-0.20210810010129-d1955c8653ca h1:9MdvV5kneekw/B/TebU4Om19T3eOUBRB3yP1c82yKSk=
github.com/nats-io/nats.go v1.11.1-0.20210810010129-d1955c8653ca/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19 h1:9WQzXoYc37xBQ9YoQSSc1aoMJCvzX5OmirlivU0GEFU=
github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=

View File

@@ -818,6 +818,7 @@ type jsSub struct {
// For pull subscribers, this is the next message subject to send requests to.
nms string
psubj string // the subject that was passed by user to the subscribe calls
consumer string
stream string
deliver string
@@ -1114,13 +1115,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
dseq: 1,
pull: isPullMode,
nms: nms,
psubj: subj,
}
sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
// Since JetStream sends on different subject, make sure this reflects the user's intentions.
sub.mu.Lock()
sub.Subject = subj
sub.mu.Unlock()
if err != nil {
return nil, err
}
@@ -1211,10 +1209,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if err != nil {
return nil, err
}
// Since JetStream sends on different subject, make sure this reflects the user's intentions.
sub.mu.Lock()
sub.Subject = subj
sub.mu.Unlock()
}
} else {
if cinfo.Error.Code == 404 {

View File

@@ -1270,7 +1270,15 @@ func defaultErrHandler(nc *Conn, sub *Subscription, err error) {
}
var errStr string
if sub != nil {
errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, sub.Subject)
var subject string
sub.mu.Lock()
if sub.jsi != nil {
subject = sub.jsi.psubj
} else {
subject = sub.Subject
}
sub.mu.Unlock()
errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, subject)
} else {
errStr = fmt.Sprintf("%s on connection [%d]\n", err.Error(), cid)
}
@@ -4451,12 +4459,13 @@ func (nc *Conn) resendSubscriptions() {
continue
}
}
subj, queue, sid := s.Subject, s.Queue, s.sid
s.mu.Unlock()
nc.bw.writeDirect(fmt.Sprintf(subProto, s.Subject, s.Queue, s.sid))
nc.bw.writeDirect(fmt.Sprintf(subProto, subj, queue, sid))
if adjustedMax > 0 {
maxStr := strconv.Itoa(int(adjustedMax))
nc.bw.writeDirect(fmt.Sprintf(unsubProto, s.sid, maxStr))
nc.bw.writeDirect(fmt.Sprintf(unsubProto, sid, maxStr))
}
}
}

2
vendor/modules.txt vendored
View File

@@ -9,7 +9,7 @@ github.com/minio/highwayhash
# github.com/nats-io/jwt/v2 v2.0.3
## explicit
github.com/nats-io/jwt/v2
# github.com/nats-io/nats.go v1.11.1-0.20210810010129-d1955c8653ca
# github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19
## explicit
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin