Update Go client in tests

This commit is contained in:
Jaime Piña
2021-03-18 10:21:40 -07:00
parent 10ba6c95ba
commit 6941bb3ade
12 changed files with 1586 additions and 667 deletions

2
go.mod
View File

@@ -6,7 +6,7 @@ require (
github.com/klauspost/compress v1.11.12
github.com/minio/highwayhash v1.0.1
github.com/nats-io/jwt/v2 v2.0.1
github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac
github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b

4
go.sum
View File

@@ -31,14 +31,16 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1
github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI=
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk=
github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac h1:/cF7DEtxQBcwRDhpFZ3J0XU4TFpJa9KQF/xDirRNNI0=
github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc=
github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99 h1:KHz7ujBiN9Zg9lqK5IvxW6ZwhW1v/PIRHCCVHkn0XZ0=
github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99/go.mod h1:OieyGzlIObT5YMgJfjuZS4tXG7fUUdRH+hDqioUKbDw=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=

View File

@@ -15,6 +15,7 @@ package server
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
@@ -584,19 +585,15 @@ func TestJetStreamClusterConsumerState(t *testing.T) {
}
}
sub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(1))
sub, err := js.PullSubscribe("foo", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, 1)
// Pull 5 messages and ack.
for i := 0; i < 5; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error getting msg %d: %v", i+1, err)
}
msgs := fetchMsgs(t, sub, 1, 5*time.Second)
m := msgs[0]
m.Ack()
}
@@ -629,10 +626,8 @@ func TestJetStreamClusterConsumerState(t *testing.T) {
// Now make sure we can receive new messages.
// Pull last 5.
for i := 0; i < 5; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error getting msg %d: %v", i+1, err)
}
msgs := fetchMsgs(t, sub, 1, 5*time.Second)
m := msgs[0]
m.Ack()
}
nci, _ = sub.ConsumerInfo()
@@ -672,12 +667,12 @@ func TestJetStreamClusterFullConsumerState(t *testing.T) {
}
}
sub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(1))
sub, err := js.PullSubscribe("foo", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, 1)
fetchMsgs(t, sub, 1, 5*time.Second)
// Now purge the stream.
if err := js.PurgeStream("TEST"); err != nil {
@@ -907,29 +902,21 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) {
nc, js = jsClientConnect(t, s)
defer nc.Close()
resp, err := nc.Request(JSApiStreams, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
var names []string
for name := range js.StreamNames() {
names = append(names, name)
}
var streams JSApiStreamNamesResponse
if err = json.Unmarshal(resp.Data, &streams); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(streams.Streams) != 1 {
t.Fatalf("Expected only 1 stream but got %d", len(streams.Streams))
if len(names) != 1 {
t.Fatalf("Expected only 1 stream but got %d", len(names))
}
// Now do detailed version.
resp, err = nc.Request(JSApiStreamList, nil, 5*time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
var infos []*nats.StreamInfo
for info := range js.StreamsInfo() {
infos = append(infos, info)
}
var listResponse JSApiStreamListResponse
if err = json.Unmarshal(resp.Data, &listResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(listResponse.Streams) != 1 {
t.Fatalf("Expected 1 stream but got %d", len(listResponse.Streams))
if len(infos) != 1 {
t.Fatalf("Expected 1 stream but got %d", len(infos))
}
si, err := js.StreamInfo("foo")
if err != nil {
@@ -940,18 +927,13 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) {
}
// Now check for consumer.
resp, err = nc.Request(fmt.Sprintf(JSApiConsumersT, "foo"), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
names = names[:0]
for name := range js.ConsumerNames("foo") {
names = append(names, name)
}
var clResponse JSApiConsumerNamesResponse
if err = json.Unmarshal(resp.Data, &clResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
if len(names) != 1 {
t.Fatalf("Expected 1 consumer but got %d", len(names))
}
if len(clResponse.Consumers) != 1 {
t.Fatalf("Expected 1 consumer but got %d", len(clResponse.Consumers))
}
}
func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) {
@@ -1074,26 +1056,21 @@ func TestJetStreamClusterStreamOverlapSubjects(t *testing.T) {
}
// Now grab list of streams and make sure the second is not there.
resp, err := nc.Request(JSApiStreams, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
var names []string
for name := range js.StreamNames() {
names = append(names, name)
}
var streams JSApiStreamNamesResponse
if err = json.Unmarshal(resp.Data, &streams); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(streams.Streams) != 1 {
t.Fatalf("Expected only 1 stream but got %d", len(streams.Streams))
if len(names) != 1 {
t.Fatalf("Expected only 1 stream but got %d", len(names))
}
// Now do a detailed version.
resp, err = nc.Request(JSApiStreamList, nil, 5*time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
var infos []*nats.StreamInfo
for info := range js.StreamsInfo() {
infos = append(infos, info)
}
var listResponse JSApiStreamListResponse
if err = json.Unmarshal(resp.Data, &listResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
if len(infos) != 1 {
t.Fatalf("Expected only 1 stream but got %d", len(infos))
}
}
@@ -1132,15 +1109,19 @@ func TestJetStreamClusterStreamInfoList(t *testing.T) {
sendBatch("baz", 33)
// Now get the stream list info.
sl := js.NewStreamLister()
if !sl.Next() {
t.Fatalf("Unexpected error: %v", sl.Err())
}
p := sl.Page()
if len(p) != 3 {
t.Fatalf("StreamInfo expected 3 results, got %d", len(p))
}
for _, si := range p {
var infos []*nats.StreamInfo
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
infos = infos[:0]
for info := range js.StreamsInfo() {
infos = append(infos, info)
}
if len(infos) != 3 {
return fmt.Errorf("StreamInfo expected 3 results, got %d", len(infos))
}
return nil
})
for _, si := range infos {
switch si.Config.Name {
case "foo":
if si.State.Msgs != 10 {
@@ -1180,11 +1161,10 @@ func TestJetStreamClusterConsumerInfoList(t *testing.T) {
createConsumer := func(name string) *nats.Subscription {
t.Helper()
sub, err := js.SubscribeSync("TEST", nats.Durable(name), nats.Pull(2))
sub, err := js.PullSubscribe("TEST", name)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, 2)
return sub
}
@@ -1202,25 +1182,21 @@ func TestJetStreamClusterConsumerInfoList(t *testing.T) {
{subBar, 2, 0},
{subBaz, 8, 6},
} {
for i := 0; i < ss.fetch; i++ {
if m, err := ss.sub.NextMsg(time.Second); err != nil {
t.Fatalf("Unexpected error getting message %d: %v", i, err)
} else if i < ss.ack {
m.Ack()
}
msgs := fetchMsgs(t, ss.sub, ss.fetch, 5*time.Second)
for i := 0; i < ss.ack; i++ {
msgs[i].Ack()
}
}
// Now get the consumer list info.
cl := js.NewConsumerLister("TEST")
if !cl.Next() {
t.Fatalf("Unexpected error: %v", cl.Err())
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("TEST") {
infos = append(infos, info)
}
p := cl.Page()
if len(p) != 3 {
t.Fatalf("ConsumerInfo expected 3 results, got %d", len(p))
if len(infos) != 3 {
t.Fatalf("ConsumerInfo expected 3 results, got %d", len(infos))
}
for _, ci := range p {
for _, ci := range infos {
switch ci.Name {
case "foo":
if ci.Delivered.Consumer != 4 {
@@ -1825,12 +1801,12 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
})
// Now do consumer.
sub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(10))
sub, err := js.PullSubscribe("foo", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
checkSubsPending(t, sub, 10)
fetchMsgs(t, sub, 10, 5*time.Second)
leader = c.consumerLeader("$G", "TEST", "dlc").Name()
ci, err := sub.ConsumerInfo()
@@ -1844,11 +1820,14 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
if len(ci.Cluster.Replicas) != 2 {
t.Fatalf("Expected %d replicas, got %d", 2, len(ci.Cluster.Replicas))
}
for _, peer := range ci.Cluster.Replicas {
if !peer.Current {
t.Fatalf("Expected replica to be current: %+v", peer)
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
for _, peer := range si.Cluster.Replicas {
if !peer.Current {
return fmt.Errorf("Expected replica to be current: %+v", peer)
}
}
}
return nil
})
}
func TestJetStreamClusterExtendedStreamInfoSingleReplica(t *testing.T) {
@@ -1895,13 +1874,12 @@ func TestJetStreamClusterExtendedStreamInfoSingleReplica(t *testing.T) {
}
// Make sure we can grab consumer lists from any
cl := js.NewConsumerLister("TEST")
if !cl.Next() {
t.Fatalf("Unexpected error: %v", cl.Err())
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("TEST") {
infos = append(infos, info)
}
p := cl.Page()
if len(p) != 0 {
t.Fatalf("ConsumerInfo expected no paged results, got %d", len(p))
if len(infos) != 0 {
t.Fatalf("ConsumerInfo expected no paged results, got %d", len(infos))
}
// Now add in a consumer.
@@ -1910,28 +1888,22 @@ func TestJetStreamClusterExtendedStreamInfoSingleReplica(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
cl = js.NewConsumerLister("TEST")
if !cl.Next() {
t.Fatalf("Unexpected error: %v", cl.Err())
infos = infos[:0]
for info := range js.ConsumersInfo("TEST") {
infos = append(infos, info)
}
p = cl.Page()
if len(p) != 1 {
t.Fatalf("ConsumerInfo expected 1 result, got %d", len(p))
if len(infos) != 1 {
t.Fatalf("ConsumerInfo expected 1 result, got %d", len(infos))
}
// Now do direct names list as well.
resp, err := nc.Request(fmt.Sprintf(JSApiConsumersT, "TEST"), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
var names []string
for name := range js.ConsumerNames("TEST") {
names = append(names, name)
}
var clResponse JSApiConsumerNamesResponse
if err = json.Unmarshal(resp.Data, &clResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
if len(names) != 1 {
t.Fatalf("Expected only 1 consumer but got %d", len(names))
}
if len(clResponse.Consumers) != 1 {
t.Fatalf("Expected only 1 consumer but got %d", len(clResponse.Consumers))
}
}
func TestJetStreamClusterInterestRetention(t *testing.T) {
@@ -2112,15 +2084,14 @@ func TestJetStreamClusterEphemeralConsumerCleanup(t *testing.T) {
}
getConsumers := func() []string {
resp, err := nc.Request(fmt.Sprintf(JSApiConsumersT, "foo"), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var names []string
for name := range js.ConsumerNames("foo", nats.Context(ctx)) {
names = append(names, name)
}
var clResponse JSApiConsumerNamesResponse
if err = json.Unmarshal(resp.Data, &clResponse); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return clResponse.Consumers
return names
}
checkConsumer := func(expected int) {
@@ -2215,7 +2186,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
toSend, batchSize := 200, 50
toSend := 200
for i := 0; i < toSend; i++ {
if _, err = js.Publish("foo", []byte("OK")); err != nil {
@@ -2235,25 +2206,16 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
jsub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(batchSize))
jsub, err := js.PullSubscribe("foo", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, jsub, batchSize)
// Ack first 50.
for i := 1; i <= 50; i++ {
m, err := jsub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error getting msg %d: %v", i, err)
}
for _, m := range fetchMsgs(t, jsub, 50, 5*time.Second) {
m.Ack()
}
// Now ack every third message for next 50.
for i := 51; i <= 100; i++ {
m, err := jsub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error getting msg %d: %v", i, err)
}
for i, m := range fetchMsgs(t, jsub, 50, 5*time.Second) {
if i%3 == 0 {
m.Ack()
}
@@ -2411,26 +2373,22 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
// Make sure consumer works.
// It should pick up with the next delivery spot, so check for that as first message.
// We should have all the messages for first delivery delivered.
start := 101
end := toSend
for i := start; i <= end; i++ {
m, err := jsub.NextMsg(2 * time.Second)
if err != nil {
t.Fatalf("Unexpected error getting msg [%d]: %v", i, err)
}
wantSeq := 101
for _, m := range fetchMsgs(t, jsub, 100, 5*time.Second) {
meta, err := m.MetaData()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if meta.Stream != uint64(i) {
t.Fatalf("Expected stream sequence of %d, but got %d", i, meta.Stream)
if meta.Stream != uint64(wantSeq) {
t.Fatalf("Expected stream sequence of %d, but got %d", wantSeq, meta.Stream)
}
m.Ack()
wantSeq++
}
// Check that redelivered come in now..
redelivered := 50/3 + 1
checkSubsPending(t, jsub, redelivered)
fetchMsgs(t, jsub, redelivered, 5*time.Second)
// Now make sure the other server was properly caughtup.
// Need to call this by hand for now.
@@ -2660,23 +2618,14 @@ func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
sendBatch("bar", 75)
sendBatch("baz", 10)
accountStats := func() *JetStreamAccountStats {
accountStats := func() *nats.AccountInfo {
t.Helper()
resp, err := nc.Request(JSApiAccountInfo, nil, time.Second)
info, err := js.AccountInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var info JSApiAccountInfoResponse
if err := json.Unmarshal(resp.Data, &info); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if info.Error != nil {
t.Fatalf("Unexpected error: %+v", info.Error)
}
if info.JetStreamAccountStats == nil {
t.Fatalf("AccountStats missing")
}
return info.JetStreamAccountStats
return info
}
// If subject is not 3 letters or payload not 2 this needs to change.
@@ -2947,16 +2896,14 @@ func TestJetStreamClusterExtendedAccountInfo(t *testing.T) {
sendBatch("TEST-3", 100)
// Go client will lag so use direct for now.
getAccountInfo := func() *JetStreamAccountStats {
resp, err := nc.Request(JSApiAccountInfo, nil, time.Second)
getAccountInfo := func() *nats.AccountInfo {
t.Helper()
info, err := js.AccountInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var info JSApiAccountInfoResponse
if err := json.Unmarshal(resp.Data, &info); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return info.JetStreamAccountStats
return info
}
// Wait to accumulate.
@@ -3198,7 +3145,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
})
notAvailableErr := func(err error) bool {
return err != nil && strings.Contains(err.Error(), "unavailable")
return err != nil && (strings.Contains(err.Error(), "unavailable") || err == context.DeadlineExceeded)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
@@ -3250,11 +3197,11 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
t.Fatalf("Expected an 'unavailable' error, got %v", err)
}
// Listers
if sl := js.NewStreamLister(); sl.Next() || !notAvailableErr(sl.Err()) {
t.Fatalf("Expected an 'unavailable' error, got %v", sl.Err())
for info := range js.StreamsInfo() {
t.Fatalf("Unexpected stream info, got %v", info)
}
if cl := js.NewConsumerLister("NO-Q"); cl.Next() || !notAvailableErr(cl.Err()) {
t.Fatalf("Expected an 'unavailable' error, got %v", cl.Err())
for info := range js.ConsumersInfo("NO-Q") {
t.Fatalf("Unexpected consumer info, got %v", info)
}
}
@@ -4037,28 +3984,17 @@ func TestJetStreamClusterSuperClusterBasics(t *testing.T) {
}
// Now check we can place a stream.
// Need to do this by hand for now until Go client catches up.
pcn := "C3"
cfg := StreamConfig{
scResp, err := js.AddStream(&nats.StreamConfig{
Name: "TEST2",
Storage: FileStorage,
Placement: &Placement{Cluster: pcn},
}
req, err := json.Marshal(cfg)
Placement: &nats.Placement{Cluster: pcn},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, _ := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
var scResp JSApiStreamCreateResponse
if err := json.Unmarshal(resp.Data, &scResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if scResp.StreamInfo == nil || scResp.Error != nil {
t.Fatalf("Did not receive correct response: %+v", scResp.Error)
}
if scResp.StreamInfo.Cluster.Name != pcn {
t.Fatalf("Expected the stream to be placed in %q, got %q", pcn, scResp.StreamInfo.Cluster.Name)
if scResp.Cluster.Name != pcn {
t.Fatalf("Expected the stream to be placed in %q, got %q", pcn, scResp.Cluster.Name)
}
}
@@ -4079,7 +4015,7 @@ func TestJetStreamClusterSuperClusterCrossClusterConsumerInterest(t *testing.T)
}
// Pull based first.
sub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(1))
sub, err := js.PullSubscribe("foo", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -4089,7 +4025,7 @@ func TestJetStreamClusterSuperClusterCrossClusterConsumerInterest(t *testing.T)
t.Fatalf("Unexpected publish error: %v", err)
}
checkSubsPending(t, sub, 1)
fetchMsgs(t, sub, 1, 5*time.Second)
// Now check push based delivery.
sub, err = js.SubscribeSync("foo", nats.Durable("rip"))
@@ -4629,40 +4565,21 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
}
}
nc2, _ := jsClientConnect(t, s)
nc2, js2 := jsClientConnect(t, s)
defer nc2.Close()
// Have to do this direct until we get Go client support.
// Need to match jsClusterMirrorSourceImportsTempl imports.
cfg := StreamConfig{
Name: "MY_MIRROR_TEST",
Storage: FileStorage,
Mirror: &StreamSource{
_, err := js2.AddStream(&nats.StreamConfig{
Name: "MY_MIRROR_TEST",
Mirror: &nats.StreamSource{
Name: "TEST",
External: &ExternalStream{
ApiPrefix: "RI.JS.API",
External: &nats.ExternalStream{
APIPrefix: "RI.JS.API",
DeliverPrefix: "RI.DELIVER.SYNC.MIRRORS",
},
},
}
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, err := nc2.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var scResp JSApiStreamCreateResponse
if err := json.Unmarshal(resp.Data, &scResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if scResp.StreamInfo == nil || scResp.Error != nil {
t.Fatalf("Did not receive correct response: %+v", scResp.Error)
}
js2, err := nc2.JetStream(nats.MaxWait(50 * time.Millisecond))
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -4670,7 +4587,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
checkFor(t, 20*time.Second, 500*time.Millisecond, func() error {
si, err := js2.StreamInfo("MY_MIRROR_TEST")
if err != nil {
t.Fatalf("Could not retrieve stream info")
t.Fatalf("Could not retrieve stream info: %s", err)
}
if si.State.Msgs != uint64(toSend) {
return fmt.Errorf("Expected %d msgs, got state: %+v", toSend, si.State)
@@ -4679,35 +4596,21 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
})
// Now do sources as well.
cfg = StreamConfig{
Name: "MY_SOURCE_TEST",
Storage: FileStorage,
Sources: []*StreamSource{
&StreamSource{
_, err = js2.AddStream(&nats.StreamConfig{
Name: "MY_SOURCE_TEST",
Sources: []*nats.StreamSource{
&nats.StreamSource{
Name: "TEST",
External: &ExternalStream{
ApiPrefix: "RI.JS.API",
External: &nats.ExternalStream{
APIPrefix: "RI.JS.API",
DeliverPrefix: "RI.DELIVER.SYNC.SOURCES",
},
},
},
}
req, err = json.Marshal(cfg)
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, err = nc2.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
scResp.Error = nil
if err := json.Unmarshal(resp.Data, &scResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if scResp.StreamInfo == nil || scResp.Error != nil {
t.Fatalf("Did not receive correct response: %+v", scResp.Error)
}
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("MY_SOURCE_TEST")
@@ -4835,16 +4738,12 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
// Names list..
resp, err := nc.Request(JSApiStreams, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
var names []string
for name := range js.StreamNames() {
names = append(names, name)
}
var streams JSApiStreamNamesResponse
if err = json.Unmarshal(resp.Data, &streams); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(streams.Streams) != 1 {
t.Fatalf("Expected only 1 stream but got %d", len(streams.Streams))
if len(names) != 1 {
t.Fatalf("Expected only 1 stream but got %d", len(names))
}
// Now send to stream.
@@ -4852,16 +4751,13 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) {
t.Fatalf("Unexpected publish error: %v", err)
}
sub, err = js.SubscribeSync("TEST", nats.Durable("tr"), nats.Pull(1))
sub, err = js.PullSubscribe("TEST", "tr")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, 1)
msgs := fetchMsgs(t, sub, 1, 5*time.Second)
m, err := sub.NextMsg(0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
m := msgs[0]
if m.Subject != "TEST" {
t.Fatalf("Expected subject of %q, got %q", "TEST", m.Subject)
}
@@ -5757,6 +5653,18 @@ func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) {
})
}
func fetchMsgs(t *testing.T, sub *nats.Subscription, numExpected int, wait time.Duration) []*nats.Msg {
t.Helper()
msgs, err := sub.Fetch(numExpected, nats.MaxWait(wait))
if err != nil {
t.Fatal(err)
}
if len(msgs) != numExpected {
t.Fatalf("Unexpected msg count, got %d, want %d", len(msgs), numExpected)
}
return msgs
}
func (c *cluster) restartServer(rs *Server) *Server {
c.t.Helper()
index := -1

View File

@@ -10849,54 +10849,40 @@ func TestJetStreamMirrorBasics(t *testing.T) {
nc, js := jsClientConnect(t, s)
defer nc.Close()
createStream := func(cfg *StreamConfig) *JSApiStreamCreateResponse {
t.Helper()
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rm, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var resp JSApiStreamCreateResponse
if err := json.Unmarshal(rm.Data, &resp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return &resp
createStream := func(cfg *nats.StreamConfig) (*nats.StreamInfo, error) {
return js.AddStream(cfg)
}
createStreamOk := func(cfg *StreamConfig) {
createStreamOk := func(cfg *nats.StreamConfig) {
t.Helper()
if scr := createStream(cfg); scr.Error != nil {
t.Fatalf("Expected error, got %+v", scr.Error)
if _, err := createStream(cfg); err != nil {
t.Fatalf("Expected error, got %+v", err)
}
}
// Test we get right config errors etc.
cfg := &StreamConfig{
cfg := &nats.StreamConfig{
Name: "M1",
Storage: FileStorage,
Subjects: []string{"foo", "bar", "baz"},
Mirror: &StreamSource{Name: "S1"},
Mirror: &nats.StreamSource{Name: "S1"},
}
scr := createStream(cfg)
if scr.Error == nil || !strings.Contains(scr.Error.Description, "stream mirrors can not") {
t.Fatalf("Expected error, got %+v", scr.Error)
_, err := createStream(cfg)
if err == nil || !strings.Contains(err.Error(), "stream mirrors can not") {
t.Fatalf("Expected error, got %+v", err)
}
// Clear subjects.
cfg.Subjects = nil
// Source
scfg := &StreamConfig{
scfg := &nats.StreamConfig{
Name: "S1",
Storage: FileStorage,
Subjects: []string{"foo", "bar", "baz"},
}
// Create source stream
createStreamOk(scfg)
// Now create our mirror stream.
createStreamOk(cfg)
@@ -10915,7 +10901,7 @@ func TestJetStreamMirrorBasics(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
si, err := js2.StreamInfo("M1")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -10937,16 +10923,16 @@ func TestJetStreamMirrorBasics(t *testing.T) {
}
}
cfg = &StreamConfig{
cfg = &nats.StreamConfig{
Name: "M2",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1"},
Storage: nats.FileStorage,
Mirror: &nats.StreamSource{Name: "S1"},
}
// Now create our second mirror stream.
createStreamOk(cfg)
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
si, err := js2.StreamInfo("M2")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -10967,15 +10953,14 @@ func TestJetStreamMirrorBasics(t *testing.T) {
}
}
cfg = &StreamConfig{
Name: "M3",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", OptStartSeq: 150},
cfg = &nats.StreamConfig{
Name: "M3",
Mirror: &nats.StreamSource{Name: "S1", OptStartSeq: 150},
}
createStreamOk(cfg)
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
si, err := js2.StreamInfo("M3")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -10991,14 +10976,13 @@ func TestJetStreamMirrorBasics(t *testing.T) {
// Make sure setting time works ok.
start := time.Now().UTC().Add(-2 * time.Hour)
cfg = &StreamConfig{
Name: "M4",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", OptStartTime: &start},
cfg = &nats.StreamConfig{
Name: "M4",
Mirror: &nats.StreamSource{Name: "S1", OptStartTime: &start},
}
createStreamOk(cfg)
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
si, err := js2.StreamInfo("M4")
if err != nil {
t.Fatalf("Unexpected error: %v", err)

View File

@@ -1457,22 +1457,10 @@ func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) {
}
// Needed while Go client does not have mirror support.
createStream := func(cfg *StreamConfig) {
createStream := func(cfg *nats.StreamConfig) {
t.Helper()
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rm, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var resp JSApiStreamCreateResponse
if err := json.Unmarshal(rm.Data, &resp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
}
@@ -1483,11 +1471,10 @@ func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) {
}
}
createStream(&StreamConfig{
createStream(&nats.StreamConfig{
Name: "M1",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1"},
Placement: &Placement{Cluster: "C1"},
Mirror: &nats.StreamSource{Name: "S1"},
Placement: &nats.Placement{Cluster: "C1"},
})
// Faster timeout since we loop below checking for condition.
@@ -1517,12 +1504,11 @@ func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) {
}
}
createStream(&StreamConfig{
createStream(&nats.StreamConfig{
Name: "M2",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1"},
Mirror: &nats.StreamSource{Name: "S1"},
Replicas: 3,
Placement: &Placement{Cluster: "C3"},
Placement: &nats.Placement{Cluster: "C3"},
})
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
@@ -1609,32 +1595,19 @@ func TestNoRaceJetStreamClusterSuperClusterSources(t *testing.T) {
sendBatch("baz", 25)
// Needed while Go client does not have mirror support for creating mirror or source streams.
createStream := func(cfg *StreamConfig) {
createStream := func(cfg *nats.StreamConfig) {
t.Helper()
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rm, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var resp JSApiStreamCreateResponse
if err := json.Unmarshal(rm.Data, &resp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
}
cfg := &StreamConfig{
Name: "MS",
Storage: FileStorage,
Sources: []*StreamSource{
&StreamSource{Name: "foo"},
&StreamSource{Name: "bar"},
&StreamSource{Name: "baz"},
cfg := &nats.StreamConfig{
Name: "MS",
Sources: []*nats.StreamSource{
{Name: "foo"},
{Name: "bar"},
{Name: "baz"},
},
}
@@ -1673,16 +1646,15 @@ func TestNoRaceJetStreamClusterSuperClusterSources(t *testing.T) {
sendBatch("bar", 15)
sendBatch("baz", 25)
cfg = &StreamConfig{
Name: "MS2",
Storage: FileStorage,
Sources: []*StreamSource{
&StreamSource{Name: "foo"},
&StreamSource{Name: "bar"},
&StreamSource{Name: "baz"},
cfg = &nats.StreamConfig{
Name: "MS2",
Sources: []*nats.StreamSource{
{Name: "foo"},
{Name: "bar"},
{Name: "baz"},
},
Replicas: 3,
Placement: &Placement{Cluster: "C3"},
Placement: &nats.Placement{Cluster: "C3"},
}
createStream(cfg)

View File

@@ -1,7 +1,7 @@
language: go
go:
- 1.16.x
- 1.15.x
- 1.14.x
go_import_path: github.com/nats-io/nats.go
install:
- go get -t ./...
@@ -17,4 +17,4 @@ before_script:
script:
- go test -i -race ./...
- go test -v -run=TestNoRace -p=1 ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.15 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi

View File

@@ -4,8 +4,8 @@ go 1.15
require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8
github.com/nats-io/nkeys v0.2.0
github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)

View File

@@ -9,55 +9,66 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg=
github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk=
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM=
github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc h1:pu+s4XC+bYnI0iD2vDtOl83zjCYUau/q6c83pEvsGZc=
github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813 h1:km4lLzT86NyJRhO++VqfP/vn5cbfm+E05i2bGdqDbrY=
github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
github.com/nats-io/jwt/v2 v2.0.1 h1:SycklijeduR742i/1Y3nRhURYM7imDzZZ3+tuAQqhQA=
github.com/nats-io/jwt/v2 v2.0.1/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4 h1:GStuc0W1rK45FSlpt3+7UTLzmRys2/6WSDuJFyzZ6Xg=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8 h1:jPZZofsCevE2oJl3YexVw3drWOFdo8H4AWMb/1WcVoc=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0 h1:ybeT5VFA73CVQb4rCL+48+up91xWheriSBbJ3M2Pzps=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI=
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk=
github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=

1142
vendor/github.com/nats-io/nats.go/js.go generated vendored

File diff suppressed because it is too large Load Diff

View File

@@ -14,6 +14,7 @@
package nats
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -22,46 +23,52 @@ import (
"time"
)
// JetStreamManager is the public interface for managing JetStream streams & consumers.
// JetStreamManager manages JetStream Streams and Consumers.
type JetStreamManager interface {
// AddStream creates a stream.
AddStream(cfg *StreamConfig) (*StreamInfo, error)
AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
// UpdateStream updates a stream.
UpdateStream(cfg *StreamConfig) (*StreamInfo, error)
UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
// DeleteStream deletes a stream.
DeleteStream(name string) error
DeleteStream(name string, opts ...JSOpt) error
// StreamInfo retrieves information from a stream.
StreamInfo(stream string) (*StreamInfo, error)
StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error)
// Purge stream messages.
PurgeStream(name string) error
// PurgeStream purges a stream messages.
PurgeStream(name string, opts ...JSOpt) error
// NewStreamLister is used to return pages of StreamInfo objects.
NewStreamLister() *StreamLister
// StreamsInfo can be used to retrieve a list of StreamInfo objects.
StreamsInfo(opts ...JSOpt) <-chan *StreamInfo
// StreamNames is used to retrieve a list of Stream names.
StreamNames(opts ...JSOpt) <-chan string
// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
GetMsg(name string, seq uint64) (*RawStreamMsg, error)
GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error)
// DeleteMsg erases a message from a stream.
DeleteMsg(name string, seq uint64) error
DeleteMsg(name string, seq uint64, opts ...JSOpt) error
// AddConsumer adds a consumer to a stream.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)
AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)
// DeleteConsumer deletes a consumer.
DeleteConsumer(stream, consumer string) error
DeleteConsumer(stream, consumer string, opts ...JSOpt) error
// ConsumerInfo retrieves consumer information.
ConsumerInfo(stream, name string) (*ConsumerInfo, error)
// ConsumerInfo retrieves information of a consumer from a stream.
ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error)
// NewConsumerLister is used to return pages of ConsumerInfo objects.
NewConsumerLister(stream string) *ConsumerLister
// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo
// ConsumerNames is used to retrieve a list of Consumer names.
ConsumerNames(stream string, opts ...JSOpt) <-chan string
// AccountInfo retrieves info about the JetStream usage from an account.
AccountInfo() (*AccountInfo, error)
AccountInfo(opts ...JSOpt) (*AccountInfo, error)
}
// StreamConfig will determine the properties for a stream.
@@ -95,10 +102,18 @@ type Placement struct {
// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
External *ExternalStream `json:"external,omitempty"`
}
// ExternalStream allows you to qualify access to a stream source in another
// account.
type ExternalStream struct {
APIPrefix string `json:"api"`
DeliverPrefix string `json:"deliver"`
}
// apiError is included in all API responses if there was an error.
@@ -156,8 +171,16 @@ type accountInfoResponse struct {
}
// AccountInfo retrieves info about the JetStream usage from the current account.
func (js *js) AccountInfo() (*AccountInfo, error) {
resp, err := js.nc.Request(js.apiSubj(apiAccountInfo), nil, js.wait)
func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
}
if cancel != nil {
defer cancel()
}
resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
if err != nil {
return nil, err
}
@@ -189,7 +212,15 @@ type consumerResponse struct {
}
// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) {
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
}
if cancel != nil {
defer cancel()
}
if stream == _EMPTY_ {
return nil, ErrStreamNameRequired
}
@@ -208,7 +239,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
}
resp, err := js.nc.Request(js.apiSubj(ccSubj), req, js.wait)
resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
@@ -233,13 +264,21 @@ type consumerDeleteResponse struct {
}
// DeleteConsumer deletes a Consumer.
func (js *js) DeleteConsumer(stream, consumer string) error {
func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return err
}
if cancel != nil {
defer cancel()
}
if stream == _EMPTY_ {
return ErrStreamNameRequired
}
dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
r, err := js.nc.Request(dcSubj, nil, js.wait)
r, err := js.nc.RequestWithContext(o.ctx, dcSubj, nil)
if err != nil {
return err
}
@@ -254,13 +293,20 @@ func (js *js) DeleteConsumer(stream, consumer string) error {
}
// ConsumerInfo returns information about a Consumer.
func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, consumer)
func (js *js) ConsumerInfo(stream, consumer string, opts ...JSOpt) (*ConsumerInfo, error) {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
}
if cancel != nil {
defer cancel()
}
return js.getConsumerInfoContext(o.ctx, stream, consumer)
}
// ConsumerLister fetches pages of ConsumerInfo objects. This object is not
// consumerLister fetches pages of ConsumerInfo objects. This object is not
// safe to use for multiple threads.
type ConsumerLister struct {
type consumerLister struct {
stream string
js *js
@@ -283,7 +329,7 @@ type consumerListResponse struct {
}
// Next fetches the next ConsumerInfo page.
func (c *ConsumerLister) Next() bool {
func (c *consumerLister) Next() bool {
if c.err != nil {
return false
}
@@ -302,8 +348,16 @@ func (c *ConsumerLister) Next() bool {
c.err = err
return false
}
var cancel context.CancelFunc
ctx := c.js.opts.ctx
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait)
defer cancel()
}
clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerListT, c.stream))
r, err := c.js.nc.Request(clSubj, req, c.js.wait)
r, err := c.js.nc.RequestWithContext(ctx, clSubj, req)
if err != nil {
c.err = err
return false
@@ -325,18 +379,138 @@ func (c *ConsumerLister) Next() bool {
}
// Page returns the current ConsumerInfo page.
func (c *ConsumerLister) Page() []*ConsumerInfo {
func (c *consumerLister) Page() []*ConsumerInfo {
return c.page
}
// Err returns any errors found while fetching pages.
func (c *ConsumerLister) Err() error {
func (c *consumerLister) Err() error {
return c.err
}
// NewConsumerLister is used to return pages of ConsumerInfo objects.
func (js *js) NewConsumerLister(stream string) *ConsumerLister {
return &ConsumerLister{stream: stream, js: js}
// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
o, cancel, err := getJSContextOpts(jsc.opts, opts...)
if err != nil {
return nil
}
ch := make(chan *ConsumerInfo)
l := &consumerLister{js: &js{nc: jsc.nc, opts: o}, stream: stream}
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
select {
case ch <- info:
case <-o.ctx.Done():
return
}
}
}
}()
return ch
}
type consumerNamesLister struct {
stream string
js *js
err error
offset int
page []string
pageInfo *apiPaged
}
// consumerNamesListResponse is the response for a Consumers Names List request.
type consumerNamesListResponse struct {
apiResponse
apiPaged
Consumers []string `json:"consumers"`
}
// Next fetches the next ConsumerInfo page.
func (c *consumerNamesLister) Next() bool {
if c.err != nil {
return false
}
if c.stream == _EMPTY_ {
c.err = ErrStreamNameRequired
return false
}
if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
return false
}
var cancel context.CancelFunc
ctx := c.js.opts.ctx
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait)
defer cancel()
}
clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream))
r, err := c.js.nc.RequestWithContext(ctx, clSubj, nil)
if err != nil {
c.err = err
return false
}
var resp consumerNamesListResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
c.err = err
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
return false
}
c.pageInfo = &resp.apiPaged
c.page = resp.Consumers
c.offset += len(c.page)
return true
}
// Page returns the current ConsumerInfo page.
func (c *consumerNamesLister) Page() []string {
return c.page
}
// Err returns any errors found while fetching pages.
func (c *consumerNamesLister) Err() error {
return c.err
}
// ConsumerNames is used to retrieve a list of Consumer names.
func (jsc *js) ConsumerNames(stream string, opts ...JSOpt) <-chan string {
o, cancel, err := getJSContextOpts(jsc.opts, opts...)
if err != nil {
return nil
}
ch := make(chan string)
l := &consumerNamesLister{stream: stream, js: &js{nc: jsc.nc, opts: o}}
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
select {
case ch <- info:
case <-o.ctx.Done():
return
}
}
}
}()
return ch
}
// streamCreateResponse stream creation.
@@ -345,7 +519,15 @@ type streamCreateResponse struct {
*StreamInfo
}
func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) {
func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
}
if cancel != nil {
defer cancel()
}
if cfg == nil || cfg.Name == _EMPTY_ {
return nil, ErrStreamNameRequired
}
@@ -356,7 +538,7 @@ func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) {
}
csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name))
r, err := js.nc.Request(csSubj, req, js.wait)
r, err := js.nc.RequestWithContext(o.ctx, csSubj, req)
if err != nil {
return nil, err
}
@@ -372,9 +554,17 @@ func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) {
type streamInfoResponse = streamCreateResponse
func (js *js) StreamInfo(stream string) (*StreamInfo, error) {
func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
}
if cancel != nil {
defer cancel()
}
csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))
r, err := js.nc.Request(csSubj, nil, js.wait)
r, err := js.nc.RequestWithContext(o.ctx, csSubj, nil)
if err != nil {
return nil, err
}
@@ -435,7 +625,15 @@ type PeerInfo struct {
}
// UpdateStream updates a Stream.
func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) {
func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
}
if cancel != nil {
defer cancel()
}
if cfg == nil || cfg.Name == _EMPTY_ {
return nil, ErrStreamNameRequired
}
@@ -446,7 +644,7 @@ func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) {
}
usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name))
r, err := js.nc.Request(usSubj, req, js.wait)
r, err := js.nc.RequestWithContext(o.ctx, usSubj, req)
if err != nil {
return nil, err
}
@@ -467,13 +665,21 @@ type streamDeleteResponse struct {
}
// DeleteStream deletes a Stream.
func (js *js) DeleteStream(name string) error {
func (js *js) DeleteStream(name string, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return err
}
if cancel != nil {
defer cancel()
}
if name == _EMPTY_ {
return ErrStreamNameRequired
}
dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name))
r, err := js.nc.Request(dsSubj, nil, js.wait)
r, err := js.nc.RequestWithContext(o.ctx, dsSubj, nil)
if err != nil {
return err
}
@@ -517,7 +723,15 @@ type apiMsgGetResponse struct {
}
// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
func (js *js) GetMsg(name string, seq uint64) (*RawStreamMsg, error) {
func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
}
if cancel != nil {
defer cancel()
}
if name == _EMPTY_ {
return nil, ErrStreamNameRequired
}
@@ -528,7 +742,7 @@ func (js *js) GetMsg(name string, seq uint64) (*RawStreamMsg, error) {
}
dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name))
r, err := js.nc.Request(dsSubj, req, js.wait)
r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req)
if err != nil {
return nil, err
}
@@ -571,7 +785,15 @@ type msgDeleteResponse struct {
}
// DeleteMsg deletes a message from a stream.
func (js *js) DeleteMsg(name string, seq uint64) error {
func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return err
}
if cancel != nil {
defer cancel()
}
if name == _EMPTY_ {
return ErrStreamNameRequired
}
@@ -582,7 +804,7 @@ func (js *js) DeleteMsg(name string, seq uint64) error {
}
dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name))
r, err := js.nc.Request(dsSubj, req, js.wait)
r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req)
if err != nil {
return err
}
@@ -603,9 +825,17 @@ type streamPurgeResponse struct {
}
// PurgeStream purges messages on a Stream.
func (js *js) PurgeStream(name string) error {
func (js *js) PurgeStream(name string, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return err
}
if cancel != nil {
defer cancel()
}
psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, name))
r, err := js.nc.Request(psSubj, nil, js.wait)
r, err := js.nc.RequestWithContext(o.ctx, psSubj, nil)
if err != nil {
return err
}
@@ -619,9 +849,9 @@ func (js *js) PurgeStream(name string) error {
return nil
}
// StreamLister fetches pages of StreamInfo objects. This object is not safe
// streamLister fetches pages of StreamInfo objects. This object is not safe
// to use for multiple threads.
type StreamLister struct {
type streamLister struct {
js *js
page []*StreamInfo
err error
@@ -646,7 +876,7 @@ type streamNamesRequest struct {
}
// Next fetches the next StreamInfo page.
func (s *StreamLister) Next() bool {
func (s *streamLister) Next() bool {
if s.err != nil {
return false
}
@@ -662,8 +892,15 @@ func (s *StreamLister) Next() bool {
return false
}
var cancel context.CancelFunc
ctx := s.js.opts.ctx
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), s.js.opts.wait)
defer cancel()
}
slSubj := s.js.apiSubj(apiStreamList)
r, err := s.js.nc.Request(slSubj, req, s.js.wait)
r, err := s.js.nc.RequestWithContext(ctx, slSubj, req)
if err != nil {
s.err = err
return false
@@ -685,16 +922,149 @@ func (s *StreamLister) Next() bool {
}
// Page returns the current StreamInfo page.
func (s *StreamLister) Page() []*StreamInfo {
func (s *streamLister) Page() []*StreamInfo {
return s.page
}
// Err returns any errors found while fetching pages.
func (s *StreamLister) Err() error {
func (s *streamLister) Err() error {
return s.err
}
// NewStreamLister is used to return pages of StreamInfo objects.
func (js *js) NewStreamLister() *StreamLister {
return &StreamLister{js: js}
// StreamsInfo can be used to retrieve a list of StreamInfo objects.
func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
o, cancel, err := getJSContextOpts(jsc.opts, opts...)
if err != nil {
return nil
}
ch := make(chan *StreamInfo)
l := &streamLister{js: &js{nc: jsc.nc, opts: o}}
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
select {
case ch <- info:
case <-o.ctx.Done():
return
}
}
}
}()
return ch
}
type streamNamesLister struct {
js *js
err error
offset int
page []string
pageInfo *apiPaged
}
// Next fetches the next ConsumerInfo page.
func (l *streamNamesLister) Next() bool {
if l.err != nil {
return false
}
if l.pageInfo != nil && l.offset >= l.pageInfo.Total {
return false
}
var cancel context.CancelFunc
ctx := l.js.opts.ctx
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), l.js.opts.wait)
defer cancel()
}
r, err := l.js.nc.RequestWithContext(ctx, l.js.apiSubj(apiStreams), nil)
if err != nil {
l.err = err
return false
}
var resp streamNamesResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
l.err = err
return false
}
if resp.Error != nil {
l.err = errors.New(resp.Error.Description)
return false
}
l.pageInfo = &resp.apiPaged
l.page = resp.Streams
l.offset += len(l.page)
return true
}
// Page returns the current ConsumerInfo page.
func (l *streamNamesLister) Page() []string {
return l.page
}
// Err returns any errors found while fetching pages.
func (l *streamNamesLister) Err() error {
return l.err
}
// StreamNames is used to retrieve a list of Stream names.
func (jsc *js) StreamNames(opts ...JSOpt) <-chan string {
o, cancel, err := getJSContextOpts(jsc.opts, opts...)
if err != nil {
return nil
}
ch := make(chan string)
l := &streamNamesLister{js: &js{nc: jsc.nc, opts: o}}
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
select {
case ch <- info:
case <-o.ctx.Done():
return
}
}
}
}()
return ch
}
func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, error) {
var o jsOpts
for _, opt := range opts {
if err := opt.configureJSContext(&o); err != nil {
return nil, nil, err
}
}
// Check for option collisions. Right now just timeout and context.
if o.ctx != nil && o.wait != 0 {
return nil, nil, ErrContextAndTimeout
}
if o.wait == 0 && o.ctx == nil {
o.wait = defs.wait
}
var cancel context.CancelFunc
if o.ctx == nil && o.wait > 0 {
o.ctx, cancel = context.WithTimeout(context.Background(), o.wait)
}
if o.pre == "" {
o.pre = defs.pre
}
return &o, cancel, nil
}

View File

@@ -75,6 +75,9 @@ const (
// AUTHENTICATION_EXPIRED_ERR is for when nats server user authorization has expired.
AUTHENTICATION_EXPIRED_ERR = "user authentication expired"
// AUTHENTICATION_REVOKED_ERR is for when user authorization has been revoked.
AUTHENTICATION_REVOKED_ERR = "user authentication revoked"
)
// Errors
@@ -94,6 +97,7 @@ var (
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
@@ -125,7 +129,6 @@ var (
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
ErrNoContextOrTimeout = errors.New("nats: no context or timeout given")
ErrDirectModeRequired = errors.New("nats: direct access requires direct pull or push")
ErrPullModeNotAllowed = errors.New("nats: pull based not supported")
ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled")
ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid")
@@ -484,6 +487,9 @@ type Conn struct {
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix
// JetStream Contexts last account check.
jsLastCheck time.Time
}
// Subscription represents interest in a given subject.
@@ -2757,6 +2763,9 @@ func checkAuthError(e string) error {
if strings.HasPrefix(e, AUTHENTICATION_EXPIRED_ERR) {
return ErrAuthExpired
}
if strings.HasPrefix(e, AUTHENTICATION_REVOKED_ERR) {
return ErrAuthRevoked
}
return nil
}
@@ -2823,6 +2832,7 @@ const (
statusHdr = "Status"
descrHdr = "Description"
noResponders = "503"
noMessages = "404"
statusLen = 3 // e.g. 20x, 40x, 50x
)
@@ -3284,8 +3294,7 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
return nil, ErrInvalidConnection
}
mch := make(chan *Msg, nc.Opts.SubChanLen)
s, e := nc.subscribe(subj, _EMPTY_, nil, mch, true, nil)
return s, e
return nc.subscribe(subj, _EMPTY_, nil, mch, true, nil)
}
// QueueSubscribe creates an asynchronous queue subscriber on the given subject.
@@ -3302,8 +3311,7 @@ func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription
// given message synchronously using Subscription.NextMsg().
func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
mch := make(chan *Msg, nc.Opts.SubChanLen)
s, e := nc.subscribe(subj, queue, nil, mch, true, nil)
return s, e
return nc.subscribe(subj, queue, nil, mch, true, nil)
}
// QueueSubscribeSyncWithChan will express interest in the given subject.
@@ -3447,6 +3455,7 @@ const (
SyncSubscription
ChanSubscription
NilSubscription
PullSubscription
)
// Type returns the type of Subscription.
@@ -3715,7 +3724,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
s.mu.Lock()
nc := s.conn
max := s.max
jsi := s.jsi
// Update some stats.
s.delivered++
@@ -3738,12 +3746,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
}
}
// In case this is a JetStream message and in pull mode
// then check whether it is an JS API error.
if jsi != nil && jsi.pull > 0 && len(msg.Data) == 0 && msg.Header.Get(statusHdr) == noResponders {
return ErrNoResponders
}
return nil
}

2
vendor/modules.txt vendored
View File

@@ -7,7 +7,7 @@ github.com/minio/highwayhash
# github.com/nats-io/jwt/v2 v2.0.1
## explicit
github.com/nats-io/jwt/v2
# github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac
# github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99
## explicit
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin