diff --git a/go.mod b/go.mod index 9e895d1a..e81923e5 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/klauspost/compress v1.11.12 github.com/minio/highwayhash v1.0.1 github.com/nats-io/jwt/v2 v2.0.1 - github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac + github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b diff --git a/go.sum b/go.sum index 01c9c865..fafc7029 100644 --- a/go.sum +++ b/go.sum @@ -31,14 +31,16 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1 github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY= github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0= github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA= +github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI= github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI= github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk= -github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac h1:/cF7DEtxQBcwRDhpFZ3J0XU4TFpJa9KQF/xDirRNNI0= github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc= +github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99 h1:KHz7ujBiN9Zg9lqK5IvxW6ZwhW1v/PIRHCCVHkn0XZ0= +github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99/go.mod h1:OieyGzlIObT5YMgJfjuZS4tXG7fUUdRH+hDqioUKbDw= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 856f56df..8cfc69fe 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -15,6 +15,7 @@ package server import ( "bytes" + "context" "encoding/json" "fmt" "io/ioutil" @@ -584,19 +585,15 @@ func TestJetStreamClusterConsumerState(t *testing.T) { } } - sub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(1)) + sub, err := js.PullSubscribe("foo", "dlc") if err != nil { t.Fatalf("Unexpected error: %v", err) } - checkSubsPending(t, sub, 1) - // Pull 5 messages and ack. for i := 0; i < 5; i++ { - m, err := sub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Unexpected error getting msg %d: %v", i+1, err) - } + msgs := fetchMsgs(t, sub, 1, 5*time.Second) + m := msgs[0] m.Ack() } @@ -629,10 +626,8 @@ func TestJetStreamClusterConsumerState(t *testing.T) { // Now make sure we can receive new messages. // Pull last 5. for i := 0; i < 5; i++ { - m, err := sub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Unexpected error getting msg %d: %v", i+1, err) - } + msgs := fetchMsgs(t, sub, 1, 5*time.Second) + m := msgs[0] m.Ack() } nci, _ = sub.ConsumerInfo() @@ -672,12 +667,12 @@ func TestJetStreamClusterFullConsumerState(t *testing.T) { } } - sub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(1)) + sub, err := js.PullSubscribe("foo", "dlc") if err != nil { t.Fatalf("Unexpected error: %v", err) } - checkSubsPending(t, sub, 1) + fetchMsgs(t, sub, 1, 5*time.Second) // Now purge the stream. if err := js.PurgeStream("TEST"); err != nil { @@ -907,29 +902,21 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) { nc, js = jsClientConnect(t, s) defer nc.Close() - resp, err := nc.Request(JSApiStreams, nil, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + var names []string + for name := range js.StreamNames() { + names = append(names, name) } - var streams JSApiStreamNamesResponse - if err = json.Unmarshal(resp.Data, &streams); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if len(streams.Streams) != 1 { - t.Fatalf("Expected only 1 stream but got %d", len(streams.Streams)) + if len(names) != 1 { + t.Fatalf("Expected only 1 stream but got %d", len(names)) } // Now do detailed version. - resp, err = nc.Request(JSApiStreamList, nil, 5*time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + var infos []*nats.StreamInfo + for info := range js.StreamsInfo() { + infos = append(infos, info) } - var listResponse JSApiStreamListResponse - if err = json.Unmarshal(resp.Data, &listResponse); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if len(listResponse.Streams) != 1 { - t.Fatalf("Expected 1 stream but got %d", len(listResponse.Streams)) + if len(infos) != 1 { + t.Fatalf("Expected 1 stream but got %d", len(infos)) } si, err := js.StreamInfo("foo") if err != nil { @@ -940,18 +927,13 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) { } // Now check for consumer. - resp, err = nc.Request(fmt.Sprintf(JSApiConsumersT, "foo"), nil, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + names = names[:0] + for name := range js.ConsumerNames("foo") { + names = append(names, name) } - var clResponse JSApiConsumerNamesResponse - if err = json.Unmarshal(resp.Data, &clResponse); err != nil { - t.Fatalf("Unexpected error: %v", err) + if len(names) != 1 { + t.Fatalf("Expected 1 consumer but got %d", len(names)) } - if len(clResponse.Consumers) != 1 { - t.Fatalf("Expected 1 consumer but got %d", len(clResponse.Consumers)) - } - } func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) { @@ -1074,26 +1056,21 @@ func TestJetStreamClusterStreamOverlapSubjects(t *testing.T) { } // Now grab list of streams and make sure the second is not there. - resp, err := nc.Request(JSApiStreams, nil, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + var names []string + for name := range js.StreamNames() { + names = append(names, name) } - var streams JSApiStreamNamesResponse - if err = json.Unmarshal(resp.Data, &streams); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if len(streams.Streams) != 1 { - t.Fatalf("Expected only 1 stream but got %d", len(streams.Streams)) + if len(names) != 1 { + t.Fatalf("Expected only 1 stream but got %d", len(names)) } // Now do a detailed version. - resp, err = nc.Request(JSApiStreamList, nil, 5*time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + var infos []*nats.StreamInfo + for info := range js.StreamsInfo() { + infos = append(infos, info) } - var listResponse JSApiStreamListResponse - if err = json.Unmarshal(resp.Data, &listResponse); err != nil { - t.Fatalf("Unexpected error: %v", err) + if len(infos) != 1 { + t.Fatalf("Expected only 1 stream but got %d", len(infos)) } } @@ -1132,15 +1109,19 @@ func TestJetStreamClusterStreamInfoList(t *testing.T) { sendBatch("baz", 33) // Now get the stream list info. - sl := js.NewStreamLister() - if !sl.Next() { - t.Fatalf("Unexpected error: %v", sl.Err()) - } - p := sl.Page() - if len(p) != 3 { - t.Fatalf("StreamInfo expected 3 results, got %d", len(p)) - } - for _, si := range p { + var infos []*nats.StreamInfo + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + infos = infos[:0] + for info := range js.StreamsInfo() { + infos = append(infos, info) + } + if len(infos) != 3 { + return fmt.Errorf("StreamInfo expected 3 results, got %d", len(infos)) + } + return nil + }) + + for _, si := range infos { switch si.Config.Name { case "foo": if si.State.Msgs != 10 { @@ -1180,11 +1161,10 @@ func TestJetStreamClusterConsumerInfoList(t *testing.T) { createConsumer := func(name string) *nats.Subscription { t.Helper() - sub, err := js.SubscribeSync("TEST", nats.Durable(name), nats.Pull(2)) + sub, err := js.PullSubscribe("TEST", name) if err != nil { t.Fatalf("Unexpected error: %v", err) } - checkSubsPending(t, sub, 2) return sub } @@ -1202,25 +1182,21 @@ func TestJetStreamClusterConsumerInfoList(t *testing.T) { {subBar, 2, 0}, {subBaz, 8, 6}, } { - for i := 0; i < ss.fetch; i++ { - if m, err := ss.sub.NextMsg(time.Second); err != nil { - t.Fatalf("Unexpected error getting message %d: %v", i, err) - } else if i < ss.ack { - m.Ack() - } + msgs := fetchMsgs(t, ss.sub, ss.fetch, 5*time.Second) + for i := 0; i < ss.ack; i++ { + msgs[i].Ack() } } // Now get the consumer list info. - cl := js.NewConsumerLister("TEST") - if !cl.Next() { - t.Fatalf("Unexpected error: %v", cl.Err()) + var infos []*nats.ConsumerInfo + for info := range js.ConsumersInfo("TEST") { + infos = append(infos, info) } - p := cl.Page() - if len(p) != 3 { - t.Fatalf("ConsumerInfo expected 3 results, got %d", len(p)) + if len(infos) != 3 { + t.Fatalf("ConsumerInfo expected 3 results, got %d", len(infos)) } - for _, ci := range p { + for _, ci := range infos { switch ci.Name { case "foo": if ci.Delivered.Consumer != 4 { @@ -1825,12 +1801,12 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) { }) // Now do consumer. - sub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(10)) + sub, err := js.PullSubscribe("foo", "dlc") if err != nil { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() - checkSubsPending(t, sub, 10) + fetchMsgs(t, sub, 10, 5*time.Second) leader = c.consumerLeader("$G", "TEST", "dlc").Name() ci, err := sub.ConsumerInfo() @@ -1844,11 +1820,14 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) { if len(ci.Cluster.Replicas) != 2 { t.Fatalf("Expected %d replicas, got %d", 2, len(ci.Cluster.Replicas)) } - for _, peer := range ci.Cluster.Replicas { - if !peer.Current { - t.Fatalf("Expected replica to be current: %+v", peer) + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + for _, peer := range si.Cluster.Replicas { + if !peer.Current { + return fmt.Errorf("Expected replica to be current: %+v", peer) + } } - } + return nil + }) } func TestJetStreamClusterExtendedStreamInfoSingleReplica(t *testing.T) { @@ -1895,13 +1874,12 @@ func TestJetStreamClusterExtendedStreamInfoSingleReplica(t *testing.T) { } // Make sure we can grab consumer lists from any - cl := js.NewConsumerLister("TEST") - if !cl.Next() { - t.Fatalf("Unexpected error: %v", cl.Err()) + var infos []*nats.ConsumerInfo + for info := range js.ConsumersInfo("TEST") { + infos = append(infos, info) } - p := cl.Page() - if len(p) != 0 { - t.Fatalf("ConsumerInfo expected no paged results, got %d", len(p)) + if len(infos) != 0 { + t.Fatalf("ConsumerInfo expected no paged results, got %d", len(infos)) } // Now add in a consumer. @@ -1910,28 +1888,22 @@ func TestJetStreamClusterExtendedStreamInfoSingleReplica(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - cl = js.NewConsumerLister("TEST") - if !cl.Next() { - t.Fatalf("Unexpected error: %v", cl.Err()) + infos = infos[:0] + for info := range js.ConsumersInfo("TEST") { + infos = append(infos, info) } - p = cl.Page() - if len(p) != 1 { - t.Fatalf("ConsumerInfo expected 1 result, got %d", len(p)) + if len(infos) != 1 { + t.Fatalf("ConsumerInfo expected 1 result, got %d", len(infos)) } // Now do direct names list as well. - resp, err := nc.Request(fmt.Sprintf(JSApiConsumersT, "TEST"), nil, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + var names []string + for name := range js.ConsumerNames("TEST") { + names = append(names, name) } - var clResponse JSApiConsumerNamesResponse - if err = json.Unmarshal(resp.Data, &clResponse); err != nil { - t.Fatalf("Unexpected error: %v", err) + if len(names) != 1 { + t.Fatalf("Expected only 1 consumer but got %d", len(names)) } - if len(clResponse.Consumers) != 1 { - t.Fatalf("Expected only 1 consumer but got %d", len(clResponse.Consumers)) - } - } func TestJetStreamClusterInterestRetention(t *testing.T) { @@ -2112,15 +2084,14 @@ func TestJetStreamClusterEphemeralConsumerCleanup(t *testing.T) { } getConsumers := func() []string { - resp, err := nc.Request(fmt.Sprintf(JSApiConsumersT, "foo"), nil, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + var names []string + for name := range js.ConsumerNames("foo", nats.Context(ctx)) { + names = append(names, name) } - var clResponse JSApiConsumerNamesResponse - if err = json.Unmarshal(resp.Data, &clResponse); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - return clResponse.Consumers + return names } checkConsumer := func(expected int) { @@ -2215,7 +2186,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - toSend, batchSize := 200, 50 + toSend := 200 for i := 0; i < toSend; i++ { if _, err = js.Publish("foo", []byte("OK")); err != nil { @@ -2235,25 +2206,16 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - jsub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(batchSize)) + jsub, err := js.PullSubscribe("foo", "dlc") if err != nil { t.Fatalf("Unexpected error: %v", err) } - checkSubsPending(t, jsub, batchSize) // Ack first 50. - for i := 1; i <= 50; i++ { - m, err := jsub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Unexpected error getting msg %d: %v", i, err) - } + for _, m := range fetchMsgs(t, jsub, 50, 5*time.Second) { m.Ack() } // Now ack every third message for next 50. - for i := 51; i <= 100; i++ { - m, err := jsub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Unexpected error getting msg %d: %v", i, err) - } + for i, m := range fetchMsgs(t, jsub, 50, 5*time.Second) { if i%3 == 0 { m.Ack() } @@ -2411,26 +2373,22 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { // Make sure consumer works. // It should pick up with the next delivery spot, so check for that as first message. // We should have all the messages for first delivery delivered. - start := 101 - end := toSend - for i := start; i <= end; i++ { - m, err := jsub.NextMsg(2 * time.Second) - if err != nil { - t.Fatalf("Unexpected error getting msg [%d]: %v", i, err) - } + wantSeq := 101 + for _, m := range fetchMsgs(t, jsub, 100, 5*time.Second) { meta, err := m.MetaData() if err != nil { t.Fatalf("Unexpected error: %v", err) } - if meta.Stream != uint64(i) { - t.Fatalf("Expected stream sequence of %d, but got %d", i, meta.Stream) + if meta.Stream != uint64(wantSeq) { + t.Fatalf("Expected stream sequence of %d, but got %d", wantSeq, meta.Stream) } m.Ack() + wantSeq++ } // Check that redelivered come in now.. redelivered := 50/3 + 1 - checkSubsPending(t, jsub, redelivered) + fetchMsgs(t, jsub, redelivered, 5*time.Second) // Now make sure the other server was properly caughtup. // Need to call this by hand for now. @@ -2660,23 +2618,14 @@ func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) { sendBatch("bar", 75) sendBatch("baz", 10) - accountStats := func() *JetStreamAccountStats { + accountStats := func() *nats.AccountInfo { t.Helper() - resp, err := nc.Request(JSApiAccountInfo, nil, time.Second) + + info, err := js.AccountInfo() if err != nil { t.Fatalf("Unexpected error: %v", err) } - var info JSApiAccountInfoResponse - if err := json.Unmarshal(resp.Data, &info); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if info.Error != nil { - t.Fatalf("Unexpected error: %+v", info.Error) - } - if info.JetStreamAccountStats == nil { - t.Fatalf("AccountStats missing") - } - return info.JetStreamAccountStats + return info } // If subject is not 3 letters or payload not 2 this needs to change. @@ -2947,16 +2896,14 @@ func TestJetStreamClusterExtendedAccountInfo(t *testing.T) { sendBatch("TEST-3", 100) // Go client will lag so use direct for now. - getAccountInfo := func() *JetStreamAccountStats { - resp, err := nc.Request(JSApiAccountInfo, nil, time.Second) + getAccountInfo := func() *nats.AccountInfo { + t.Helper() + + info, err := js.AccountInfo() if err != nil { t.Fatalf("Unexpected error: %v", err) } - var info JSApiAccountInfoResponse - if err := json.Unmarshal(resp.Data, &info); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - return info.JetStreamAccountStats + return info } // Wait to accumulate. @@ -3198,7 +3145,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { }) notAvailableErr := func(err error) bool { - return err != nil && strings.Contains(err.Error(), "unavailable") + return err != nil && (strings.Contains(err.Error(), "unavailable") || err == context.DeadlineExceeded) } checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { @@ -3250,11 +3197,11 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { t.Fatalf("Expected an 'unavailable' error, got %v", err) } // Listers - if sl := js.NewStreamLister(); sl.Next() || !notAvailableErr(sl.Err()) { - t.Fatalf("Expected an 'unavailable' error, got %v", sl.Err()) + for info := range js.StreamsInfo() { + t.Fatalf("Unexpected stream info, got %v", info) } - if cl := js.NewConsumerLister("NO-Q"); cl.Next() || !notAvailableErr(cl.Err()) { - t.Fatalf("Expected an 'unavailable' error, got %v", cl.Err()) + for info := range js.ConsumersInfo("NO-Q") { + t.Fatalf("Unexpected consumer info, got %v", info) } } @@ -4037,28 +3984,17 @@ func TestJetStreamClusterSuperClusterBasics(t *testing.T) { } // Now check we can place a stream. - // Need to do this by hand for now until Go client catches up. pcn := "C3" - cfg := StreamConfig{ + scResp, err := js.AddStream(&nats.StreamConfig{ Name: "TEST2", - Storage: FileStorage, - Placement: &Placement{Cluster: pcn}, - } - req, err := json.Marshal(cfg) + Placement: &nats.Placement{Cluster: pcn}, + }) if err != nil { t.Fatalf("Unexpected error: %v", err) } - resp, _ := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) - var scResp JSApiStreamCreateResponse - if err := json.Unmarshal(resp.Data, &scResp); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if scResp.StreamInfo == nil || scResp.Error != nil { - t.Fatalf("Did not receive correct response: %+v", scResp.Error) - } - if scResp.StreamInfo.Cluster.Name != pcn { - t.Fatalf("Expected the stream to be placed in %q, got %q", pcn, scResp.StreamInfo.Cluster.Name) + if scResp.Cluster.Name != pcn { + t.Fatalf("Expected the stream to be placed in %q, got %q", pcn, scResp.Cluster.Name) } } @@ -4079,7 +4015,7 @@ func TestJetStreamClusterSuperClusterCrossClusterConsumerInterest(t *testing.T) } // Pull based first. - sub, err := js.SubscribeSync("foo", nats.Durable("dlc"), nats.Pull(1)) + sub, err := js.PullSubscribe("foo", "dlc") if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -4089,7 +4025,7 @@ func TestJetStreamClusterSuperClusterCrossClusterConsumerInterest(t *testing.T) t.Fatalf("Unexpected publish error: %v", err) } - checkSubsPending(t, sub, 1) + fetchMsgs(t, sub, 1, 5*time.Second) // Now check push based delivery. sub, err = js.SubscribeSync("foo", nats.Durable("rip")) @@ -4629,40 +4565,21 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { } } - nc2, _ := jsClientConnect(t, s) + nc2, js2 := jsClientConnect(t, s) defer nc2.Close() // Have to do this direct until we get Go client support. // Need to match jsClusterMirrorSourceImportsTempl imports. - cfg := StreamConfig{ - Name: "MY_MIRROR_TEST", - Storage: FileStorage, - Mirror: &StreamSource{ + _, err := js2.AddStream(&nats.StreamConfig{ + Name: "MY_MIRROR_TEST", + Mirror: &nats.StreamSource{ Name: "TEST", - External: &ExternalStream{ - ApiPrefix: "RI.JS.API", + External: &nats.ExternalStream{ + APIPrefix: "RI.JS.API", DeliverPrefix: "RI.DELIVER.SYNC.MIRRORS", }, }, - } - - req, err := json.Marshal(cfg) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - resp, err := nc2.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - var scResp JSApiStreamCreateResponse - if err := json.Unmarshal(resp.Data, &scResp); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if scResp.StreamInfo == nil || scResp.Error != nil { - t.Fatalf("Did not receive correct response: %+v", scResp.Error) - } - - js2, err := nc2.JetStream(nats.MaxWait(50 * time.Millisecond)) + }) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -4670,7 +4587,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { checkFor(t, 20*time.Second, 500*time.Millisecond, func() error { si, err := js2.StreamInfo("MY_MIRROR_TEST") if err != nil { - t.Fatalf("Could not retrieve stream info") + t.Fatalf("Could not retrieve stream info: %s", err) } if si.State.Msgs != uint64(toSend) { return fmt.Errorf("Expected %d msgs, got state: %+v", toSend, si.State) @@ -4679,35 +4596,21 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { }) // Now do sources as well. - cfg = StreamConfig{ - Name: "MY_SOURCE_TEST", - Storage: FileStorage, - Sources: []*StreamSource{ - &StreamSource{ + _, err = js2.AddStream(&nats.StreamConfig{ + Name: "MY_SOURCE_TEST", + Sources: []*nats.StreamSource{ + &nats.StreamSource{ Name: "TEST", - External: &ExternalStream{ - ApiPrefix: "RI.JS.API", + External: &nats.ExternalStream{ + APIPrefix: "RI.JS.API", DeliverPrefix: "RI.DELIVER.SYNC.SOURCES", }, }, }, - } - - req, err = json.Marshal(cfg) + }) if err != nil { t.Fatalf("Unexpected error: %v", err) } - resp, err = nc2.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - scResp.Error = nil - if err := json.Unmarshal(resp.Data, &scResp); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if scResp.StreamInfo == nil || scResp.Error != nil { - t.Fatalf("Did not receive correct response: %+v", scResp.Error) - } checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { si, err := js2.StreamInfo("MY_SOURCE_TEST") @@ -4835,16 +4738,12 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } // Names list.. - resp, err := nc.Request(JSApiStreams, nil, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + var names []string + for name := range js.StreamNames() { + names = append(names, name) } - var streams JSApiStreamNamesResponse - if err = json.Unmarshal(resp.Data, &streams); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if len(streams.Streams) != 1 { - t.Fatalf("Expected only 1 stream but got %d", len(streams.Streams)) + if len(names) != 1 { + t.Fatalf("Expected only 1 stream but got %d", len(names)) } // Now send to stream. @@ -4852,16 +4751,13 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) { t.Fatalf("Unexpected publish error: %v", err) } - sub, err = js.SubscribeSync("TEST", nats.Durable("tr"), nats.Pull(1)) + sub, err = js.PullSubscribe("TEST", "tr") if err != nil { t.Fatalf("Unexpected error: %v", err) } - checkSubsPending(t, sub, 1) + msgs := fetchMsgs(t, sub, 1, 5*time.Second) - m, err := sub.NextMsg(0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + m := msgs[0] if m.Subject != "TEST" { t.Fatalf("Expected subject of %q, got %q", "TEST", m.Subject) } @@ -5757,6 +5653,18 @@ func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) { }) } +func fetchMsgs(t *testing.T, sub *nats.Subscription, numExpected int, wait time.Duration) []*nats.Msg { + t.Helper() + msgs, err := sub.Fetch(numExpected, nats.MaxWait(wait)) + if err != nil { + t.Fatal(err) + } + if len(msgs) != numExpected { + t.Fatalf("Unexpected msg count, got %d, want %d", len(msgs), numExpected) + } + return msgs +} + func (c *cluster) restartServer(rs *Server) *Server { c.t.Helper() index := -1 diff --git a/server/jetstream_test.go b/server/jetstream_test.go index a64cc3c9..d3d40cb6 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -10849,54 +10849,40 @@ func TestJetStreamMirrorBasics(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() - createStream := func(cfg *StreamConfig) *JSApiStreamCreateResponse { - t.Helper() - req, err := json.Marshal(cfg) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - rm, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - var resp JSApiStreamCreateResponse - if err := json.Unmarshal(rm.Data, &resp); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - return &resp + createStream := func(cfg *nats.StreamConfig) (*nats.StreamInfo, error) { + return js.AddStream(cfg) } - createStreamOk := func(cfg *StreamConfig) { + createStreamOk := func(cfg *nats.StreamConfig) { t.Helper() - if scr := createStream(cfg); scr.Error != nil { - t.Fatalf("Expected error, got %+v", scr.Error) + if _, err := createStream(cfg); err != nil { + t.Fatalf("Expected error, got %+v", err) } } // Test we get right config errors etc. - cfg := &StreamConfig{ + cfg := &nats.StreamConfig{ Name: "M1", - Storage: FileStorage, Subjects: []string{"foo", "bar", "baz"}, - Mirror: &StreamSource{Name: "S1"}, + Mirror: &nats.StreamSource{Name: "S1"}, } - - scr := createStream(cfg) - if scr.Error == nil || !strings.Contains(scr.Error.Description, "stream mirrors can not") { - t.Fatalf("Expected error, got %+v", scr.Error) + _, err := createStream(cfg) + if err == nil || !strings.Contains(err.Error(), "stream mirrors can not") { + t.Fatalf("Expected error, got %+v", err) } // Clear subjects. cfg.Subjects = nil // Source - scfg := &StreamConfig{ + scfg := &nats.StreamConfig{ Name: "S1", - Storage: FileStorage, Subjects: []string{"foo", "bar", "baz"}, } + // Create source stream createStreamOk(scfg) + // Now create our mirror stream. createStreamOk(cfg) @@ -10915,7 +10901,7 @@ func TestJetStreamMirrorBasics(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { si, err := js2.StreamInfo("M1") if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -10937,16 +10923,16 @@ func TestJetStreamMirrorBasics(t *testing.T) { } } - cfg = &StreamConfig{ + cfg = &nats.StreamConfig{ Name: "M2", - Storage: FileStorage, - Mirror: &StreamSource{Name: "S1"}, + Storage: nats.FileStorage, + Mirror: &nats.StreamSource{Name: "S1"}, } // Now create our second mirror stream. createStreamOk(cfg) - checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { si, err := js2.StreamInfo("M2") if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -10967,15 +10953,14 @@ func TestJetStreamMirrorBasics(t *testing.T) { } } - cfg = &StreamConfig{ - Name: "M3", - Storage: FileStorage, - Mirror: &StreamSource{Name: "S1", OptStartSeq: 150}, + cfg = &nats.StreamConfig{ + Name: "M3", + Mirror: &nats.StreamSource{Name: "S1", OptStartSeq: 150}, } createStreamOk(cfg) - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { si, err := js2.StreamInfo("M3") if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -10991,14 +10976,13 @@ func TestJetStreamMirrorBasics(t *testing.T) { // Make sure setting time works ok. start := time.Now().UTC().Add(-2 * time.Hour) - cfg = &StreamConfig{ - Name: "M4", - Storage: FileStorage, - Mirror: &StreamSource{Name: "S1", OptStartTime: &start}, + cfg = &nats.StreamConfig{ + Name: "M4", + Mirror: &nats.StreamSource{Name: "S1", OptStartTime: &start}, } createStreamOk(cfg) - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { si, err := js2.StreamInfo("M4") if err != nil { t.Fatalf("Unexpected error: %v", err) diff --git a/server/norace_test.go b/server/norace_test.go index 47b73778..615ece3f 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1457,22 +1457,10 @@ func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) { } // Needed while Go client does not have mirror support. - createStream := func(cfg *StreamConfig) { + createStream := func(cfg *nats.StreamConfig) { t.Helper() - req, err := json.Marshal(cfg) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - rm, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - var resp JSApiStreamCreateResponse - if err := json.Unmarshal(rm.Data, &resp); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if resp.Error != nil { - t.Fatalf("Unexpected error: %+v", resp.Error) + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %+v", err) } } @@ -1483,11 +1471,10 @@ func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) { } } - createStream(&StreamConfig{ + createStream(&nats.StreamConfig{ Name: "M1", - Storage: FileStorage, - Mirror: &StreamSource{Name: "S1"}, - Placement: &Placement{Cluster: "C1"}, + Mirror: &nats.StreamSource{Name: "S1"}, + Placement: &nats.Placement{Cluster: "C1"}, }) // Faster timeout since we loop below checking for condition. @@ -1517,12 +1504,11 @@ func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) { } } - createStream(&StreamConfig{ + createStream(&nats.StreamConfig{ Name: "M2", - Storage: FileStorage, - Mirror: &StreamSource{Name: "S1"}, + Mirror: &nats.StreamSource{Name: "S1"}, Replicas: 3, - Placement: &Placement{Cluster: "C3"}, + Placement: &nats.Placement{Cluster: "C3"}, }) checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { @@ -1609,32 +1595,19 @@ func TestNoRaceJetStreamClusterSuperClusterSources(t *testing.T) { sendBatch("baz", 25) // Needed while Go client does not have mirror support for creating mirror or source streams. - createStream := func(cfg *StreamConfig) { + createStream := func(cfg *nats.StreamConfig) { t.Helper() - req, err := json.Marshal(cfg) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - rm, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - var resp JSApiStreamCreateResponse - if err := json.Unmarshal(rm.Data, &resp); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if resp.Error != nil { - t.Fatalf("Unexpected error: %+v", resp.Error) + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %+v", err) } } - cfg := &StreamConfig{ - Name: "MS", - Storage: FileStorage, - Sources: []*StreamSource{ - &StreamSource{Name: "foo"}, - &StreamSource{Name: "bar"}, - &StreamSource{Name: "baz"}, + cfg := &nats.StreamConfig{ + Name: "MS", + Sources: []*nats.StreamSource{ + {Name: "foo"}, + {Name: "bar"}, + {Name: "baz"}, }, } @@ -1673,16 +1646,15 @@ func TestNoRaceJetStreamClusterSuperClusterSources(t *testing.T) { sendBatch("bar", 15) sendBatch("baz", 25) - cfg = &StreamConfig{ - Name: "MS2", - Storage: FileStorage, - Sources: []*StreamSource{ - &StreamSource{Name: "foo"}, - &StreamSource{Name: "bar"}, - &StreamSource{Name: "baz"}, + cfg = &nats.StreamConfig{ + Name: "MS2", + Sources: []*nats.StreamSource{ + {Name: "foo"}, + {Name: "bar"}, + {Name: "baz"}, }, Replicas: 3, - Placement: &Placement{Cluster: "C3"}, + Placement: &nats.Placement{Cluster: "C3"}, } createStream(cfg) diff --git a/vendor/github.com/nats-io/nats.go/.travis.yml b/vendor/github.com/nats-io/nats.go/.travis.yml index f16a2c1b..bcdbf44f 100644 --- a/vendor/github.com/nats-io/nats.go/.travis.yml +++ b/vendor/github.com/nats-io/nats.go/.travis.yml @@ -1,7 +1,7 @@ language: go go: +- 1.16.x - 1.15.x -- 1.14.x go_import_path: github.com/nats-io/nats.go install: - go get -t ./... @@ -17,4 +17,4 @@ before_script: script: - go test -i -race ./... - go test -v -run=TestNoRace -p=1 ./... -- if [[ "$TRAVIS_GO_VERSION" =~ 1.15 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi +- if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -race -v -p=1 ./... --failfast; fi diff --git a/vendor/github.com/nats-io/nats.go/go.mod b/vendor/github.com/nats-io/nats.go/go.mod index b60123e7..91b5659d 100644 --- a/vendor/github.com/nats-io/nats.go/go.mod +++ b/vendor/github.com/nats-io/nats.go/go.mod @@ -4,8 +4,8 @@ go 1.15 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8 - github.com/nats-io/nkeys v0.2.0 + github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0 + github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 ) diff --git a/vendor/github.com/nats-io/nats.go/go.sum b/vendor/github.com/nats-io/nats.go/go.sum index a2f355de..109fb481 100644 --- a/vendor/github.com/nats-io/nats.go/go.sum +++ b/vendor/github.com/nats-io/nats.go/go.sum @@ -9,55 +9,66 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg= github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA= +github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk= +github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc= +github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= +github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= -github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM= github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= +github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= +github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= -github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc h1:pu+s4XC+bYnI0iD2vDtOl83zjCYUau/q6c83pEvsGZc= github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ= -github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813 h1:km4lLzT86NyJRhO++VqfP/vn5cbfm+E05i2bGdqDbrY= github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ= +github.com/nats-io/jwt/v2 v2.0.1 h1:SycklijeduR742i/1Y3nRhURYM7imDzZZ3+tuAQqhQA= +github.com/nats-io/jwt/v2 v2.0.1/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU= github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4= github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw= github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY= -github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4 h1:GStuc0W1rK45FSlpt3+7UTLzmRys2/6WSDuJFyzZ6Xg= github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0= -github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8 h1:jPZZofsCevE2oJl3YexVw3drWOFdo8H4AWMb/1WcVoc= github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA= +github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0 h1:ybeT5VFA73CVQb4rCL+48+up91xWheriSBbJ3M2Pzps= +github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI= github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI= github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk= +github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= -github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 429959d9..58545765 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -1,4 +1,4 @@ -// Copyright 2020 The NATS Authors +// Copyright 2020-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,14 +16,18 @@ package nats import ( "bytes" "context" + "crypto/sha256" "encoding/json" "errors" "fmt" "net/http" "strconv" "strings" + "sync" "sync/atomic" "time" + + "github.com/nats-io/nuid" ) // Request API subjects for JetStream. @@ -52,6 +56,9 @@ const ( // apiConsumerListT is used to return all detailed consumer information apiConsumerListT = "CONSUMER.LIST.%s" + // apiConsumerNamesT is used to return a list with all consumer names for the stream. + apiConsumerNamesT = "CONSUMER.NAMES.%s" + // apiStreams can lookup a stream by subject. apiStreams = "STREAM.NAMES" @@ -80,14 +87,28 @@ const ( apiMsgDeleteT = "STREAM.MSG.DELETE.%s" ) -// JetStream is the public interface for JetStream. +// JetStream allows persistent messaging through JetStream. type JetStream interface { // Publish publishes a message to JetStream. Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) - // Publish publishes a Msg to JetStream. + // PublishMsg publishes a Msg to JetStream. PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) + // PublishAsync publishes a message to JetStream and returns a PubAckFuture. + // The data should not be changed until the PubAckFuture has been processed. + PublishAsync(subj string, data []byte, opts ...PubOpt) (*PubAckFuture, error) + + // PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture. + // The message should not be changed until the PubAckFuture has been processed. + PublishMsgAsync(m *Msg, opts ...PubOpt) (*PubAckFuture, error) + + // PublishAsyncPending returns the number of async publishes outstanding for this context. + PublishAsyncPending() int + + // PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd. + PublishAsyncComplete() <-chan struct{} + // Subscribe creates an async Subscription for JetStream. Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) @@ -102,9 +123,12 @@ type JetStream interface { // QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously. QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) + + // PullSubscribe creates a Subscription that can fetch messages. + PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) } -// JetStreamContext is the public interface for JetStream. +// JetStreamContext allows JetStream messaging and stream management. type JetStreamContext interface { JetStream JetStreamManager @@ -112,56 +136,88 @@ type JetStreamContext interface { // js is an internal struct from a JetStreamContext. type js struct { - nc *Conn + nc *Conn + opts *jsOpts + + // For async publish context. + mu sync.RWMutex + rpre string + rsub *Subscription + pafs map[string]*PubAckFuture + stc chan struct{} + dch chan struct{} +} + +type jsOpts struct { + ctx context.Context // For importing JetStream from other accounts. pre string // Amount of time to wait for API requests. wait time.Duration - // Signals only direct access and no API access. - direct bool + // For async publish error handling. + aecb MsgErrHandler + // Maximum in flight. + maxap int } -const defaultRequestWait = 5 * time.Second +const ( + defaultRequestWait = 5 * time.Second + defaultAccountCheck = 20 * time.Second +) -// JetStream returns a JetStream context for pub/sub interactions. +// JetStream returns a JetStreamContext for messaging and stream management. func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { - js := &js{nc: nc, pre: defaultAPIPrefix, wait: defaultRequestWait} + js := &js{ + nc: nc, + opts: &jsOpts{ + pre: defaultAPIPrefix, + wait: defaultRequestWait, + }, + } for _, opt := range opts { - if err := opt.configureJSContext(js); err != nil { + if err := opt.configureJSContext(js.opts); err != nil { return nil, err } } - if js.direct { - return js, nil + // If we have check recently we can avoid another account lookup here. + // We want these to be lighweight and created at will. + nc.mu.Lock() + now := time.Now() + checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck + if checkAccount { + nc.jsLastCheck = now } + nc.mu.Unlock() - if _, err := js.AccountInfo(); err != nil { - if err == ErrNoResponders { - err = ErrJetStreamNotEnabled + if checkAccount { + if _, err := js.AccountInfo(); err != nil { + if err == ErrNoResponders { + err = ErrJetStreamNotEnabled + } + return nil, err } - return nil, err } return js, nil } -// JSOpt configures a JetStream context. +// JSOpt configures a JetStreamContext. type JSOpt interface { - configureJSContext(opts *js) error + configureJSContext(opts *jsOpts) error } -// jsOptFn configures an option for the JetStream context. -type jsOptFn func(opts *js) error +// jsOptFn configures an option for the JetStreamContext. +type jsOptFn func(opts *jsOpts) error -func (opt jsOptFn) configureJSContext(opts *js) error { +func (opt jsOptFn) configureJSContext(opts *jsOpts) error { return opt(opts) } // APIPrefix changes the default prefix used for the JetStream API. func APIPrefix(pre string) JSOpt { - return jsOptFn(func(js *js) error { + return jsOptFn(func(js *jsOpts) error { js.pre = pre if !strings.HasSuffix(js.pre, ".") { js.pre = js.pre + "." @@ -170,20 +226,12 @@ func APIPrefix(pre string) JSOpt { }) } -// DirectOnly makes a JetStream context avoid using the JetStream API altogether. -func DirectOnly() JSOpt { - return jsOptFn(func(js *js) error { - js.direct = true - return nil - }) -} - func (js *js) apiSubj(subj string) string { - if js.pre == _EMPTY_ { + if js.opts.pre == _EMPTY_ { return subj } var b strings.Builder - b.WriteString(js.pre) + b.WriteString(js.opts.pre) b.WriteString(subj) return b.String() } @@ -209,7 +257,7 @@ type pubOpts struct { seq uint64 // Expected last sequence } -// pubAckResponse is the ack response from the JetStream API when of publishing a message. +// pubAckResponse is the ack response from the JetStream API when publishing a message. type pubAckResponse struct { apiResponse *PubAck @@ -248,7 +296,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { return nil, ErrContextAndTimeout } if o.ttl == 0 && o.ctx == nil { - o.ttl = js.wait + o.ttl = js.opts.wait } if o.id != _EMPTY_ { @@ -284,7 +332,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { return nil, ErrInvalidJSAck } if pa.Error != nil { - return nil, errors.New(pa.Error.Description) + return nil, fmt.Errorf("nats: %s", pa.Error.Description) } if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ { return nil, ErrInvalidJSAck @@ -297,6 +345,306 @@ func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) return js.PublishMsg(&Msg{Subject: subj, Data: data}, opts...) } +// PubAckFuture is a future for a PubAck. +type PubAckFuture struct { + js *js + msg *Msg + pa *PubAck + st time.Time + err error + errCh chan error + doneCh chan *PubAck +} + +func (paf *PubAckFuture) Ok() <-chan *PubAck { + paf.js.mu.Lock() + defer paf.js.mu.Unlock() + + if paf.doneCh == nil { + paf.doneCh = make(chan *PubAck, 1) + if paf.pa != nil { + paf.doneCh <- paf.pa + } + } + + return paf.doneCh +} + +func (paf *PubAckFuture) Err() <-chan error { + paf.js.mu.Lock() + defer paf.js.mu.Unlock() + + if paf.errCh == nil { + paf.errCh = make(chan error, 1) + if paf.err != nil { + paf.errCh <- paf.err + } + } + + return paf.errCh +} + +func (paf *PubAckFuture) Msg() *Msg { + paf.js.mu.RLock() + defer paf.js.mu.RUnlock() + return paf.msg +} + +// For quick token lookup etc. +const aReplyPreLen = 14 +const aReplyTokensize = 6 + +func (js *js) newAsyncReply() string { + js.mu.Lock() + if js.rsub == nil { + // Create our wildcard reply subject. + sha := sha256.New() + sha.Write([]byte(nuid.Next())) + b := sha.Sum(nil) + for i := 0; i < aReplyTokensize; i++ { + b[i] = rdigits[int(b[i]%base)] + } + js.rpre = fmt.Sprintf("%s%s.", InboxPrefix, b[:aReplyTokensize]) + sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply) + if err != nil { + js.mu.Unlock() + return _EMPTY_ + } + js.rsub = sub + } + var sb strings.Builder + sb.WriteString(js.rpre) + rn := js.nc.respRand.Int63() + var b [aReplyTokensize]byte + for i, l := 0, rn; i < len(b); i++ { + b[i] = rdigits[l%base] + l /= base + } + sb.Write(b[:]) + js.mu.Unlock() + return sb.String() +} + +// registerPAF will register for a PubAckFuture. +func (js *js) registerPAF(id string, paf *PubAckFuture) (int, int) { + js.mu.Lock() + if js.pafs == nil { + js.pafs = make(map[string]*PubAckFuture) + } + paf.js = js + js.pafs[id] = paf + np := len(js.pafs) + maxap := js.opts.maxap + js.mu.Unlock() + return np, maxap +} + +// Lock should be held. +func (js *js) getPAF(id string) *PubAckFuture { + if js.pafs == nil { + return nil + } + return js.pafs[id] +} + +// clearPAF will remove a PubAckFuture that was registered. +func (js *js) clearPAF(id string) { + js.mu.Lock() + delete(js.pafs, id) + js.mu.Unlock() +} + +// PublishAsyncPending returns how many PubAckFutures are pending. +func (js *js) PublishAsyncPending() int { + js.mu.RLock() + defer js.mu.RUnlock() + return len(js.pafs) +} + +func (js *js) asyncStall() <-chan struct{} { + js.mu.Lock() + if js.stc == nil { + js.stc = make(chan struct{}) + } + stc := js.stc + js.mu.Unlock() + return stc +} + +// Handle an async reply from PublishAsync. +func (js *js) handleAsyncReply(m *Msg) { + if len(m.Subject) <= aReplyPreLen { + return + } + id := m.Subject[aReplyPreLen:] + + js.mu.Lock() + paf := js.getPAF(id) + if paf == nil { + js.mu.Unlock() + return + } + // Remove + delete(js.pafs, id) + + // Check on anyone stalled and waiting. + if js.stc != nil && len(js.pafs) < js.opts.maxap { + close(js.stc) + js.stc = nil + } + // Check on anyone one waiting on done status. + if js.dch != nil && len(js.pafs) == 0 { + dch := js.dch + js.dch = nil + // Defer here so error is processed and can be checked. + defer close(dch) + } + + doErr := func(err error) { + paf.err = err + if paf.errCh != nil { + paf.errCh <- paf.err + } + cb := js.opts.aecb + js.mu.Unlock() + if cb != nil { + cb(paf.js, paf.msg, err) + } + } + + // Process no responders etc. + if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders { + doErr(ErrNoResponders) + return + } + + var pa pubAckResponse + if err := json.Unmarshal(m.Data, &pa); err != nil { + doErr(ErrInvalidJSAck) + return + } + if pa.Error != nil { + doErr(fmt.Errorf("nats: %s", pa.Error.Description)) + return + } + if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ { + doErr(ErrInvalidJSAck) + return + } + + // So here we have received a proper puback. + paf.pa = pa.PubAck + if paf.doneCh != nil { + paf.doneCh <- paf.pa + } + js.mu.Unlock() +} + +// MsgErrHandler is used to process asynchronous errors from +// JetStream PublishAsync and PublishAsynMsg. It will return the original +// message sent to the server for possible retransmitting and the error encountered. +type MsgErrHandler func(JetStream, *Msg, error) + +// PublishAsyncErrHandler sets the error handler for async publishes in JetStream. +func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt { + return jsOptFn(func(js *jsOpts) error { + js.aecb = cb + return nil + }) +} + +// PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time. +func PublishAsyncMaxPending(max int) JSOpt { + return jsOptFn(func(js *jsOpts) error { + if max < 1 { + return errors.New("nats: max ack pending should be >= 1") + } + js.maxap = max + return nil + }) +} + +// PublishAsync publishes a message to JetStream and returns a PubAckFuture +func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (*PubAckFuture, error) { + return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...) +} + +func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (*PubAckFuture, error) { + var o pubOpts + if len(opts) > 0 { + if m.Header == nil { + m.Header = http.Header{} + } + for _, opt := range opts { + if err := opt.configurePublish(&o); err != nil { + return nil, err + } + } + } + + // Timeouts and contexts do not make sense for these. + if o.ttl != 0 || o.ctx != nil { + return nil, ErrContextAndTimeout + } + + // FIXME(dlc) - Make common. + if o.id != _EMPTY_ { + m.Header.Set(MsgIdHdr, o.id) + } + if o.lid != _EMPTY_ { + m.Header.Set(ExpectedLastMsgIdHdr, o.lid) + } + if o.str != _EMPTY_ { + m.Header.Set(ExpectedStreamHdr, o.str) + } + if o.seq > 0 { + m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10)) + } + + // Reply + if m.Reply != _EMPTY_ { + return nil, errors.New("nats: reply subject should be empty") + } + m.Reply = js.newAsyncReply() + if m.Reply == _EMPTY_ { + return nil, errors.New("nats: error creating async reply handler") + } + id := m.Reply[aReplyPreLen:] + paf := &PubAckFuture{msg: m, st: time.Now()} + numPending, maxPending := js.registerPAF(id, paf) + + if maxPending > 0 && numPending >= maxPending { + select { + case <-js.asyncStall(): + case <-time.After(200 * time.Millisecond): + js.clearPAF(id) + return nil, errors.New("nats: stalled with too many outstanding async published messages") + } + } + + if err := js.nc.PublishMsg(m); err != nil { + js.clearPAF(id) + return nil, err + } + + return paf, nil +} + +// PublishAsyncComplete returns a channel that will be closed when all outstanding messages have been ack'd. +func (js *js) PublishAsyncComplete() <-chan struct{} { + js.mu.Lock() + defer js.mu.Unlock() + if js.dch == nil { + js.dch = make(chan struct{}) + } + dch := js.dch + if len(js.pafs) == 0 { + close(js.dch) + js.dch = nil + } + return dch +} + // MsgId sets the message ID used for de-duplication. func MsgId(id string) PubOpt { return pubOptFn(func(opts *pubOpts) error { @@ -332,13 +680,26 @@ func ExpectLastMsgId(id string) PubOpt { // MaxWait sets the maximum amount of time we will wait for a response. type MaxWait time.Duration -func (ttl MaxWait) configurePublish(opts *pubOpts) error { +func (ttl MaxWait) configureJSContext(js *jsOpts) error { + js.wait = time.Duration(ttl) + return nil +} + +func (ttl MaxWait) configurePull(opts *pullOpts) error { opts.ttl = time.Duration(ttl) return nil } -func (ttl MaxWait) configureJSContext(js *js) error { - js.wait = time.Duration(ttl) +// AckWait sets the maximum amount of time we will wait for an ack. +type AckWait time.Duration + +func (ttl AckWait) configurePublish(opts *pubOpts) error { + opts.ttl = time.Duration(ttl) + return nil +} + +func (ttl AckWait) configureSubscribe(opts *subOpts) error { + opts.cfg.AckWait = time.Duration(ttl) return nil } @@ -347,11 +708,21 @@ type ContextOpt struct { context.Context } +func (ctx ContextOpt) configureJSContext(opts *jsOpts) error { + opts.ctx = ctx + return nil +} + func (ctx ContextOpt) configurePublish(opts *pubOpts) error { opts.ctx = ctx return nil } +func (ctx ContextOpt) configurePull(opts *pullOpts) error { + opts.ctx = ctx + return nil +} + // Context returns an option that can be used to configure a context. func Context(ctx context.Context) ContextOpt { return ContextOpt{ctx} @@ -400,9 +771,9 @@ type SequencePair struct { // nextRequest is for getting next messages for pull based consumers from JetStream. type nextRequest struct { - Expires *time.Time `json:"expires,omitempty"` - Batch int `json:"batch,omitempty"` - NoWait bool `json:"no_wait,omitempty"` + Expires time.Duration `json:"expires,omitempty"` + Batch int `json:"batch,omitempty"` + NoWait bool `json:"no_wait,omitempty"` } // jsSub includes JetStream subscription info. @@ -411,7 +782,7 @@ type jsSub struct { consumer string stream string deliver string - pull int + pull bool durable bool attached bool } @@ -422,13 +793,7 @@ func (jsi *jsSub) unsubscribe(drainMode bool) error { // consumers when using drain mode. return nil } - - // Skip if in direct mode as well. js := jsi.js - if js.direct { - return nil - } - return js.DeleteConsumer(jsi.stream, jsi.consumer) } @@ -446,6 +811,9 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error { // Subscribe will create a subscription to the appropriate stream and consumer. func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { + if cb == nil { + return nil, ErrBadSubscription + } return js.subscribe(subj, _EMPTY_, cb, nil, opts) } @@ -457,6 +825,9 @@ func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) // QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics. func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { + if cb == nil { + return nil, ErrBadSubscription + } return js.subscribe(subj, queue, cb, nil, opts) } @@ -471,6 +842,11 @@ func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscri return js.subscribe(subj, _EMPTY_, nil, ch, opts) } +// PullSubscribe creates a pull subscriber. +func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) { + return js.subscribe(subj, _EMPTY_, nil, nil, append(opts, Durable(durable))) +} + func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []SubOpt) (*Subscription, error) { cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet} o := subOpts{cfg: &cfg} @@ -482,9 +858,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] } } - isPullMode := o.pull > 0 - if cb != nil && isPullMode { - return nil, ErrPullModeNotAllowed + isPullMode := ch == nil && cb == nil + badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy + if isPullMode && badPullAck { + return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) } var ( @@ -495,62 +872,53 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] attached bool stream = o.stream consumer = o.consumer - requiresAPI = (stream == _EMPTY_ && consumer == _EMPTY_) && o.cfg.DeliverSubject == _EMPTY_ ) - if js.direct && requiresAPI { - return nil, ErrDirectModeRequired - } - - if js.direct { - if o.cfg.DeliverSubject != _EMPTY_ { - deliver = o.cfg.DeliverSubject - } else { - deliver = NewInbox() - } - } else { - // Find the stream mapped to the subject. + // Find the stream mapped to the subject if not bound to a stream already. + if o.stream == _EMPTY_ { stream, err = js.lookupStreamBySubject(subj) if err != nil { return nil, err } + } else { + stream = o.stream + } - // With an explicit durable name, then can lookup - // the consumer to which it should be attaching to. - var info *ConsumerInfo - consumer = o.cfg.Durable - if consumer != _EMPTY_ { - // Only create in case there is no consumer already. - info, err = js.ConsumerInfo(stream, consumer) - if err != nil && err.Error() != `consumer not found` { - return nil, err - } + // With an explicit durable name, then can lookup + // the consumer to which it should be attaching to. + var info *ConsumerInfo + consumer = o.cfg.Durable + if consumer != _EMPTY_ { + // Only create in case there is no consumer already. + info, err = js.ConsumerInfo(stream, consumer) + if err != nil && err.Error() != "nats: consumer not found" { + return nil, err + } + } + + if info != nil { + // Attach using the found consumer config. + ccfg = &info.Config + attached = true + + // Make sure this new subject matches or is a subset. + if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { + return nil, ErrSubjectMismatch } - if info != nil { - // Attach using the found consumer config. - ccfg = &info.Config - attached = true - - // Make sure this new subject matches or is a subset. - if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { - return nil, ErrSubjectMismatch - } - - if ccfg.DeliverSubject != _EMPTY_ { - deliver = ccfg.DeliverSubject - } else { - deliver = NewInbox() - } + if ccfg.DeliverSubject != _EMPTY_ { + deliver = ccfg.DeliverSubject } else { - shouldCreate = true deliver = NewInbox() - if !isPullMode { - cfg.DeliverSubject = deliver - } - // Do filtering always, server will clear as needed. - cfg.FilterSubject = subj } + } else { + shouldCreate = true + deliver = NewInbox() + if !isPullMode { + cfg.DeliverSubject = deliver + } + // Do filtering always, server will clear as needed. + cfg.FilterSubject = subj } var sub *Subscription @@ -560,9 +928,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] ocb := cb cb = func(m *Msg) { ocb(m); m.Ack() } } - sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js}) - if err != nil { - return nil, err + + if isPullMode { + sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: true}} + } else { + sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js}) + if err != nil { + return nil, err + } } // If we are creating or updating let's process that request. @@ -597,7 +970,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] ccSubj = fmt.Sprintf(apiConsumerCreateT, stream) } - resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.wait) + resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled @@ -614,7 +987,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] } if info.Error != nil { sub.Unsubscribe() - return nil, errors.New(info.Error.Description) + return nil, fmt.Errorf("nats: %s", info.Error.Description) } // Hold onto these for later. @@ -625,18 +998,13 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] } else { sub.jsi.stream = stream sub.jsi.consumer = consumer - if js.direct { - sub.jsi.deliver = o.cfg.DeliverSubject - } else { - sub.jsi.deliver = ccfg.DeliverSubject - } + sub.jsi.deliver = ccfg.DeliverSubject } sub.jsi.attached = attached // If we are pull based go ahead and fire off the first request to populate. if isPullMode { sub.jsi.pull = o.pull - sub.Poll() } return sub, nil @@ -659,7 +1027,7 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) { if err != nil { return _EMPTY_, err } - resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.wait) + resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.opts.wait) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled @@ -678,17 +1046,28 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) { type subOpts struct { // For attaching. stream, consumer string - // For pull based consumers, batch size for pull - pull int + // For pull based consumers. + pull bool // For manual ack mack bool // For creating or updating. cfg *ConsumerConfig } +// ManualAck disables auto ack functionality for async subscriptions. +func ManualAck() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.mack = true + return nil + }) +} + // Durable defines the consumer name for JetStream durable subscribers. func Durable(name string) SubOpt { return subOptFn(func(opts *subOpts) error { + if opts.cfg.Durable != "" { + return fmt.Errorf("nats: option Durable set more than once") + } if strings.Contains(name, ".") { return ErrInvalidDurableName } @@ -698,38 +1077,6 @@ func Durable(name string) SubOpt { }) } -// Pull defines the batch size of messages that will be received -// when using pull based JetStream consumers. -func Pull(batchSize int) SubOpt { - return subOptFn(func(opts *subOpts) error { - if batchSize == 0 { - return errors.New("nats: batch size of 0 not valid") - } - opts.pull = batchSize - return nil - }) -} - -func PullDirect(stream, consumer string, batchSize int) SubOpt { - return subOptFn(func(opts *subOpts) error { - if batchSize == 0 { - return errors.New("nats: batch size of 0 not valid") - } - opts.stream = stream - opts.consumer = consumer - opts.pull = batchSize - return nil - }) -} - -// ManualAck disables auto ack functionality for async subscriptions. -func ManualAck() SubOpt { - return subOptFn(func(opts *subOpts) error { - opts.mack = true - return nil - }) -} - // DeliverAll will configure a Consumer to receive all the // messages from a Stream. func DeliverAll() SubOpt { @@ -777,6 +1124,7 @@ func StartTime(startTime time.Time) SubOpt { }) } +// AckNone requires no acks for delivered messages. func AckNone() SubOpt { return subOptFn(func(opts *subOpts) error { opts.cfg.AckPolicy = AckNonePolicy @@ -784,6 +1132,8 @@ func AckNone() SubOpt { }) } +// AckAll when acking a sequence number, this implicitly acks all sequences +// below this one as well. func AckAll() SubOpt { return subOptFn(func(opts *subOpts) error { opts.cfg.AckPolicy = AckAllPolicy @@ -791,6 +1141,7 @@ func AckAll() SubOpt { }) } +// AckExplicit requires ack or nack for all messages. func AckExplicit() SubOpt { return subOptFn(func(opts *subOpts) error { opts.cfg.AckPolicy = AckExplicitPolicy @@ -798,6 +1149,47 @@ func AckExplicit() SubOpt { }) } +// MaxDeliver sets the number of redeliveries for a message. +func MaxDeliver(n int) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxDeliver = n + return nil + }) +} + +// MaxAckPending sets the number of outstanding acks that are allowed before +// message delivery is halted. +func MaxAckPending(n int) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxAckPending = n + return nil + }) +} + +// ReplayOriginal replays the messages at the original speed. +func ReplayOriginal() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.ReplayPolicy = ReplayOriginalPolicy + return nil + }) +} + +// RateLimit is the Bits per sec rate limit applied to a push consumer. +func RateLimit(n uint64) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.RateLimit = n + return nil + }) +} + +// BindStream binds a consumer to a stream explicitly based on a name. +func BindStream(name string) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.stream = name + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -808,37 +1200,313 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { // Consumer info lookup should fail if in direct mode. js := sub.jsi.js - if js.direct { - sub.mu.Unlock() - return nil, ErrDirectModeRequired - } - stream, consumer := sub.jsi.stream, sub.jsi.consumer sub.mu.Unlock() return js.getConsumerInfo(stream, consumer) } -func (sub *Subscription) Poll() error { - sub.mu.Lock() - if sub.jsi == nil || sub.jsi.deliver != _EMPTY_ || sub.jsi.pull == 0 { - sub.mu.Unlock() - return ErrTypeSubscription +type pullOpts struct { + ttl time.Duration + ctx context.Context +} + +type PullOpt interface { + configurePull(opts *pullOpts) error +} + +// PullMaxWaiting defines the max inflight pull requests to be delivered more messages. +func PullMaxWaiting(n int) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxWaiting = n + return nil + }) +} + +var errNoMessages = errors.New("nats: no messages") + +// Fetch pulls a batch of messages from a stream for a pull consumer. +func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { + if sub == nil { + return nil, ErrBadSubscription } - batch := sub.jsi.pull - nc, reply := sub.conn, sub.Subject + + var o pullOpts + for _, opt := range opts { + if err := opt.configurePull(&o); err != nil { + return nil, err + } + } + if o.ctx != nil && o.ttl != 0 { + return nil, ErrContextAndTimeout + } + + sub.mu.Lock() + if sub.jsi == nil || sub.typ != PullSubscription { + sub.mu.Unlock() + return nil, ErrTypeSubscription + } + + nc, _ := sub.conn, sub.Subject stream, consumer := sub.jsi.stream, sub.jsi.consumer js := sub.jsi.js + + ttl := o.ttl + if ttl == 0 { + ttl = js.opts.wait + } sub.mu.Unlock() - req, _ := json.Marshal(&nextRequest{Batch: batch}) - reqNext := js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer)) - return nc.PublishRequest(reqNext, reply, req) + // Use the given context or setup a default one for the span + // of the pull batch request. + var ( + ctx = o.ctx + err error + cancel context.CancelFunc + ) + if o.ctx == nil { + ctx, cancel = context.WithTimeout(context.Background(), ttl) + defer cancel() + } + + // Check if context not done already before making the request. + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + err = ctx.Err() + } else { + err = ErrTimeout + } + default: + } + if err != nil { + return nil, err + } + + // Check for empty payload message and process synchronously + // any status messages. + checkMsg := func(msg *Msg) error { + if len(msg.Data) == 0 { + switch msg.Header.Get(statusHdr) { + case noResponders: + return ErrNoResponders + case noMessages: + return errNoMessages + case "400", "408", "409": + return fmt.Errorf("nats: %s", msg.Header.Get(descrHdr)) + } + } + return nil + } + + checkCtxErr := func(err error) error { + if o.ctx == nil && err == context.DeadlineExceeded { + return ErrTimeout + } + return err + } + + var ( + gotNoMessages bool + nr = &nextRequest{Batch: batch, NoWait: true} + req, _ = json.Marshal(nr) + reqNext = js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer)) + expires = ttl - 10*time.Millisecond + msgs = make([]*Msg, 0) + ) + + // In case of only one message, then can already handle with built-in request functions. + if batch == 1 { + resp, err := nc.oldRequestWithContext(ctx, reqNext, nil, req) + if err != nil { + return nil, checkCtxErr(err) + } + + // In case of a no messages instant error, then fallback + // into longer version of pull batch request. + err = checkMsg(resp) + if err != nil { + if err == errNoMessages { + // Use old request style for the retry of the pull request + // in order to use auto UNSUB 1 to prevent the server + // from delivering a message when there is no more interest. + nr.NoWait = false + nr.Expires = expires + req, _ = json.Marshal(nr) + resp, err = nc.oldRequestWithContext(ctx, reqNext, nil, req) + if err != nil { + return nil, checkCtxErr(err) + } + + // This next message, could also be an error + // (e.g. 408 due to request timeout). + err = checkMsg(resp) + if err != nil { + return nil, err + } + return []*Msg{resp}, nil + } else { + // Hard error + return nil, checkCtxErr(err) + } + } + return []*Msg{resp}, nil + } + + // Setup a request where we will wait for the first response + // in case of errors, then dispatch the rest of the replies + // to the channel. + inbox := NewInbox() + + mch := make(chan *Msg, batch) + s, err := nc.subscribe(inbox, _EMPTY_, nil, mch, true, nil) + if err != nil { + return nil, err + } + + // Remove interest in the subscription at the end so that the + // this inbox does not get delivered the results intended + // for another request. + defer s.Unsubscribe() + + // Make a publish request to get results of the pull. + err = nc.publish(reqNext, inbox, nil, req) + if err != nil { + s.Unsubscribe() + return nil, err + } + + // Try to get the first message or error with NoWait. + var ( + firstMsg *Msg + ok bool + ) + select { + case firstMsg, ok = <-mch: + if !ok { + err = s.getNextMsgErr() + } else { + err = s.processNextMsgDelivered(firstMsg) + if err == nil { + err = checkMsg(firstMsg) + } + } + case <-ctx.Done(): + err = checkCtxErr(ctx.Err()) + } + + // If the first error is 'no more messages', then switch into + // longer form version of the request that waits for messages. + if err == errNoMessages { + gotNoMessages = true + } else if err != nil { + // We should be getting the response from the server + // in case we got a poll error, so stop and cleanup. + s.Unsubscribe() + return nil, err + } + + if gotNoMessages { + // We started with a 404 response right away, so fallback into + // second request that waits longer for messages to delivered. + nr.NoWait = false + nr.Expires = expires + req, _ = json.Marshal(nr) + + // Since first message was an error we UNSUB (batch+1) + // since we are counting it as the first message. + err = s.AutoUnsubscribe(batch + 1) + if err != nil { + return nil, err + } + + // Make another request and wait for the messages... + err = nc.publish(reqNext, inbox, nil, req) + if err != nil { + s.Unsubscribe() + return nil, err + } + + // Try to get the first result again or return the error. + select { + case firstMsg, ok = <-mch: + if !ok { + err = s.getNextMsgErr() + } else { + err = s.processNextMsgDelivered(firstMsg) + if err == nil { + err = checkMsg(firstMsg) + } + } + case <-ctx.Done(): + err = checkCtxErr(ctx.Err()) + } + if err != nil { + s.Unsubscribe() + return nil, err + } + // Check again if the delivered next message is a status error. + err = checkMsg(firstMsg) + if err != nil { + s.Unsubscribe() + return nil, err + } + } else { + // We are receiving messages at this point. Send UNSUB to let + // the server clear interest once enough replies are delivered. + err = s.AutoUnsubscribe(batch) + if err != nil { + return nil, err + } + } + + msgs = append(msgs, firstMsg) + for { + var ( + msg *Msg + ok bool + ) + select { + case msg, ok = <-mch: + if !ok { + err = s.getNextMsgErr() + } else { + err = s.processNextMsgDelivered(msg) + if err == nil { + err = checkMsg(msg) + } + } + case <-ctx.Done(): + return msgs, checkCtxErr(err) + } + if err != nil { + // Discard the error which may have been a timeout + // or 408 request timeout status from the server, + // and just the return delivered messages. + break + } + if msg != nil { + msgs = append(msgs, msg) + } + + if len(msgs) == batch { + // Done! + break + } + } + + return msgs, nil } func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) { + ctx, cancel := context.WithTimeout(context.Background(), js.opts.wait) + defer cancel() + return js.getConsumerInfoContext(ctx, stream, consumer) +} + +func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) { ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer) - resp, err := js.nc.Request(js.apiSubj(ccInfoSubj), nil, js.wait) + resp, err := js.nc.RequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled @@ -851,7 +1519,7 @@ func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) { return nil, err } if info.Error != nil { - return nil, errors.New(info.Error.Description) + return nil, fmt.Errorf("nats: %s", info.Error.Description) } return info.ConsumerInfo, nil } @@ -872,7 +1540,7 @@ func (m *Msg) checkReply() (*js, bool, error) { return nil, false, nil } js := sub.jsi.js - isPullMode := sub.jsi.pull > 0 + isPullMode := sub.jsi.pull sub.mu.Unlock() return js, isPullMode, nil @@ -888,7 +1556,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error { return err } } - js, isPullMode, err := m.checkReply() + js, _, err := m.checkReply() if err != nil { return err } @@ -907,23 +1575,10 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error { if o.ttl > 0 { wait = o.ttl } else if js != nil { - wait = js.wait + wait = js.opts.wait } - if isPullMode { - if bytes.Equal(ackType, AckAck) { - err = nc.PublishRequest(m.Reply, m.Sub.Subject, AckNext) - } else if bytes.Equal(ackType, AckNak) || bytes.Equal(ackType, AckTerm) { - err = nc.PublishRequest(m.Reply, m.Sub.Subject, []byte("+NXT {\"batch\":1}")) - } - if sync && err == nil { - if ctx != nil { - _, err = nc.RequestWithContext(ctx, m.Reply, nil) - } else { - _, err = nc.Request(m.Reply, nil, wait) - } - } - } else if sync { + if sync { if ctx != nil { _, err = nc.RequestWithContext(ctx, m.Reply, ackType) } else { @@ -935,51 +1590,56 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error { // Mark that the message has been acked unless it is AckProgress // which can be sent many times. - if err == nil && !bytes.Equal(ackType, AckProgress) { + if err == nil && !bytes.Equal(ackType, ackProgress) { atomic.StoreUint32(&m.ackd, 1) } return err } -// Acks for messages - -// Ack a message, this will do the right thing with pull based consumers. +// Ack acknowledges a message. This tells the server that the message was +// successfully processed and it can move on to the next message. func (m *Msg) Ack() error { - return m.ackReply(AckAck, false) + return m.ackReply(ackAck, false) } -// Ack a message and wait for a response from the server. +// Ack is the synchronous version of Ack. This indicates successful message +// processing. func (m *Msg) AckSync(opts ...PubOpt) error { - return m.ackReply(AckAck, true, opts...) + return m.ackReply(ackAck, true, opts...) } -// Nak this message, indicating we can not process. +// Nak negatively acknowledges a message. This tells the server to redeliver +// the message. You can configure the number of redeliveries by passing +// nats.MaxDeliver when you Subscribe. The default is infinite redeliveries. func (m *Msg) Nak() error { - return m.ackReply(AckNak, false) + return m.ackReply(ackNak, false) } -// Term this message from ever being delivered regardless of MaxDeliverCount. +// Term tells the server to not redeliver this message, regardless of the value +// of nats.MaxDeliver. func (m *Msg) Term() error { - return m.ackReply(AckTerm, false) + return m.ackReply(ackTerm, false) } -// InProgress indicates that this message is being worked on -// and reset the redelivery timer in the server. +// InProgress tells the server that this message is being worked on. It resets +// the redelivery timer on the server. func (m *Msg) InProgress() error { - return m.ackReply(AckProgress, false) + return m.ackReply(ackProgress, false) } // MsgMetadata is the JetStream metadata associated with received messages. type MsgMetaData struct { - Consumer uint64 - Stream uint64 - Delivered uint64 - Pending uint64 - Timestamp time.Time + Consumer uint64 + Stream uint64 + Delivered uint64 + Pending uint64 + Timestamp time.Time + StreamName string } -// MetaData retrieves the metadata from a JetStream message. +// MetaData retrieves the metadata from a JetStream message. This method will +// return an error for non-JetStream Msgs. func (m *Msg) MetaData() (*MsgMetaData, error) { if _, _, err := m.checkReply(); err != nil { return nil, err @@ -1003,11 +1663,12 @@ func (m *Msg) MetaData() (*MsgMetaData, error) { } meta := &MsgMetaData{ - Delivered: uint64(parseNum(tokens[4])), - Stream: uint64(parseNum(tokens[5])), - Consumer: uint64(parseNum(tokens[6])), - Timestamp: time.Unix(0, parseNum(tokens[7])), - Pending: uint64(parseNum(tokens[8])), + Delivered: uint64(parseNum(tokens[4])), + Stream: uint64(parseNum(tokens[5])), + Consumer: uint64(parseNum(tokens[6])), + Timestamp: time.Unix(0, parseNum(tokens[7])), + Pending: uint64(parseNum(tokens[8])), + StreamName: tokens[2], } return meta, nil @@ -1041,10 +1702,11 @@ const ( // AckNonePolicy requires no acks for delivered messages. AckNonePolicy AckPolicy = iota - // AckAllPolicy when acking a sequence number, this implicitly acks all sequences below this one as well. + // AckAllPolicy when acking a sequence number, this implicitly acks all + // sequences below this one as well. AckAllPolicy - // AckExplicit requires ack or nack for all messages. + // AckExplicitPolicy requires ack or nack for all messages. AckExplicitPolicy // For setting @@ -1064,7 +1726,7 @@ func (p *AckPolicy) UnmarshalJSON(data []byte) error { case jsonString("explicit"): *p = AckExplicitPolicy default: - return fmt.Errorf("can not unmarshal %q", data) + return fmt.Errorf("nats: can not unmarshal %q", data) } return nil @@ -1079,7 +1741,7 @@ func (p AckPolicy) MarshalJSON() ([]byte, error) { case AckExplicitPolicy: return json.Marshal("explicit") default: - return nil, fmt.Errorf("unknown acknowlegement policy %v", p) + return nil, fmt.Errorf("nats: unknown acknowlegement policy %v", p) } } @@ -1098,21 +1760,25 @@ func (p AckPolicy) String() string { } } +// ReplayPolicy determines how the consumer should replay messages it already has queued in the stream. type ReplayPolicy int const ( - ReplayInstant ReplayPolicy = iota - ReplayOriginal + // ReplayInstantPolicy will replay messages as fast as possible. + ReplayInstantPolicy ReplayPolicy = iota + + // ReplayOriginalPolicy will maintain the same timing as the messages were received. + ReplayOriginalPolicy ) func (p *ReplayPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("instant"): - *p = ReplayInstant + *p = ReplayInstantPolicy case jsonString("original"): - *p = ReplayOriginal + *p = ReplayOriginalPolicy default: - return fmt.Errorf("can not unmarshal %q", data) + return fmt.Errorf("nats: can not unmarshal %q", data) } return nil @@ -1120,41 +1786,44 @@ func (p *ReplayPolicy) UnmarshalJSON(data []byte) error { func (p ReplayPolicy) MarshalJSON() ([]byte, error) { switch p { - case ReplayOriginal: + case ReplayOriginalPolicy: return json.Marshal("original") - case ReplayInstant: + case ReplayInstantPolicy: return json.Marshal("instant") default: - return nil, fmt.Errorf("unknown replay policy %v", p) + return nil, fmt.Errorf("nats: unknown replay policy %v", p) } } var ( - AckAck = []byte("+ACK") - AckNak = []byte("-NAK") - AckProgress = []byte("+WPI") - AckNext = []byte("+NXT") - AckTerm = []byte("+TERM") + ackAck = []byte("+ACK") + ackNak = []byte("-NAK") + ackProgress = []byte("+WPI") + ackTerm = []byte("+TERM") ) // DeliverPolicy determines how the consumer should select the first message to deliver. type DeliverPolicy int const ( - // DeliverAllPolicy will be the default so can be omitted from the request. + // DeliverAllPolicy starts delivering messages from the very beginning of a + // stream. This is the default. DeliverAllPolicy DeliverPolicy = iota - // DeliverLastPolicy will start the consumer with the last sequence received. + // DeliverLastPolicy will start the consumer with the last sequence + // received. DeliverLastPolicy - // DeliverNewPolicy will only deliver new messages that are sent - // after the consumer is created. + // DeliverNewPolicy will only deliver new messages that are sent after the + // consumer is created. DeliverNewPolicy - // DeliverByStartSequencePolicy will look for a defined starting sequence to start. + // DeliverByStartTimePolicy will deliver messages starting from a given + // sequence. DeliverByStartSequencePolicy - // StartTime will select the first messsage with a timestamp >= to StartTime. + // DeliverByStartTimePolicy will deliver messages starting from a given + // time. DeliverByStartTimePolicy ) @@ -1188,7 +1857,7 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) { case DeliverByStartTimePolicy: return json.Marshal("by_start_time") default: - return nil, fmt.Errorf("unknown deliver policy %v", p) + return nil, fmt.Errorf("nats: unknown deliver policy %v", p) } } @@ -1205,14 +1874,15 @@ const ( WorkQueuePolicy ) -// DiscardPolicy determines how we proceed when limits of messages or bytes are hit. The default, DiscardOld will -// remove older messages. DiscardNew will fail to store the new message. +// DiscardPolicy determines how to proceed when limits of messages or bytes are +// reached. type DiscardPolicy int const ( - // DiscardOld will remove older messages to return to the limits. - DiscardOld = iota - //DiscardNew will error on a StoreMsg call + // DiscardOld will remove older messages to return to the limits. This is + // the default. + DiscardOld DiscardPolicy = iota + //DiscardNew will fail to store new messages. DiscardNew ) @@ -1244,7 +1914,7 @@ func (rp RetentionPolicy) MarshalJSON() ([]byte, error) { case WorkQueuePolicy: return json.Marshal(workQueuePolicyString) default: - return nil, fmt.Errorf("can not marshal %v", rp) + return nil, fmt.Errorf("nats: can not marshal %v", rp) } } @@ -1257,7 +1927,7 @@ func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error { case jsonString(workQueuePolicyString): *rp = WorkQueuePolicy default: - return fmt.Errorf("can not unmarshal %q", data) + return fmt.Errorf("nats: can not unmarshal %q", data) } return nil } @@ -1280,7 +1950,7 @@ func (dp DiscardPolicy) MarshalJSON() ([]byte, error) { case DiscardNew: return json.Marshal("new") default: - return nil, fmt.Errorf("can not marshal %v", dp) + return nil, fmt.Errorf("nats: can not marshal %v", dp) } } @@ -1291,7 +1961,7 @@ func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error { case jsonString("new"): *dp = DiscardNew default: - return fmt.Errorf("can not unmarshal %q", data) + return fmt.Errorf("nats: can not unmarshal %q", data) } return nil } @@ -1329,7 +1999,7 @@ func (st StorageType) MarshalJSON() ([]byte, error) { case FileStorage: return json.Marshal(fileStorageString) default: - return nil, fmt.Errorf("can not marshal %v", st) + return nil, fmt.Errorf("nats: can not marshal %v", st) } } @@ -1340,7 +2010,7 @@ func (st *StorageType) UnmarshalJSON(data []byte) error { case jsonString(fileStorageString): *st = FileStorage default: - return fmt.Errorf("can not unmarshal %q", data) + return fmt.Errorf("nats: can not unmarshal %q", data) } return nil } diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index fb7851e4..6b1900fa 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -14,6 +14,7 @@ package nats import ( + "context" "encoding/json" "errors" "fmt" @@ -22,46 +23,52 @@ import ( "time" ) -// JetStreamManager is the public interface for managing JetStream streams & consumers. +// JetStreamManager manages JetStream Streams and Consumers. type JetStreamManager interface { // AddStream creates a stream. - AddStream(cfg *StreamConfig) (*StreamInfo, error) + AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) // UpdateStream updates a stream. - UpdateStream(cfg *StreamConfig) (*StreamInfo, error) + UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) // DeleteStream deletes a stream. - DeleteStream(name string) error + DeleteStream(name string, opts ...JSOpt) error // StreamInfo retrieves information from a stream. - StreamInfo(stream string) (*StreamInfo, error) + StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) - // Purge stream messages. - PurgeStream(name string) error + // PurgeStream purges a stream messages. + PurgeStream(name string, opts ...JSOpt) error - // NewStreamLister is used to return pages of StreamInfo objects. - NewStreamLister() *StreamLister + // StreamsInfo can be used to retrieve a list of StreamInfo objects. + StreamsInfo(opts ...JSOpt) <-chan *StreamInfo + + // StreamNames is used to retrieve a list of Stream names. + StreamNames(opts ...JSOpt) <-chan string // GetMsg retrieves a raw stream message stored in JetStream by sequence number. - GetMsg(name string, seq uint64) (*RawStreamMsg, error) + GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) // DeleteMsg erases a message from a stream. - DeleteMsg(name string, seq uint64) error + DeleteMsg(name string, seq uint64, opts ...JSOpt) error // AddConsumer adds a consumer to a stream. - AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) + AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) // DeleteConsumer deletes a consumer. - DeleteConsumer(stream, consumer string) error + DeleteConsumer(stream, consumer string, opts ...JSOpt) error - // ConsumerInfo retrieves consumer information. - ConsumerInfo(stream, name string) (*ConsumerInfo, error) + // ConsumerInfo retrieves information of a consumer from a stream. + ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error) - // NewConsumerLister is used to return pages of ConsumerInfo objects. - NewConsumerLister(stream string) *ConsumerLister + // ConsumersInfo is used to retrieve a list of ConsumerInfo objects. + ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo + + // ConsumerNames is used to retrieve a list of Consumer names. + ConsumerNames(stream string, opts ...JSOpt) <-chan string // AccountInfo retrieves info about the JetStream usage from an account. - AccountInfo() (*AccountInfo, error) + AccountInfo(opts ...JSOpt) (*AccountInfo, error) } // StreamConfig will determine the properties for a stream. @@ -95,10 +102,18 @@ type Placement struct { // StreamSource dictates how streams can source from other streams. type StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + External *ExternalStream `json:"external,omitempty"` +} + +// ExternalStream allows you to qualify access to a stream source in another +// account. +type ExternalStream struct { + APIPrefix string `json:"api"` + DeliverPrefix string `json:"deliver"` } // apiError is included in all API responses if there was an error. @@ -156,8 +171,16 @@ type accountInfoResponse struct { } // AccountInfo retrieves info about the JetStream usage from the current account. -func (js *js) AccountInfo() (*AccountInfo, error) { - resp, err := js.nc.Request(js.apiSubj(apiAccountInfo), nil, js.wait) +func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + + resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil) if err != nil { return nil, err } @@ -189,7 +212,15 @@ type consumerResponse struct { } // AddConsumer will add a JetStream consumer. -func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) { +func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + if stream == _EMPTY_ { return nil, ErrStreamNameRequired } @@ -208,7 +239,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er ccSubj = fmt.Sprintf(apiConsumerCreateT, stream) } - resp, err := js.nc.Request(js.apiSubj(ccSubj), req, js.wait) + resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(ccSubj), req) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled @@ -233,13 +264,21 @@ type consumerDeleteResponse struct { } // DeleteConsumer deletes a Consumer. -func (js *js) DeleteConsumer(stream, consumer string) error { +func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return err + } + if cancel != nil { + defer cancel() + } + if stream == _EMPTY_ { return ErrStreamNameRequired } dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer)) - r, err := js.nc.Request(dcSubj, nil, js.wait) + r, err := js.nc.RequestWithContext(o.ctx, dcSubj, nil) if err != nil { return err } @@ -254,13 +293,20 @@ func (js *js) DeleteConsumer(stream, consumer string) error { } // ConsumerInfo returns information about a Consumer. -func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) { - return js.getConsumerInfo(stream, consumer) +func (js *js) ConsumerInfo(stream, consumer string, opts ...JSOpt) (*ConsumerInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + return js.getConsumerInfoContext(o.ctx, stream, consumer) } -// ConsumerLister fetches pages of ConsumerInfo objects. This object is not +// consumerLister fetches pages of ConsumerInfo objects. This object is not // safe to use for multiple threads. -type ConsumerLister struct { +type consumerLister struct { stream string js *js @@ -283,7 +329,7 @@ type consumerListResponse struct { } // Next fetches the next ConsumerInfo page. -func (c *ConsumerLister) Next() bool { +func (c *consumerLister) Next() bool { if c.err != nil { return false } @@ -302,8 +348,16 @@ func (c *ConsumerLister) Next() bool { c.err = err return false } + + var cancel context.CancelFunc + ctx := c.js.opts.ctx + if ctx == nil { + ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait) + defer cancel() + } + clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerListT, c.stream)) - r, err := c.js.nc.Request(clSubj, req, c.js.wait) + r, err := c.js.nc.RequestWithContext(ctx, clSubj, req) if err != nil { c.err = err return false @@ -325,18 +379,138 @@ func (c *ConsumerLister) Next() bool { } // Page returns the current ConsumerInfo page. -func (c *ConsumerLister) Page() []*ConsumerInfo { +func (c *consumerLister) Page() []*ConsumerInfo { return c.page } // Err returns any errors found while fetching pages. -func (c *ConsumerLister) Err() error { +func (c *consumerLister) Err() error { return c.err } -// NewConsumerLister is used to return pages of ConsumerInfo objects. -func (js *js) NewConsumerLister(stream string) *ConsumerLister { - return &ConsumerLister{stream: stream, js: js} +// ConsumersInfo is used to retrieve a list of ConsumerInfo objects. +func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo { + o, cancel, err := getJSContextOpts(jsc.opts, opts...) + if err != nil { + return nil + } + + ch := make(chan *ConsumerInfo) + l := &consumerLister{js: &js{nc: jsc.nc, opts: o}, stream: stream} + go func() { + if cancel != nil { + defer cancel() + } + defer close(ch) + for l.Next() { + for _, info := range l.Page() { + select { + case ch <- info: + case <-o.ctx.Done(): + return + } + } + } + }() + + return ch +} + +type consumerNamesLister struct { + stream string + js *js + + err error + offset int + page []string + pageInfo *apiPaged +} + +// consumerNamesListResponse is the response for a Consumers Names List request. +type consumerNamesListResponse struct { + apiResponse + apiPaged + Consumers []string `json:"consumers"` +} + +// Next fetches the next ConsumerInfo page. +func (c *consumerNamesLister) Next() bool { + if c.err != nil { + return false + } + if c.stream == _EMPTY_ { + c.err = ErrStreamNameRequired + return false + } + if c.pageInfo != nil && c.offset >= c.pageInfo.Total { + return false + } + + var cancel context.CancelFunc + ctx := c.js.opts.ctx + if ctx == nil { + ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait) + defer cancel() + } + + clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream)) + r, err := c.js.nc.RequestWithContext(ctx, clSubj, nil) + if err != nil { + c.err = err + return false + } + var resp consumerNamesListResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + c.err = err + return false + } + if resp.Error != nil { + c.err = errors.New(resp.Error.Description) + return false + } + + c.pageInfo = &resp.apiPaged + c.page = resp.Consumers + c.offset += len(c.page) + return true +} + +// Page returns the current ConsumerInfo page. +func (c *consumerNamesLister) Page() []string { + return c.page +} + +// Err returns any errors found while fetching pages. +func (c *consumerNamesLister) Err() error { + return c.err +} + +// ConsumerNames is used to retrieve a list of Consumer names. +func (jsc *js) ConsumerNames(stream string, opts ...JSOpt) <-chan string { + o, cancel, err := getJSContextOpts(jsc.opts, opts...) + if err != nil { + return nil + } + + ch := make(chan string) + l := &consumerNamesLister{stream: stream, js: &js{nc: jsc.nc, opts: o}} + go func() { + if cancel != nil { + defer cancel() + } + defer close(ch) + for l.Next() { + for _, info := range l.Page() { + select { + case ch <- info: + case <-o.ctx.Done(): + return + } + } + } + }() + + return ch } // streamCreateResponse stream creation. @@ -345,7 +519,15 @@ type streamCreateResponse struct { *StreamInfo } -func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) { +func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + if cfg == nil || cfg.Name == _EMPTY_ { return nil, ErrStreamNameRequired } @@ -356,7 +538,7 @@ func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) { } csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name)) - r, err := js.nc.Request(csSubj, req, js.wait) + r, err := js.nc.RequestWithContext(o.ctx, csSubj, req) if err != nil { return nil, err } @@ -372,9 +554,17 @@ func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) { type streamInfoResponse = streamCreateResponse -func (js *js) StreamInfo(stream string) (*StreamInfo, error) { +func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream)) - r, err := js.nc.Request(csSubj, nil, js.wait) + r, err := js.nc.RequestWithContext(o.ctx, csSubj, nil) if err != nil { return nil, err } @@ -435,7 +625,15 @@ type PeerInfo struct { } // UpdateStream updates a Stream. -func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) { +func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + if cfg == nil || cfg.Name == _EMPTY_ { return nil, ErrStreamNameRequired } @@ -446,7 +644,7 @@ func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) { } usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name)) - r, err := js.nc.Request(usSubj, req, js.wait) + r, err := js.nc.RequestWithContext(o.ctx, usSubj, req) if err != nil { return nil, err } @@ -467,13 +665,21 @@ type streamDeleteResponse struct { } // DeleteStream deletes a Stream. -func (js *js) DeleteStream(name string) error { +func (js *js) DeleteStream(name string, opts ...JSOpt) error { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return err + } + if cancel != nil { + defer cancel() + } + if name == _EMPTY_ { return ErrStreamNameRequired } dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name)) - r, err := js.nc.Request(dsSubj, nil, js.wait) + r, err := js.nc.RequestWithContext(o.ctx, dsSubj, nil) if err != nil { return err } @@ -517,7 +723,15 @@ type apiMsgGetResponse struct { } // GetMsg retrieves a raw stream message stored in JetStream by sequence number. -func (js *js) GetMsg(name string, seq uint64) (*RawStreamMsg, error) { +func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + if name == _EMPTY_ { return nil, ErrStreamNameRequired } @@ -528,7 +742,7 @@ func (js *js) GetMsg(name string, seq uint64) (*RawStreamMsg, error) { } dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name)) - r, err := js.nc.Request(dsSubj, req, js.wait) + r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req) if err != nil { return nil, err } @@ -571,7 +785,15 @@ type msgDeleteResponse struct { } // DeleteMsg deletes a message from a stream. -func (js *js) DeleteMsg(name string, seq uint64) error { +func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return err + } + if cancel != nil { + defer cancel() + } + if name == _EMPTY_ { return ErrStreamNameRequired } @@ -582,7 +804,7 @@ func (js *js) DeleteMsg(name string, seq uint64) error { } dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name)) - r, err := js.nc.Request(dsSubj, req, js.wait) + r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req) if err != nil { return err } @@ -603,9 +825,17 @@ type streamPurgeResponse struct { } // PurgeStream purges messages on a Stream. -func (js *js) PurgeStream(name string) error { +func (js *js) PurgeStream(name string, opts ...JSOpt) error { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return err + } + if cancel != nil { + defer cancel() + } + psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, name)) - r, err := js.nc.Request(psSubj, nil, js.wait) + r, err := js.nc.RequestWithContext(o.ctx, psSubj, nil) if err != nil { return err } @@ -619,9 +849,9 @@ func (js *js) PurgeStream(name string) error { return nil } -// StreamLister fetches pages of StreamInfo objects. This object is not safe +// streamLister fetches pages of StreamInfo objects. This object is not safe // to use for multiple threads. -type StreamLister struct { +type streamLister struct { js *js page []*StreamInfo err error @@ -646,7 +876,7 @@ type streamNamesRequest struct { } // Next fetches the next StreamInfo page. -func (s *StreamLister) Next() bool { +func (s *streamLister) Next() bool { if s.err != nil { return false } @@ -662,8 +892,15 @@ func (s *StreamLister) Next() bool { return false } + var cancel context.CancelFunc + ctx := s.js.opts.ctx + if ctx == nil { + ctx, cancel = context.WithTimeout(context.Background(), s.js.opts.wait) + defer cancel() + } + slSubj := s.js.apiSubj(apiStreamList) - r, err := s.js.nc.Request(slSubj, req, s.js.wait) + r, err := s.js.nc.RequestWithContext(ctx, slSubj, req) if err != nil { s.err = err return false @@ -685,16 +922,149 @@ func (s *StreamLister) Next() bool { } // Page returns the current StreamInfo page. -func (s *StreamLister) Page() []*StreamInfo { +func (s *streamLister) Page() []*StreamInfo { return s.page } // Err returns any errors found while fetching pages. -func (s *StreamLister) Err() error { +func (s *streamLister) Err() error { return s.err } -// NewStreamLister is used to return pages of StreamInfo objects. -func (js *js) NewStreamLister() *StreamLister { - return &StreamLister{js: js} +// StreamsInfo can be used to retrieve a list of StreamInfo objects. +func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo { + o, cancel, err := getJSContextOpts(jsc.opts, opts...) + if err != nil { + return nil + } + + ch := make(chan *StreamInfo) + l := &streamLister{js: &js{nc: jsc.nc, opts: o}} + go func() { + if cancel != nil { + defer cancel() + } + defer close(ch) + for l.Next() { + for _, info := range l.Page() { + select { + case ch <- info: + case <-o.ctx.Done(): + return + } + } + } + }() + + return ch +} + +type streamNamesLister struct { + js *js + + err error + offset int + page []string + pageInfo *apiPaged +} + +// Next fetches the next ConsumerInfo page. +func (l *streamNamesLister) Next() bool { + if l.err != nil { + return false + } + if l.pageInfo != nil && l.offset >= l.pageInfo.Total { + return false + } + + var cancel context.CancelFunc + ctx := l.js.opts.ctx + if ctx == nil { + ctx, cancel = context.WithTimeout(context.Background(), l.js.opts.wait) + defer cancel() + } + + r, err := l.js.nc.RequestWithContext(ctx, l.js.apiSubj(apiStreams), nil) + if err != nil { + l.err = err + return false + } + var resp streamNamesResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + l.err = err + return false + } + if resp.Error != nil { + l.err = errors.New(resp.Error.Description) + return false + } + + l.pageInfo = &resp.apiPaged + l.page = resp.Streams + l.offset += len(l.page) + return true +} + +// Page returns the current ConsumerInfo page. +func (l *streamNamesLister) Page() []string { + return l.page +} + +// Err returns any errors found while fetching pages. +func (l *streamNamesLister) Err() error { + return l.err +} + +// StreamNames is used to retrieve a list of Stream names. +func (jsc *js) StreamNames(opts ...JSOpt) <-chan string { + o, cancel, err := getJSContextOpts(jsc.opts, opts...) + if err != nil { + return nil + } + + ch := make(chan string) + l := &streamNamesLister{js: &js{nc: jsc.nc, opts: o}} + go func() { + if cancel != nil { + defer cancel() + } + defer close(ch) + for l.Next() { + for _, info := range l.Page() { + select { + case ch <- info: + case <-o.ctx.Done(): + return + } + } + } + }() + + return ch +} + +func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, error) { + var o jsOpts + for _, opt := range opts { + if err := opt.configureJSContext(&o); err != nil { + return nil, nil, err + } + } + + // Check for option collisions. Right now just timeout and context. + if o.ctx != nil && o.wait != 0 { + return nil, nil, ErrContextAndTimeout + } + if o.wait == 0 && o.ctx == nil { + o.wait = defs.wait + } + var cancel context.CancelFunc + if o.ctx == nil && o.wait > 0 { + o.ctx, cancel = context.WithTimeout(context.Background(), o.wait) + } + if o.pre == "" { + o.pre = defs.pre + } + + return &o, cancel, nil } diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index b19021a4..57029abd 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -75,6 +75,9 @@ const ( // AUTHENTICATION_EXPIRED_ERR is for when nats server user authorization has expired. AUTHENTICATION_EXPIRED_ERR = "user authentication expired" + + // AUTHENTICATION_REVOKED_ERR is for when user authorization has been revoked. + AUTHENTICATION_REVOKED_ERR = "user authentication revoked" ) // Errors @@ -94,6 +97,7 @@ var ( ErrBadTimeout = errors.New("nats: timeout invalid") ErrAuthorization = errors.New("nats: authorization violation") ErrAuthExpired = errors.New("nats: authentication expired") + ErrAuthRevoked = errors.New("nats: authentication revoked") ErrNoServers = errors.New("nats: no servers available for connection") ErrJsonParse = errors.New("nats: connect message, json parse error") ErrChanArg = errors.New("nats: argument needs to be a channel type") @@ -125,7 +129,6 @@ var ( ErrBadHeaderMsg = errors.New("nats: message could not decode headers") ErrNoResponders = errors.New("nats: no responders available for request") ErrNoContextOrTimeout = errors.New("nats: no context or timeout given") - ErrDirectModeRequired = errors.New("nats: direct access requires direct pull or push") ErrPullModeNotAllowed = errors.New("nats: pull based not supported") ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled") ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid") @@ -484,6 +487,9 @@ type Conn struct { respMux *Subscription // A single response subscription respMap map[string]chan *Msg // Request map for the response msg channels respRand *rand.Rand // Used for generating suffix + + // JetStream Contexts last account check. + jsLastCheck time.Time } // Subscription represents interest in a given subject. @@ -2757,6 +2763,9 @@ func checkAuthError(e string) error { if strings.HasPrefix(e, AUTHENTICATION_EXPIRED_ERR) { return ErrAuthExpired } + if strings.HasPrefix(e, AUTHENTICATION_REVOKED_ERR) { + return ErrAuthRevoked + } return nil } @@ -2823,6 +2832,7 @@ const ( statusHdr = "Status" descrHdr = "Description" noResponders = "503" + noMessages = "404" statusLen = 3 // e.g. 20x, 40x, 50x ) @@ -3284,8 +3294,7 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) { return nil, ErrInvalidConnection } mch := make(chan *Msg, nc.Opts.SubChanLen) - s, e := nc.subscribe(subj, _EMPTY_, nil, mch, true, nil) - return s, e + return nc.subscribe(subj, _EMPTY_, nil, mch, true, nil) } // QueueSubscribe creates an asynchronous queue subscriber on the given subject. @@ -3302,8 +3311,7 @@ func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription // given message synchronously using Subscription.NextMsg(). func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) { mch := make(chan *Msg, nc.Opts.SubChanLen) - s, e := nc.subscribe(subj, queue, nil, mch, true, nil) - return s, e + return nc.subscribe(subj, queue, nil, mch, true, nil) } // QueueSubscribeSyncWithChan will express interest in the given subject. @@ -3447,6 +3455,7 @@ const ( SyncSubscription ChanSubscription NilSubscription + PullSubscription ) // Type returns the type of Subscription. @@ -3715,7 +3724,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { s.mu.Lock() nc := s.conn max := s.max - jsi := s.jsi // Update some stats. s.delivered++ @@ -3738,12 +3746,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { } } - // In case this is a JetStream message and in pull mode - // then check whether it is an JS API error. - if jsi != nil && jsi.pull > 0 && len(msg.Data) == 0 && msg.Header.Get(statusHdr) == noResponders { - return ErrNoResponders - } - return nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index df72be88..e51131d4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -7,7 +7,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.0.1 ## explicit github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac +# github.com/nats-io/nats.go v1.10.1-0.20210330002604-882e98e18c99 ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin