Merge pull request #1924 from nats-io/sc_interest

Cross cluster consumer interest.
This commit is contained in:
Derek Collison
2021-02-18 20:13:19 -07:00
committed by GitHub
11 changed files with 449 additions and 156 deletions

2
go.mod
View File

@@ -6,7 +6,7 @@ require (
github.com/klauspost/compress v1.11.7
github.com/minio/highwayhash v1.0.0
github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a
github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585
github.com/nats-io/nkeys v0.2.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897

4
go.sum
View File

@@ -19,18 +19,22 @@ github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM=
github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813 h1:km4lLzT86NyJRhO++VqfP/vn5cbfm+E05i2bGdqDbrY=
github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI=
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a h1:EjwBk6T/arS7o0ZGdMgdzYrQHeUITT1GHf3cFQFtr3I=
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585 h1:xbs6PNOyQcxNFXII9qcFvodqBtQKec8hP7WzGHOdsz0=
github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM=

View File

@@ -679,7 +679,10 @@ func (o *consumer) setLeader(isLeader bool) {
if o.isPushMode() {
o.inch = make(chan bool, 8)
o.acc.sl.RegisterNotification(o.cfg.DeliverSubject, o.inch)
o.active = <-o.inch
if o.active = <-o.inch; !o.active {
// Check gateways in case they are enabled.
o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject)
}
}
// If we are not in ReplayInstant mode mark us as in replay state until resolved.
@@ -854,17 +857,24 @@ func (o *consumer) hasDeliveryInterest(localInterest bool) bool {
}
// If we are here check gateways.
if acc.srv != nil && acc.srv.gateway.enabled {
gw := acc.srv.gateway
gw.RLock()
for _, gwc := range gw.outo {
psi, qr := gwc.gatewayInterest(acc.Name, deliver)
if psi || qr != nil {
gw.RUnlock()
return true
}
if s := acc.srv; s != nil && s.hasGatewayInterest(acc.Name, deliver) {
return true
}
return false
}
func (s *Server) hasGatewayInterest(account, subject string) bool {
gw := s.gateway
if !gw.enabled {
return false
}
gw.RLock()
defer gw.RUnlock()
for _, gwc := range gw.outo {
psi, qr := gwc.gatewayInterest(account, subject)
if psi || qr != nil {
return true
}
gw.RUnlock()
}
return false
}
@@ -1860,10 +1870,15 @@ func (o *consumer) expireWaiting() int {
expired++
continue
}
rr := o.acc.sl.Match(wr.reply)
s, acc := o.acc.srv, o.acc
rr := acc.sl.Match(wr.reply)
if len(rr.psubs)+len(rr.qsubs) > 0 {
break
}
// If we are here check on gateways.
if s != nil && s.hasGatewayInterest(acc.Name, wr.reply) {
break
}
// No more interest so go ahead and remove this one from our list.
o.forceExpireFirstWaiting()
expired++

View File

@@ -2023,7 +2023,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
jsub, err := js.SubscribeSync("foo", nats.Attach("TEST", "dlc"), nats.Pull(batchSize))
jsub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(batchSize))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -2457,8 +2457,9 @@ func TestJetStreamClusterStreamInterestOnlyPolicy(t *testing.T) {
// Now delete the consumer.
sub.Unsubscribe()
if err := js.DeleteConsumer("foo", "dlc"); err != nil {
t.Fatalf("Unexpected error: %v", err)
// That should make it go away.
if _, err := js.ConsumerInfo("foo", "dlc"); err == nil {
t.Fatalf("Expected not found error, got none")
}
// Wait for the messages to be purged.
@@ -3522,6 +3523,49 @@ func TestJetStreamClusterSuperClusterBasics(t *testing.T) {
}
}
// Test that consumer interest across gateways and superclusters is properly identitifed in a remote cluster.
func TestJetStreamClusterSuperClusterCrossClusterConsumerInterest(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
// Client based API - Connect to Cluster C1. Stream and consumer will live in C2.
s := sc.clusterForName("C1").randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
pcn := "C2"
_, err := js.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 3, Placement: &nats.Placement{Cluster: pcn}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Pull based first.
sub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(1))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Send a message.
if _, err = js.Publish("foo", []byte("CCI")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
checkSubsPending(t, sub, 1)
// Now check push based delivery.
sub, err = js.SubscribeSync("foo", nats.Durable("rip"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, 1)
// Send another message.
if _, err = js.Publish("foo", []byte("CCI")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
checkSubsPending(t, sub, 2)
}
func TestJetStreamClusterSuperClusterPeerReassign(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
@@ -3602,7 +3646,6 @@ func TestJetStreamClusterSuperClusterPeerReassign(t *testing.T) {
}
return nil
})
}
func TestJetStreamClusterStreamPerf(t *testing.T) {

View File

@@ -400,7 +400,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
}
}
n.notice("Started")
n.debug("Started")
n.Lock()
n.resetElectionTimeout()
@@ -478,7 +478,7 @@ func (s *Server) transferRaftLeaders() bool {
var nodes []RaftNode
s.rnMu.RLock()
if len(s.raftNodes) > 0 {
s.Noticef("Transferring any raft leaders")
s.Debugf("Transferring any raft leaders")
}
for _, n := range s.raftNodes {
nodes = append(nodes, n)
@@ -1066,9 +1066,9 @@ func (n *raft) shutdown(shouldDelete bool) {
s.unregisterRaftNode(g)
if shouldDelete {
n.notice("Deleted")
n.debug("Deleted")
} else {
n.notice("Shutdown")
n.debug("Shutdown")
}
if wal != nil {
if shouldDelete {
@@ -1189,11 +1189,6 @@ func (n *raft) error(format string, args ...interface{}) {
n.s.Errorf(nf, args...)
}
func (n *raft) notice(format string, args ...interface{}) {
nf := fmt.Sprintf("RAFT [%s - %s] %s", n.id, n.group, format)
n.s.Noticef(nf, args...)
}
func (n *raft) electTimer() *time.Timer {
n.RLock()
defer n.RUnlock()
@@ -1479,7 +1474,6 @@ func (n *raft) runAsLeader() {
n.switchToFollower(noLeader)
return
}
case vresp := <-n.votes:
if vresp.term > n.currentTerm() {
n.switchToFollower(noLeader)
@@ -1997,6 +1991,24 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
return
}
// Are we receiving from another leader.
if n.state == Leader {
if ae.term > n.term {
n.term = ae.term
n.vote = noVote
n.writeTermVote()
n.debug("Received append entry from another leader, stepping down to %q", ae.leader)
n.attemptStepDown(ae.leader)
} else {
// Let them know we are the leader.
ar := &appendEntryResponse{n.term, n.pindex, n.id, false, _EMPTY_}
n.Unlock()
n.debug("AppendEntry ignoring old term from another leader")
n.sendRPC(ae.reply, _EMPTY_, ar.encode())
return
}
}
// If we received an append entry as a candidate we should convert to a follower.
if n.state == Candidate {
n.debug("Received append entry in candidate state from %q, converting to follower", ae.leader)
@@ -2026,8 +2038,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Ignore old terms.
if isNew && ae.term < n.term {
ar := &appendEntryResponse{n.term, n.pindex, n.id, false, _EMPTY_}
n.Unlock()
n.debug("AppendEntry ignoring old term")
n.sendRPC(ae.reply, _EMPTY_, ar.encode())
return
}
@@ -2090,7 +2104,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if eae, err := n.loadEntry(ae.pindex); err == nil && eae != nil {
// If terms mismatched, delete that entry and all others past it.
if ae.pterm > eae.pterm {
n.wal.Truncate(ae.pindex - 1)
n.wal.Truncate(ae.pindex)
n.pindex = ae.pindex
n.pterm = ae.pterm
ar = &appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_}
@@ -2252,8 +2266,18 @@ func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, subject,
n.trackPeer(ar.peer)
if ar.success {
n.trackResponse(ar)
} else if ar.reply != _EMPTY_ {
n.catchupFollower(ar)
} else {
// False here, check to make sure they do not have a higher term.
if ar.term > n.term {
n.term = ar.term
n.vote = noVote
n.writeTermVote()
n.Lock()
n.attemptStepDown(noLeader)
n.Unlock()
} else if ar.reply != _EMPTY_ {
n.catchupFollower(ar)
}
}
}
@@ -2516,6 +2540,11 @@ func (n *raft) handleVoteResponse(sub *subscription, c *client, _, reply string,
n.error("Received malformed vote response for %q", n.group)
return
}
if state := n.State(); state != Candidate && state != Leader {
n.debug("Ignoring old vote response, we have stepped down")
return
}
select {
case n.votes <- vr:
default:
@@ -2525,21 +2554,18 @@ func (n *raft) handleVoteResponse(sub *subscription, c *client, _, reply string,
}
func (n *raft) processVoteRequest(vr *voteRequest) error {
n.RLock()
vresp := voteResponse{n.term, n.id, false}
n.RUnlock()
n.debug("Received a voteRequest %+v", vr)
defer n.debug("Sending a voteResponse %+v -> %q", &vresp, vr.reply)
if err := n.trackPeer(vr.candidate); err != nil {
n.sendReply(vr.reply, vresp.encode())
return err
}
n.Lock()
n.resetElectionTimeout()
vresp := &voteResponse{n.term, n.id, false}
defer n.debug("Sending a voteResponse %+v -> %q", &vresp, vr.reply)
// Ignore if we are newer.
if vr.term < n.term {
n.Unlock()
@@ -2552,14 +2578,15 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
n.term = vr.term
n.vote = noVote
n.writeTermVote()
if n.state == Candidate {
if n.state != Follower {
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vr.term, n.term)
n.attemptStepDown(noLeader)
}
}
// Only way we get to yes is through here.
if vr.lastIndex >= n.pindex && n.vote == noVote || n.vote == vr.candidate {
voteOk := n.vote == noVote || n.vote == vr.candidate
if voteOk && vr.lastTerm >= n.pterm && vr.lastIndex >= n.pindex {
vresp.granted = true
n.vote = vr.candidate
n.writeTermVote()
@@ -2588,7 +2615,7 @@ func (n *raft) requestVote() {
n.Lock()
if n.state != Candidate {
n.Unlock()
panic("raft requestVote not from candidate")
return
}
n.vote = n.id
n.writeTermVote()
@@ -2666,7 +2693,7 @@ func (n *raft) switchToFollower(leader string) {
if n.state == Closed {
return
}
n.notice("Switching to follower")
n.debug("Switching to follower")
n.leader = leader
n.switchState(Follower)
}
@@ -2678,7 +2705,7 @@ func (n *raft) switchToCandidate() {
return
}
if n.state != Candidate {
n.notice("Switching to candidate")
n.debug("Switching to candidate")
} else if n.lostQuorumLocked() {
// We signal to the upper layers such that can alert on quorum lost.
n.updateLeadChange(false)
@@ -2696,7 +2723,7 @@ func (n *raft) switchToLeader() {
if n.state == Closed {
return
}
n.notice("Switching to leader")
n.debug("Switching to leader")
n.leader = n.id
n.switchState(Leader)
}

View File

@@ -1,10 +1,10 @@
module github.com/nats-io/nats.go
go 1.14
go 1.15
require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4
github.com/nats-io/nkeys v0.2.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0

View File

@@ -7,30 +7,30 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg=
github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02 h1:WloZv3SCb55D/rOHYy1rWBXLrj3BYc9zw8VIq6X54lI=
github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM=
github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c h1:Hc1D9ChlsCMVwCxJ6QT5xqfk2zJ4XNea+LtdfaYhd20=
github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc h1:pu+s4XC+bYnI0iD2vDtOl83zjCYUau/q6c83pEvsGZc=
github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71 h1:nexMtKbOeM+w3vGQMNF0BEt+2xZDmVCtYXql2Ym+RWg=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad h1:oRb9MIi1Y4N5cTZWciqH68aVNt1e+o4N2uRnjVzv/UE=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201127180949-a428a26c0e82 h1:fR42B6rzfIBv9Vk3hh8tyzDimKhlUyK/VUUBmw9Ejj4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201127180949-a428a26c0e82/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed h1:/FdiqqED2Wy6pyVh7K61gN5G0WfbvFVQzGgpHTcAlHA=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4 h1:GStuc0W1rK45FSlpt3+7UTLzmRys2/6WSDuJFyzZ6Xg=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI=
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM=
@@ -39,7 +39,6 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -47,12 +46,12 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

View File

@@ -73,6 +73,9 @@ const (
// apiStreamListT is the endpoint that will return all detailed stream information
apiStreamList = "STREAM.LIST"
// apiMsgGetT is the endpoint to get a message.
apiMsgGetT = "STREAM.MSG.GET.%s"
// apiMsgDeleteT is the endpoint to remove a message.
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"
)
@@ -96,6 +99,9 @@ type JetStream interface {
// QueueSubscribe creates a Subscription with a queue group.
QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
}
// JetStreamContext is the public interface for JetStream.
@@ -115,10 +121,10 @@ type js struct {
direct bool
}
const defaultRequestWait = 5 * time.Second
// JetStream returns a JetStream context for pub/sub interactions.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
const defaultRequestWait = 5 * time.Second
js := &js{nc: nc, pre: defaultAPIPrefix, wait: defaultRequestWait}
for _, opt := range opts {
@@ -392,13 +398,40 @@ type SequencePair struct {
Stream uint64 `json:"stream_seq"`
}
// NextRequest is for getting next messages for pull based consumers from JetStream.
type NextRequest struct {
// nextRequest is for getting next messages for pull based consumers from JetStream.
type nextRequest struct {
Expires *time.Time `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
}
// jsSub includes JetStream subscription info.
type jsSub struct {
js *js
consumer string
stream string
deliver string
pull int
durable bool
attached bool
}
func (jsi *jsSub) unsubscribe(drainMode bool) error {
if drainMode && (jsi.durable || jsi.attached) {
// Skip deleting consumer for durables/attached
// consumers when using drain mode.
return nil
}
// Skip if in direct mode as well.
js := jsi.js
if js.direct {
return nil
}
return js.DeleteConsumer(jsi.stream, jsi.consumer)
}
// SubOpt configures options for subscribing to JetStream consumers.
type SubOpt interface {
configureSubscribe(opts *subOpts) error
@@ -427,6 +460,12 @@ func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt)
return js.subscribe(subj, queue, cb, nil, opts)
}
// QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics.
func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, queue, nil, mch, opts)
}
// Subscribe will create a subscription to the appropriate stream and consumer.
func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, _EMPTY_, nil, ch, opts)
@@ -448,15 +487,18 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
return nil, ErrPullModeNotAllowed
}
var err error
var stream, deliver string
var ccfg *ConsumerConfig
var (
err error
shouldCreate bool
ccfg *ConsumerConfig
deliver string
attached bool
stream = o.stream
consumer = o.consumer
requiresAPI = (stream == _EMPTY_ && consumer == _EMPTY_) && o.cfg.DeliverSubject == _EMPTY_
)
// If we are attaching to an existing consumer.
shouldAttach := o.stream != _EMPTY_ && o.consumer != _EMPTY_ || o.cfg.DeliverSubject != _EMPTY_
shouldCreate := !shouldAttach
if js.direct && shouldCreate {
if js.direct && requiresAPI {
return nil, ErrDirectModeRequired
}
@@ -466,33 +508,49 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
} else {
deliver = NewInbox()
}
} else if shouldAttach {
info, err := js.getConsumerInfo(o.stream, o.consumer)
if err != nil {
return nil, err
}
ccfg = &info.Config
// Make sure this new subject matches or is a subset.
if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
return nil, ErrSubjectMismatch
}
if ccfg.DeliverSubject != _EMPTY_ {
deliver = ccfg.DeliverSubject
} else {
deliver = NewInbox()
}
} else {
// Find the stream mapped to the subject.
stream, err = js.lookupStreamBySubject(subj)
if err != nil {
return nil, err
}
deliver = NewInbox()
if !isPullMode {
cfg.DeliverSubject = deliver
// With an explicit durable name, then can lookup
// the consumer to which it should be attaching to.
var info *ConsumerInfo
consumer = o.cfg.Durable
if consumer != _EMPTY_ {
// Only create in case there is no consumer already.
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != `consumer not found` {
return nil, err
}
}
if info != nil {
// Attach using the found consumer config.
ccfg = &info.Config
attached = true
// Make sure this new subject matches or is a subset.
if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
return nil, ErrSubjectMismatch
}
if ccfg.DeliverSubject != _EMPTY_ {
deliver = ccfg.DeliverSubject
} else {
deliver = NewInbox()
}
} else {
shouldCreate = true
deliver = NewInbox()
if !isPullMode {
cfg.DeliverSubject = deliver
}
// Do filtering always, server will clear as needed.
cfg.FilterSubject = subj
}
// Do filtering always, server will clear as needed.
cfg.FilterSubject = subj
}
var sub *Subscription
@@ -502,7 +560,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
ocb := cb
cb = func(m *Msg) { ocb(m); m.Ack() }
}
sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js})
if err != nil {
return nil, err
@@ -533,7 +590,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
}
var ccSubj string
if cfg.Durable != _EMPTY_ {
isDurable := cfg.Durable != _EMPTY_
if isDurable {
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
@@ -563,15 +621,17 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
sub.jsi.stream = info.Stream
sub.jsi.consumer = info.Name
sub.jsi.deliver = info.Config.DeliverSubject
sub.jsi.durable = isDurable
} else {
sub.jsi.stream = o.stream
sub.jsi.consumer = o.consumer
sub.jsi.stream = stream
sub.jsi.consumer = consumer
if js.direct {
sub.jsi.deliver = o.cfg.DeliverSubject
} else {
sub.jsi.deliver = ccfg.DeliverSubject
}
}
sub.jsi.attached = attached
// If we are pull based go ahead and fire off the first request to populate.
if isPullMode {
@@ -626,6 +686,7 @@ type subOpts struct {
cfg *ConsumerConfig
}
// Durable defines the consumer name for JetStream durable subscribers.
func Durable(name string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if strings.Contains(name, ".") {
@@ -637,14 +698,8 @@ func Durable(name string) SubOpt {
})
}
func Attach(stream, consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.stream = stream
opts.consumer = consumer
return nil
})
}
// Pull defines the batch size of messages that will be received
// when using pull based JetStream consumers.
func Pull(batchSize int) SubOpt {
return subOptFn(func(opts *subOpts) error {
if batchSize == 0 {
@@ -667,13 +722,7 @@ func PullDirect(stream, consumer string, batchSize int) SubOpt {
})
}
func PushDirect(deliverSubject string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverSubject = deliverSubject
return nil
})
}
// ManualAck disables auto ack functionality for async subscriptions.
func ManualAck() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.mack = true
@@ -757,7 +806,13 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
return nil, ErrTypeSubscription
}
// Consumer info lookup should fail if in direct mode.
js := sub.jsi.js
if js.direct {
sub.mu.Unlock()
return nil, ErrDirectModeRequired
}
stream, consumer := sub.jsi.stream, sub.jsi.consumer
sub.mu.Unlock()
@@ -776,7 +831,7 @@ func (sub *Subscription) Poll() error {
js := sub.jsi.js
sub.mu.Unlock()
req, _ := json.Marshal(&NextRequest{Batch: batch})
req, _ := json.Marshal(&nextRequest{Batch: batch})
reqNext := js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer))
return nc.PublishRequest(reqNext, reply, req)
}
@@ -812,7 +867,9 @@ func (m *Msg) checkReply() (*js, bool, error) {
sub.mu.Lock()
if sub.jsi == nil {
sub.mu.Unlock()
return nil, false, ErrNotJSMessage
// Not using a JS context.
return nil, false, nil
}
js := sub.jsi.js
isPullMode := sub.jsi.pull > 0
@@ -824,27 +881,56 @@ func (m *Msg) checkReply() (*js, bool, error) {
// ackReply handles all acks. Will do the right thing for pull and sync mode.
// It ensures that an ack is only sent a single time, regardless of
// how many times it is being called to avoid duplicated acks.
func (m *Msg) ackReply(ackType []byte, sync bool) error {
func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error {
var o pubOpts
for _, opt := range opts {
if err := opt.configurePublish(&o); err != nil {
return err
}
}
js, isPullMode, err := m.checkReply()
if err != nil {
return err
}
// Skip if already acked.
if atomic.LoadUint32(&m.ackd) == 1 {
return ErrInvalidJSAck
}
m.Sub.mu.Lock()
nc := m.Sub.conn
m.Sub.mu.Unlock()
ctx := o.ctx
wait := defaultRequestWait
if o.ttl > 0 {
wait = o.ttl
} else if js != nil {
wait = js.wait
}
if isPullMode {
if bytes.Equal(ackType, AckAck) {
err = js.nc.PublishRequest(m.Reply, m.Sub.Subject, AckNext)
err = nc.PublishRequest(m.Reply, m.Sub.Subject, AckNext)
} else if bytes.Equal(ackType, AckNak) || bytes.Equal(ackType, AckTerm) {
err = js.nc.PublishRequest(m.Reply, m.Sub.Subject, []byte("+NXT {\"batch\":1}"))
err = nc.PublishRequest(m.Reply, m.Sub.Subject, []byte("+NXT {\"batch\":1}"))
}
if sync && err == nil {
_, err = js.nc.Request(m.Reply, nil, js.wait)
if ctx != nil {
_, err = nc.RequestWithContext(ctx, m.Reply, nil)
} else {
_, err = nc.Request(m.Reply, nil, wait)
}
}
} else if sync {
_, err = js.nc.Request(m.Reply, ackType, js.wait)
if ctx != nil {
_, err = nc.RequestWithContext(ctx, m.Reply, ackType)
} else {
_, err = nc.Request(m.Reply, ackType, wait)
}
} else {
err = js.nc.Publish(m.Reply, ackType)
err = nc.Publish(m.Reply, ackType)
}
// Mark that the message has been acked unless it is AckProgress
@@ -864,8 +950,8 @@ func (m *Msg) Ack() error {
}
// Ack a message and wait for a response from the server.
func (m *Msg) AckSync() error {
return m.ackReply(AckAck, true)
func (m *Msg) AckSync(opts ...PubOpt) error {
return m.ackReply(AckAck, true, opts...)
}
// Nak this message, indicating we can not process.
@@ -884,7 +970,7 @@ func (m *Msg) InProgress() error {
return m.ackReply(AckProgress, false)
}
// JetStream metadata associated with received messages.
// MsgMetadata is the JetStream metadata associated with received messages.
type MsgMetaData struct {
Consumer uint64
Stream uint64
@@ -893,6 +979,7 @@ type MsgMetaData struct {
Timestamp time.Time
}
// MetaData retrieves the metadata from a JetStream message.
func (m *Msg) MetaData() (*MsgMetaData, error) {
if _, _, err := m.checkReply(); err != nil {
return nil, err

View File

@@ -17,6 +17,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
)
@@ -41,6 +42,9 @@ type JetStreamManager interface {
// NewStreamLister is used to return pages of StreamInfo objects.
NewStreamLister() *StreamLister
// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
GetMsg(name string, seq uint64) (*RawStreamMsg, error)
// DeleteMsg erases a message from a Stream.
DeleteMsg(name string, seq uint64) error
@@ -78,6 +82,13 @@ type StreamConfig struct {
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
}
// Placement is used to guide placement of streams in clustered JetStream.
type Placement struct {
Cluster string `json:"cluster"`
Tags []string `json:"tags,omitempty"`
}
// apiError is included in all API responses if there was an error.
@@ -107,10 +118,18 @@ type apiPagedRequest struct {
// AccountInfo contains info about the JetStream usage from the current account.
type AccountInfo struct {
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
Streams int `json:"streams"`
Limits AccountLimits `json:"limits"`
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
API APIStats `json:"api"`
Limits AccountLimits `json:"limits"`
}
// APIStats reports on API calls to JetStream for this account.
type APIStats struct {
Total uint64 `json:"total"`
Errors uint64 `json:"errors"`
}
// AccountLimits includes the JetStream limits of the current account.
@@ -204,12 +223,12 @@ type consumerDeleteResponse struct {
}
// DeleteConsumer deletes a Consumer.
func (js *js) DeleteConsumer(stream, durable string) error {
func (js *js) DeleteConsumer(stream, consumer string) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
}
dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, durable))
dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
r, err := js.nc.Request(dcSubj, nil, js.wait)
if err != nil {
return err
@@ -225,8 +244,8 @@ func (js *js) DeleteConsumer(stream, durable string) error {
}
// ConsumerInfo returns information about a Consumer.
func (js *js) ConsumerInfo(stream, durable string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, durable)
func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, consumer)
}
// ConsumerLister fetches pages of ConsumerInfo objects. This object is not
@@ -391,7 +410,9 @@ type ClusterInfo struct {
type PeerInfo struct {
Name string `json:"name"`
Current bool `json:"current"`
Offline bool `json:"offline,omitempty"`
Active time.Duration `json:"active"`
Lag uint64 `json:"lag,omitempty"`
}
// UpdateStream updates a Stream.
@@ -447,6 +468,79 @@ func (js *js) DeleteStream(name string) error {
return nil
}
type apiMsgGetRequest struct {
Seq uint64 `json:"seq"`
}
// RawStreamMsg is a raw message stored in JetStream.
type RawStreamMsg struct {
Subject string
Sequence uint64
Header http.Header
Data []byte
Time time.Time
}
// storedMsg is a raw message stored in JetStream.
type storedMsg struct {
Subject string `json:"subject"`
Sequence uint64 `json:"seq"`
Header []byte `json:"hdrs,omitempty"`
Data []byte `json:"data,omitempty"`
Time time.Time `json:"time"`
}
// apiMsgGetResponse is the response for a Stream get request.
type apiMsgGetResponse struct {
apiResponse
Message *storedMsg `json:"message,omitempty"`
Success bool `json:"success,omitempty"`
}
// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
func (js *js) GetMsg(name string, seq uint64) (*RawStreamMsg, error) {
if name == _EMPTY_ {
return nil, ErrStreamNameRequired
}
req, err := json.Marshal(&apiMsgGetRequest{Seq: seq})
if err != nil {
return nil, err
}
dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name))
r, err := js.nc.Request(dsSubj, req, js.wait)
if err != nil {
return nil, err
}
var resp apiMsgGetResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}
msg := resp.Message
var hdr http.Header
if msg.Header != nil {
hdr, err = decodeHeadersMsg(msg.Header)
if err != nil {
return nil, err
}
}
return &RawStreamMsg{
Subject: msg.Subject,
Sequence: msg.Sequence,
Header: hdr,
Data: msg.Data,
Time: msg.Time,
}, nil
}
type msgDeleteRequest struct {
Seq uint64 `json:"seq"`
}

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2020 The NATS Authors
// Copyright 2012-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -54,7 +54,6 @@ const (
DefaultReconnectJitter = 100 * time.Millisecond
DefaultReconnectJitterTLS = time.Second
DefaultTimeout = 2 * time.Second
DefaultJetStreamTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 64 * 1024 // 64k
@@ -158,7 +157,6 @@ func GetDefaultOptions() Options {
ReconnectJitter: DefaultReconnectJitter,
ReconnectJitterTLS: DefaultReconnectJitterTLS,
Timeout: DefaultTimeout,
JetStreamTimeout: DefaultJetStreamTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
@@ -313,9 +311,6 @@ type Options struct {
// Timeout sets the timeout for a Dial operation on a connection.
Timeout time.Duration
// JetStreamTimeout set the default timeout for the JetStream API
JetStreamTimeout time.Duration
// DrainTimeout sets the timeout for a Drain Operation to complete.
DrainTimeout time.Duration
@@ -491,7 +486,7 @@ type Conn struct {
respRand *rand.Rand // Used for generating suffix
}
// A Subscription represents interest in a given subject.
// Subscription represents interest in a given subject.
type Subscription struct {
mu sync.Mutex
sid int64
@@ -535,16 +530,8 @@ type Subscription struct {
dropped int
}
// For JetStream subscription info.
type jsSub struct {
js *js
consumer string
stream string
deliver string
pull int
}
// Msg is a structure used by Subscribers and PublishMsg().
// Msg represents a message delivered by NATS. This structure is used
// by Subscribers and PublishMsg().
type Msg struct {
Subject string
Reply string
@@ -839,14 +826,6 @@ func Timeout(t time.Duration) Option {
}
}
// JetStreamTimeout is an Option to set the timeout for access to the JetStream API
func JetStreamTimeout(t time.Duration) Option {
return func(o *Options) error {
o.JetStreamTimeout = t
return nil
}
}
// FlusherTimeout is an Option to set the write (and flush) timeout on a connection.
func FlusherTimeout(t time.Duration) Option {
return func(o *Options) error {
@@ -2829,7 +2808,7 @@ func (nc *Conn) Publish(subj string, data []byte) error {
return nc.publish(subj, _EMPTY_, nil, data)
}
// Used to create a new message for publishing that will use headers.
// NewMsg creates a message for publishing that will use headers.
func NewMsg(subject string) *Msg {
return &Msg{
Subject: subject,
@@ -3586,6 +3565,17 @@ func (s *Subscription) AutoUnsubscribe(max int) error {
// unsubscribe performs the low level unsubscribe to the server.
// Use Subscription.Unsubscribe()
func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
// Check whether it is a JetStream sub and should clean up consumers.
sub.mu.Lock()
jsi := sub.jsi
sub.mu.Unlock()
if jsi != nil {
err := jsi.unsubscribe(drainMode)
if err != nil {
return err
}
}
nc.mu.Lock()
// ok here, but defer is expensive
defer nc.mu.Unlock()
@@ -3725,6 +3715,7 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
s.mu.Lock()
nc := s.conn
max := s.max
jsi := s.jsi
// Update some stats.
s.delivered++
@@ -3747,6 +3738,12 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
}
}
// In case this is a JetStream message and in pull mode
// then check whether it is an JS API error.
if jsi != nil && jsi.pull > 0 && len(msg.Data) == 0 && msg.Header.Get(statusHdr) == noResponders {
return ErrNoResponders
}
return nil
}
@@ -4242,10 +4239,16 @@ func (nc *Conn) drainConnection() {
subs := make([]*Subscription, 0, len(nc.subs))
for _, s := range nc.subs {
if s == nc.respMux {
// Skip since might be in use while messages
// are being processed (can miss responses).
continue
}
subs = append(subs, s)
}
errCB := nc.Opts.AsyncErrorCB
drainWait := nc.Opts.DrainTimeout
respMux := nc.respMux
nc.mu.Unlock()
// for pushing errors with context.
@@ -4258,7 +4261,7 @@ func (nc *Conn) drainConnection() {
nc.mu.Unlock()
}
// Do subs first
// Do subs first, skip request handler if present.
for _, s := range subs {
if err := s.Drain(); err != nil {
// We will notify about these but continue.
@@ -4268,13 +4271,34 @@ func (nc *Conn) drainConnection() {
// Wait for the subscriptions to drop to zero.
timeout := time.Now().Add(drainWait)
var min int
if respMux != nil {
min = 1
} else {
min = 0
}
for time.Now().Before(timeout) {
if nc.NumSubscriptions() == 0 {
if nc.NumSubscriptions() == min {
break
}
time.Sleep(10 * time.Millisecond)
}
// In case there was a request/response handler
// then need to call drain at the end.
if respMux != nil {
if err := respMux.Drain(); err != nil {
// We will notify about these but continue.
pushErr(err)
}
for time.Now().Before(timeout) {
if nc.NumSubscriptions() == 0 {
break
}
time.Sleep(10 * time.Millisecond)
}
}
// Check if we timed out.
if nc.NumSubscriptions() != 0 {
pushErr(ErrDrainTimeout)

2
vendor/modules.txt vendored
View File

@@ -7,7 +7,7 @@ github.com/minio/highwayhash
# github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813
## explicit
github.com/nats-io/jwt/v2
# github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a
# github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585
## explicit
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin