1
0
mirror of https://github.com/taigrr/nats.docs synced 2025-01-18 04:03:23 -08:00

fix folder structure from gitbook import

This commit is contained in:
ainsley
2019-10-07 14:20:40 -05:00
parent fb1b7b9a2b
commit 7681f14c27
44 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,10 @@
# Receiving Messages
In general, applications can receive messages asynchronously or synchronously. Receiving messages with NATS can be library dependent.
Some languages, like Go or Java, provide synchronous and asynchronous APIs, while others may only support one type of subscription.
In all cases, the process of subscribing involves having the client library tell the NATS system that an application is interested in a particular subject.
Under the covers, the client library will assign a unique id to each subscription. This id is used as a closure when the server sends messages to a specific subscription. Each subscription gets a unique id, so if the same connection is used multiple times for the same subject, the server will send multiple copies of the same message. When an application is done with a subscription it unsubscribes which tells the server to stop sending messages.

View File

@@ -0,0 +1,116 @@
# Asynchronous Subscriptions
Asynchronous subscriptions use callbacks of some form to notify an application when a message arrives. These subscriptions are usually easier to work with, but do represent some form of internal work and resource usage, i.e. threads, by the library. Check your library's documentation for any resource usage associated with asynchronous subscriptions.
The following example subscribes to the subject `updates` and handles the incoming messages:
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Use a WaitGroup to wait for a message to arrive
wg := sync.WaitGroup{}
wg.Add(1)
// Subscribe
if _, err := nc.Subscribe("updates", func(m *nats.Msg) {
wg.Done()
}); err != nil {
log.Fatal(err)
}
// Wait for a message to come in
wg.Wait()
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for a message to arrive
CountDownLatch latch = new CountDownLatch(1);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
latch.countDown();
});
// Subscribe
d.subscribe("updates");
// Wait for a message to come in
latch.await();
// Close the connection
nc.close();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
let nc = NATS.connect({
url: "nats://demo.nats.io:4222"
});
nc.subscribe("updates", (msg) => {
t.log(msg);
});
```
{% endtab %}
{% tab title="Python" %}
```python
nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
future = asyncio.Future()
async def cb(msg):
nonlocal future
future.set_result(msg)
await nc.subscribe("updates", cb=cb)
await nc.publish("updates", b'All is Well')
await nc.flush()
# Wait for message to come in
msg = await asyncio.wait_for(future, 1)
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
require 'nats/client'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
nc.subscribe("updates") do |msg|
puts msg
nc.close
end
nc.publish("updates", "All is Well")
end
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
nc.subscribe("updates", (err, msg) => {
if(err) {
console.log('error', err);
} else {
t.log(msg.data);
}
});
```
{% endtab %}
{% endtabs %}

View File

@@ -0,0 +1,373 @@
# Draining Messages Before Disconnect
A feature recently added across the NATS client libraries is the ability to drain connections or subscriptions. Closing a connection, or unsubscribing from a subscription, are generally considered immediate requests. When you close or unsubscribe the library will halt messages in any pending queue or cache for subscribers. When you drain a subscription or connection, it will process any inflight and cached/pending messages before closing.
Drain provides clients that use queue subscriptions with a way to bring down applications without losing any messages. A client can bring up a new queue member, drain and shut down the old queue member, all without losing messages sent to the old client. Without drain, there is the possibility of lost messages due to delivery timing.
The libraries can provide drain on a connection or on a subscriber, or both.
For a connection the process is essentially:
1. Drain all subscriptions
2. Stop new messages from being published
3. Flush any remaining published messages
4. Close
The API for drain can generally be used instead of close:
As an example of draining a connection:
{% tabs %}
{% tab title="Go" %}
```go
wg := sync.WaitGroup{}
wg.Add(1)
errCh := make(chan error, 1)
// To simulate a timeout, you would set the DrainTimeout()
// to a value less than the time spent in the message callback,
// so say: nats.DrainTimeout(10*time.Millisecond).
nc, err := nats.Connect("demo.nats.io",
nats.DrainTimeout(10*time.Second),
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
errCh <- err
}),
nats.ClosedHandler(func(_ *nats.Conn) {
wg.Done()
}))
if err != nil {
log.Fatal(err)
}
// Just to not collide using the demo server with other users.
subject := nats.NewInbox()
// Subscribe, but add some delay while processing.
if _, err := nc.Subscribe(subject, func(_ *nats.Msg) {
time.Sleep(200 * time.Millisecond)
}); err != nil {
log.Fatal(err)
}
// Publish a message
if err := nc.Publish(subject, []byte("hello")); err != nil {
log.Fatal(err)
}
// Drain the connection, which will close it when done.
if err := nc.Drain(); err != nil {
log.Fatal(err)
}
// Wait for the connection to be closed.
wg.Wait()
// Check if there was an error
select {
case e := <-errCh:
log.Fatal(e)
default:
}
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for a message to arrive
CountDownLatch latch = new CountDownLatch(1);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
latch.countDown();
});
// Subscribe
d.subscribe("updates");
// Wait for a message to come in
latch.await();
// Drain the connection, which will close it
CompletableFuture<Boolean> drained = nc.drain(Duration.ofSeconds(10));
// Wait for the drain to complete
drained.get();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
let nc = NATS.connect({url: "nats://demo.nats.io:4222"});
let inbox = createInbox();
let counter = 0;
nc.subscribe(inbox, () => {
counter++;
});
nc.publish(inbox);
nc.drain((err)=> {
if(err) {
t.log(err);
}
t.log('connection is closed:', nc.closed);
t.log('processed', counter, 'messages');
t.pass();
// the snippet is running as a promise in a test
// and calls resolve to pass the test
resolve();
});
```
{% endtab %}
{% tab title="Python" %}
```python
import asyncio
from nats.aio.client import Client as NATS
async def example(loop):
nc = NATS()
await nc.connect("nats://127.0.0.1:4222", loop=loop)
async def handler(msg):
print("[Received] ", msg)
await nc.publish(msg.reply, b'I can help')
# Can check whether client is in draining state
if nc.is_draining:
print("Connection is draining")
await nc.subscribe("help", "workers", cb=handler)
await nc.flush()
requests = []
for i in range(0, 10):
request = nc.request("help", b'help!', timeout=1)
requests.append(request)
# Wait for all the responses
responses = []
responses = await asyncio.gather(*requests)
# Gracefully close the connection.
await nc.drain()
print("Received {} responses".format(len(responses)))
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
NATS.start(drain_timeout: 1) do |nc|
NATS.subscribe('foo', queue: "workers") do |msg, reply, sub|
nc.publish(reply, "ACK:#{msg}")
end
NATS.subscribe('bar', queue: "workers") do |msg, reply, sub|
nc.publish(reply, "ACK:#{msg}")
end
NATS.subscribe('quux', queue: "workers") do |msg, reply, sub|
nc.publish(reply, "ACK:#{msg}")
end
EM.add_timer(2) do
next if NATS.draining?
# Drain gracefully closes the connection.
NATS.drain do
puts "Done draining. Connection is closed."
end
end
end
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
let sub = await nc.subscribe('updates', (err, msg) => {
t.log('worker got message', msg.data);
}, {queue: "workers"});
// [end drain_sub]
nc.flush();
await nc.drain();
// client must close when the connection drain resolves
nc.close();
```
{% endtab %}
{% endtabs %}
The mechanics of drain for a subscription are simpler:
1. Unsubscribe
2. Process all cached or inflight messages
3. Clean up
The API for drain can generally be used instead of unsubscribe:
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
done := sync.WaitGroup{}
done.Add(1)
count := 0
errCh := make(chan error, 1)
msgAfterDrain := "not this one"
// Just to not collide using the demo server with other users.
subject := nats.NewInbox()
// This callback will process each message slowly
sub, err := nc.Subscribe(subject, func(m *nats.Msg) {
if string(m.Data) == msgAfterDrain {
errCh <- fmt.Errorf("Should not have received this message")
return
}
time.Sleep(100 * time.Millisecond)
count++
if count == 2 {
done.Done()
}
})
// Send 2 messages
for i := 0; i < 2; i++ {
nc.Publish(subject, []byte("hello"))
}
// Call Drain on the subscription. It unsubscribes but
// wait for all pending messages to be processed.
if err := sub.Drain(); err != nil {
log.Fatal(err)
}
// Send one more message, this message should not be received
nc.Publish(subject, []byte(msgAfterDrain))
// Wait for the subscription to have processed the 2 messages.
done.Wait()
// Now check that the 3rd message was not received
select {
case e := <-errCh:
log.Fatal(e)
case <-time.After(200 * time.Millisecond):
// OK!
}
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for a message to arrive
CountDownLatch latch = new CountDownLatch(1);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
latch.countDown();
});
// Subscribe
d.subscribe("updates");
// Wait for a message to come in
latch.await();
// Messages that have arrived will be processed
CompletableFuture<Boolean> drained = d.drain(Duration.ofSeconds(10));
// Wait for the drain to complete
drained.get();
// Close the connection
nc.close();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
let nc = NATS.connect({url: "nats://demo.nats.io:4222"});
let inbox = createInbox();
let counter = 0;
let sid = nc.subscribe(inbox, () => {
counter++;
});
nc.publish(inbox);
nc.drainSubscription(sid, (err)=> {
if(err) {
t.log(err);
}
t.log('processed', counter, 'messages');
});
nc.flush(() => {
nc.close();
t.pass();
resolve();
});
```
{% endtab %}
{% tab title="Python" %}
```python
import asyncio
from nats.aio.client import Client as NATS
async def example(loop):
nc = NATS()
await nc.connect("nats://127.0.0.1:4222", loop=loop)
async def handler(msg):
print("[Received] ", msg)
await nc.publish(msg.reply, b'I can help')
# Can check whether client is in draining state
if nc.is_draining:
print("Connection is draining")
sid = await nc.subscribe("help", "workers", cb=handler)
await nc.flush()
# Gracefully unsubscribe the subscription
await nc.drain(sid)
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
# There is currently no API to drain a single subscription, the whole connection can be drained though via NATS.drain
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
let sub = await nc.subscribe('updates', (err, msg) => {
t.log('worker got message', msg.data);
}, {queue: "workers"});
```
{% endtab %}
{% endtabs %}
Because draining can involve messages flowing to the server, for a flush and asynchronous message processing, the timeout for drain should generally be higher than the timeout for a simple message request/reply or similar.

View File

@@ -0,0 +1,125 @@
# Queue Subscriptions
Subscribing to a queue group is only slightly different than subscribing to a subject alone. The application simply includes a queue name with the subscription. The effect of including the group is fairly major, since the server will now load balance messages between the members of the queue group, but the code differences are minimal.
Keep in mind that the queue groups in NATS are dynamic and do not require any server configuration. You can almost think of a regular subscription as a queue group of 1, but it is probably not worth thinking too much about that.
![](../../.gitbook/assets/queues.svg)
As an example, to subscribe to the queue `workers` with the subject `updates`:
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Use a WaitGroup to wait for 10 messages to arrive
wg := sync.WaitGroup{}
wg.Add(10)
// Create a queue subscription on "updates" with queue name "workers"
if _, err := nc.QueueSubscribe("updates", "worker", func(m *nats.Msg) {
wg.Done()
}); err != nil {
log.Fatal(err)
}
// Wait for messages to come in
wg.Wait()
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for 10 messages to arrive
CountDownLatch latch = new CountDownLatch(10);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
latch.countDown();
});
// Subscribe
d.subscribe("updates", "workers");
// Wait for a message to come in
latch.await();
// Close the connection
nc.close();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
let nc = NATS.connect({
url: "nats://demo.nats.io:4222"});
nc.subscribe('updates', {queue: "workers"}, (msg) => {
t.log('worker got message', msg);
});
```
{% endtab %}
{% tab title="Python" %}
```python
nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
future = asyncio.Future()
async def cb(msg):
nonlocal future
future.set_result(msg)
await nc.subscribe("updates", queue="workers", cb=cb)
await nc.publish("updates", b'All is Well')
msg = await asyncio.wait_for(future, 1)
print("Msg", msg)
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
nc.subscribe("updates", queue: "worker") do |msg, reply|
f.resume Time.now
end
nc.publish("updates", "A")
# Use the response
msg = Fiber.yield
puts "Msg: #{msg}"
end.resume
end
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
await nc.subscribe('updates', (err, msg) => {
t.log('worker got message', msg.data);
}, {queue: "workers"});
```
{% endtab %}
{% endtabs %}
If you run this example with the publish examples that send to `updates`, you will see that one of the instances gets a message while the others you run won't. But the instance that receives the message will change.

View File

@@ -0,0 +1,139 @@
# Replying to a Message
Incoming messages have an optional reply-to field. If that field is set, it will contain a subject to which a reply is expected.
For example, the following code will listen for that request and respond with the time.
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Subscribe
sub, err := nc.SubscribeSync("time")
if err != nil {
log.Fatal(err)
}
// Read a message
msg, err := sub.NextMsg(10 * time.Second)
if err != nil {
log.Fatal(err)
}
// Get the time
timeAsBytes := []byte(time.Now().String())
// Send the time as the response.
msg.Respond(timeAsBytes)
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Subscribe
Subscription sub = nc.subscribe("time");
// Read a message
Message msg = sub.nextMessage(Duration.ZERO);
// Get the time
Calendar cal = Calendar.getInstance();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
byte[] timeAsBytes = sdf.format(cal.getTime()).getBytes(StandardCharsets.UTF_8);
// Send the time
nc.publish(msg.getReplyTo(), timeAsBytes);
// Flush and close the connection
nc.flush(Duration.ZERO);
nc.close();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
let nc = NATS.connect({
url: "nats://demo.nats.io:4222"
});
// set up a subscription to process a request
nc.subscribe('time', (msg, reply) => {
if (msg.reply) {
nc.publish(msg.reply, new Date().toLocaleTimeString());
}
});
```
{% endtab %}
{% tab title="Python" %}
```python
nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
future = asyncio.Future()
async def cb(msg):
nonlocal future
future.set_result(msg)
await nc.subscribe("time", cb=cb)
await nc.publish_request("time", new_inbox(), b'What is the time?')
await nc.flush()
# Read the message
msg = await asyncio.wait_for(future, 1)
# Send the time
time_as_bytes = "{}".format(datetime.now()).encode()
await nc.publish(msg.reply, time_as_bytes)
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
nc.subscribe("time") do |msg, reply|
f.resume Time.now
end
nc.publish("time", 'What is the time?', NATS.create_inbox)
# Use the response
msg = Fiber.yield
puts "Reply: #{msg}"
end.resume
end
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// set up a subscription to process a request
await nc.subscribe('time', (err, msg) => {
if (msg.reply) {
nc.publish(msg.reply, new Date().toLocaleTimeString());
} else {
t.log('got a request for the time, but no reply subject was set.');
}
});
```
{% endtab %}
{% endtabs %}

View File

@@ -0,0 +1,162 @@
# Structured Data
Client libraries may provide tools to help receive structured data, like JSON. The core traffic to the NATS server will always be opaque byte arrays. The server does not process message payloads in any form. For libraries that don't provide helpers, you can always encode and decode data before sending the associated bytes to the NATS client.
For example, to receive JSON you could do:
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
log.Fatal(err)
}
defer ec.Close()
// Define the object
type stock struct {
Symbol string
Price int
}
wg := sync.WaitGroup{}
wg.Add(1)
// Subscribe
if _, err := ec.Subscribe("updates", func(s *stock) {
log.Printf("Stock: %s - Price: %v", s.Symbol, s.Price)
wg.Done()
}); err != nil {
log.Fatal(err)
}
// Wait for a message to come in
wg.Wait()
```
{% endtab %}
{% tab title="Java" %}
```java
class StockForJsonSub {
public String symbol;
public float price;
public String toString() {
return symbol + " is at " + price;
}
}
public class SubscribeJSON {
public static void main(String[] args) {
try {
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for 10 messages to arrive
CountDownLatch latch = new CountDownLatch(10);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
Gson gson = new Gson();
String json = new String(msg.getData(), StandardCharsets.UTF_8);
StockForJsonSub stk = gson.fromJson(json, StockForJsonSub.class);
// Use the object
System.out.println(stk);
latch.countDown();
});
// Subscribe
d.subscribe("updates");
// Wait for a message to come in
latch.await();
// Close the connection
nc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
let nc = NATS.connect({
url: "nats://demo.nats.io:4222",
json: true
});
nc.subscribe('updates', (msg) => {
if(msg && msg.ticker === 'TSLA') {
t.log('got message:', msg);
}
});
```
{% endtab %}
{% tab title="Python" %}
```python
import asyncio
import json
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout
async def run(loop):
nc = NATS()
await nc.connect(servers=["nats://127.0.0.1:4222"], loop=loop)
async def message_handler(msg):
data = json.loads(msg.data.decode())
print(data)
sid = await nc.subscribe("updates", cb=message_handler)
await nc.flush()
await nc.auto_unsubscribe(sid, 2)
await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode())
await asyncio.sleep(1, loop=loop)
await nc.close()
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
require 'nats/client'
require 'json'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
nc.subscribe("updates") do |msg|
m = JSON.parse(msg)
# {"symbol"=>"GOOG", "price"=>12}
p m
end
end
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
let nc = await connect({
url: "nats://demo.nats.io:4222",
payload: Payload.JSON
});
nc.subscribe('updates', (err, msg) => {
t.log('got message:', msg.data ? msg.data : "no payload");
});
```
{% endtab %}
{% endtabs %}

View File

@@ -0,0 +1,75 @@
# Synchronous Subscriptions
Synchronous subscriptions require the application to wait for messages. This type of subscription is easy to set-up and use, but requires the application to deal with looping if multiple messages are expected. For situations where a single message is expected, synchronous subscriptions are sometimes easier to manage, depending on the language.
For example, to subscribe to the subject `updates` and receive a single message you could do:
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Subscribe
sub, err := nc.SubscribeSync("updates")
if err != nil {
log.Fatal(err)
}
// Wait for a message
msg, err := sub.NextMsg(10 * time.Second)
if err != nil {
log.Fatal(err)
}
// Use the response
log.Printf("Reply: %s", msg.Data)
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Subscribe
Subscription sub = nc.subscribe("updates");
// Read a message
Message msg = sub.nextMessage(Duration.ZERO);
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
// Close the connection
nc.close();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
// node-nats subscriptions are always async.
```
{% endtab %}
{% tab title="Python" %}
```python
# Asyncio NATS client currently does not have a sync subscribe API
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
# The Ruby NATS client subscriptions are all async.
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
/ Typescript NATS subscriptions are always async.
```
{% endtab %}
{% endtabs %}

View File

@@ -0,0 +1,152 @@
# Unsubscribing After N Messages
NATS provides a special form of unsubscribe that is configured with a message count and takes effect when that many messages are sent to a subscriber. This mechanism is very useful if only a single message is expected.
The message count you provide is the total message count for a subscriber. So if you unsubscribe with a count of 1, the server will stop sending messages to that subscription after it has received one message. If the subscriber has already received one or more messages, the unsubscribe will be immediate. This action based on history can be confusing if you try to auto unsubscribe on a long running subscription, but is logical for a new one.
> Auto unsubscribe is based on the total messages sent to a subscriber, not just the new ones.
Auto unsubscribe can also result in some tricky edge cases if a server cluster is used. The client will tell the server of the unsubscribe count when the application requests it. But if the client disconnects before the count is reached, it may have to tell another server of the remaining count. This dance between previous server notifications and new notifications on reconnect can result in unplanned behavior.
Finally, most of the client libraries also track the max message count after an auto unsubscribe request. Which means that the client will stop allowing messages to flow even if the server has miscounted due to reconnects or some other failure in the client library.
The following example shows unsubscribe after a single message:
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Sync Subscription
sub, err := nc.SubscribeSync("updates")
if err != nil {
log.Fatal(err)
}
if err := sub.AutoUnsubscribe(1); err != nil {
log.Fatal(err)
}
// Async Subscription
sub, err = nc.Subscribe("updates", func(_ *nats.Msg) {})
if err != nil {
log.Fatal(err)
}
if err := sub.AutoUnsubscribe(1); err != nil {
log.Fatal(err)
}
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
});
// Sync Subscription
Subscription sub = nc.subscribe("updates");
sub.unsubscribe(1);
// Async Subscription
d.subscribe("updates");
d.unsubscribe("updates", 1);
// Close the connection
nc.close();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
let nc = NATS.connect({
url: "nats://demo.nats.io:4222"
});
// `max` specifies the number of messages that the server will forward.
// The server will auto-cancel.
let opts = {max: 10};
let sub = nc.subscribe(NATS.createInbox(), opts, (msg) => {
t.log(msg);
});
// another way after 10 messages
let sub2 = nc.subscribe(NATS.createInbox(), (err, msg) => {
t.log(msg.data);
});
// if the subscription already received 10 messages, the handler
// won't get any more messages
nc.unsubscribe(sub2, 10);
```
{% endtab %}
{% tab title="Python" %}
```python
nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
async def cb(msg):
print(msg)
sid = await nc.subscribe("updates", cb=cb)
await nc.auto_unsubscribe(sid, 1)
await nc.publish("updates", b'All is Well')
# Won't be received...
await nc.publish("updates", b'...')
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
nc.subscribe("time", max: 1) do |msg, reply|
f.resume Time.now
end
nc.publish("time", 'What is the time?', NATS.create_inbox)
# Use the response
msg = Fiber.yield
puts "Reply: #{msg}"
# Won't be received
nc.publish("time", 'What is the time?', NATS.create_inbox)
end.resume
end
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// `max` specifies the number of messages that the server will forward.
// The server will auto-cancel.
let opts = {max: 10};
let sub = await nc.subscribe(createInbox(), (err, msg) => {
t.log(msg.data);
}, opts);
// another way after 10 messages
let sub2 = await nc.subscribe(createInbox(), (err, msg) => {
t.log(msg.data);
});
// if the subscription already received 10 messages, the handler
// won't get any more messages
sub2.unsubscribe(10);
```
{% endtab %}
{% endtabs %}

View File

@@ -0,0 +1,145 @@
# Unsubscribing
The client libraries provide a means to unsubscribe a previous subscription request.
This process requires an interaction with the server, so for an asynchronous subscription there may be a small window of time where a message comes through as the unsubscribe is processed by the library. Ignoring that slight edge case, the client library will clean up any outstanding messages and tell the server that the subscription is no longer used.
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Sync Subscription
sub, err := nc.SubscribeSync("updates")
if err != nil {
log.Fatal(err)
}
if err := sub.Unsubscribe(); err != nil {
log.Fatal(err)
}
// Async Subscription
sub, err = nc.Subscribe("updates", func(_ *nats.Msg) {})
if err != nil {
log.Fatal(err)
}
if err := sub.Unsubscribe(); err != nil {
log.Fatal(err)
}
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
});
// Sync Subscription
Subscription sub = nc.subscribe("updates");
sub.unsubscribe();
// Async Subscription
d.subscribe("updates");
d.unsubscribe("updates");
// Close the connection
nc.close();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
let nc = NATS.connect({
url: "nats://demo.nats.io:4222"
});
// set up a subscription to process a request
let sub = nc.subscribe(NATS.createInbox(), (msg, reply) => {
if (msg.reply) {
nc.publish(reply, new Date().toLocaleTimeString());
}
});
// without arguments the subscription will cancel when the server receives it
// you can also specify how many messages are expected by the subscription
nc.unsubscribe(sub);
```
{% endtab %}
{% tab title="Python" %}
```python
nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
future = asyncio.Future()
async def cb(msg):
nonlocal future
future.set_result(msg)
sid = await nc.subscribe("updates", cb=cb)
await nc.publish("updates", b'All is Well')
# Remove interest in subject
await nc.unsubscribe(sid)
# Won't be received...
await nc.publish("updates", b'...')
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
sid = nc.subscribe("time") do |msg, reply|
f.resume Time.now
end
nc.publish("time", 'What is the time?', NATS.create_inbox)
# Use the response
msg = Fiber.yield
puts "Reply: #{msg}"
nc.unsubscribe(sid)
# Won't be received
nc.publish("time", 'What is the time?', NATS.create_inbox)
end.resume
end
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// set up a subscription to process a request
let sub = await nc.subscribe(createInbox(), (err, msg) => {
if (msg.reply) {
nc.publish(msg.reply, new Date().toLocaleTimeString());
} else {
t.log('got a request for the time, but no reply subject was set.');
}
});
// without arguments the subscription will cancel when the server receives it
// you can also specify how many messages are expected by the subscription
sub.unsubscribe();
```
{% endtab %}
{% endtabs %}

View File

@@ -0,0 +1,451 @@
# Wildcard Subscriptions
There is no special code to subscribe with a wildcard subject. Wildcards are a normal part of the subject name.
However, there is a common technique that may come in to play when you use wildcards. This technique is to use the subject provided with the incoming message to determine what to do with the message.
For example, you can subscribe using `*` and then act based on the actual subject.
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Use a WaitGroup to wait for 2 messages to arrive
wg := sync.WaitGroup{}
wg.Add(2)
// Subscribe
if _, err := nc.Subscribe("time.*.east", func(m *nats.Msg) {
log.Printf("%s: %s", m.Subject, m.Data)
wg.Done()
}); err != nil {
log.Fatal(err)
}
// Wait for the 2 messages to come in
wg.Wait()
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for 2 messages to arrive
CountDownLatch latch = new CountDownLatch(2);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String subject = msg.getSubject();
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(subject + ": " + str);
latch.countDown();
});
// Subscribe
d.subscribe("time.*.east");
// Wait for messages to come in
latch.await();
// Close the connection
nc.close();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
let nc = NATS.connect({
url: "nats://demo.nats.io:4222"});
nc.subscribe('time.us.*', (msg, reply, subject) => {
// converting timezones correctly in node requires a library
// this doesn't take into account *many* things.
let time = "";
switch (subject) {
case 'time.us.east':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
break;
case 'time.us.central':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
break;
case 'time.us.mountain':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
break;
case 'time.us.west':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
break;
default:
time = "I don't know what you are talking about Willis";
}
t.log(subject, time);
});
```
{% endtab %}
{% tab title="Python" %}
```python
nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
# Use queue to wait for 2 messages to arrive
queue = asyncio.Queue()
async def cb(msg):
await queue.put_nowait(msg)
await nc.subscribe("time.*.east", cb=cb)
# Send 2 messages and wait for them to come in
await nc.publish("time.A.east", b'A')
await nc.publish("time.B.east", b'B')
msg_A = await queue.get()
msg_B = await queue.get()
print("Msg A:", msg_A)
print("Msg B:", msg_B)
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
nc.subscribe("time.*.east") do |msg, reply|
f.resume Time.now
end
nc.publish("time.A.east", "A")
nc.publish("time.B.east", "B")
# Use the response
msg_A = Fiber.yield
puts "Msg A: #{msg_A}"
msg_B = Fiber.yield
puts "Msg B: #{msg_B}"
end.resume
end
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
await nc.subscribe('time.us.*', (err, msg) => {
// converting timezones correctly in node requires a library
// this doesn't take into account *many* things.
let time = "";
switch (msg.subject) {
case 'time.us.east':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
break;
case 'time.us.central':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
break;
case 'time.us.mountain':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
break;
case 'time.us.west':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
break;
default:
time = "I don't know what you are talking about Willis";
}
console.log(msg.subject, time);
});
```
{% endtab %}
{% endtabs %}
or do something similar with `>`:
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Use a WaitGroup to wait for 4 messages to arrive
wg := sync.WaitGroup{}
wg.Add(4)
// Subscribe
if _, err := nc.Subscribe("time.>", func(m *nats.Msg) {
log.Printf("%s: %s", m.Subject, m.Data)
wg.Done()
}); err != nil {
log.Fatal(err)
}
// Wait for the 4 messages to come in
wg.Wait()
// Close the connection
nc.Close()
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for 4 messages to arrive
CountDownLatch latch = new CountDownLatch(4);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String subject = msg.getSubject();
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(subject + ": " + str);
latch.countDown();
});
// Subscribe
d.subscribe("time.>");
// Wait for messages to come in
latch.await();
// Close the connection
nc.close();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
let nc = NATS.connect({
url: "nats://demo.nats.io:4222"});
nc.subscribe('time.>', (msg, reply, subject) => {
// converting timezones correctly in node requires a library
// this doesn't take into account *many* things.
let time = "";
switch (subject) {
case 'time.us.east':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
break;
case 'time.us.central':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
break;
case 'time.us.mountain':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
break;
case 'time.us.west':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
break;
default:
time = "I don't know what you are talking about Willis";
}
t.log(subject, time);
});
```
{% endtab %}
{% tab title="Python" %}
```python
nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
# Use queue to wait for 4 messages to arrive
queue = asyncio.Queue()
async def cb(msg):
await queue.put(msg)
await nc.subscribe("time.>", cb=cb)
# Send 2 messages and wait for them to come in
await nc.publish("time.A.east", b'A')
await nc.publish("time.B.east", b'B')
await nc.publish("time.C.west", b'C')
await nc.publish("time.D.west", b'D')
for i in range(0, 4):
msg = await queue.get()
print("Msg:", msg)
await nc.close()
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
nc.subscribe("time.>") do |msg, reply|
f.resume Time.now.to_f
end
nc.publish("time.A.east", "A")
nc.publish("time.B.east", "B")
nc.publish("time.C.west", "C")
nc.publish("time.D.west", "D")
# Use the response
4.times do
msg = Fiber.yield
puts "Msg: #{msg}"
end
end.resume
end
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
await nc.subscribe('time.>', (err, msg) => {
// converting timezones correctly in node requires a library
// this doesn't take into account *many* things.
let time = "";
switch (msg.subject) {
case 'time.us.east':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
break;
case 'time.us.central':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
break;
case 'time.us.mountain':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
break;
case 'time.us.west':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
break;
default:
time = "I don't know what you are talking about Willis";
}
t.log(msg.subject, time);
});
```
{% endtab %}
{% endtabs %}
The following example can be used to test these two subscribers. The `*` subscriber should receive at most 2 messages, while the `>` subscriber receives 4. More importantly the `time.*.east` subscriber won't receive on `time.us.east.atlanta` because that won't match.
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
zoneID, err := time.LoadLocation("America/New_York")
if err != nil {
log.Fatal(err)
}
now := time.Now()
zoneDateTime := now.In(zoneID)
formatted := zoneDateTime.String()
nc.Publish("time.us.east", []byte(formatted))
nc.Publish("time.us.east.atlanta", []byte(formatted))
zoneID, err = time.LoadLocation("Europe/Warsaw")
if err != nil {
log.Fatal(err)
}
zoneDateTime = now.In(zoneID)
formatted = zoneDateTime.String()
nc.Publish("time.eu.east", []byte(formatted))
nc.Publish("time.eu.east.warsaw", []byte(formatted))
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
ZoneId zoneId = ZoneId.of("America/New_York");
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
String formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
nc.publish("time.us.east", formatted.getBytes(StandardCharsets.UTF_8));
nc.publish("time.us.east.atlanta", formatted.getBytes(StandardCharsets.UTF_8));
zoneId = ZoneId.of("Europe/Warsaw");
zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
nc.publish("time.eu.east", formatted.getBytes(StandardCharsets.UTF_8));
nc.publish("time.eu.east.warsaw", formatted.getBytes(StandardCharsets.UTF_8));
nc.flush(Duration.ZERO);
nc.close();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
nc.publish('time.us.east');
nc.publish('time.us.central');
nc.publish('time.us.mountain');
nc.publish('time.us.west');
```
{% endtab %}
{% tab title="Python" %}
```python
nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
await nc.publish("time.us.east", b'...')
await nc.publish("time.us.east.atlanta", b'...')
await nc.publish("time.eu.east", b'...')
await nc.publish("time.eu.east.warsaw", b'...')
await nc.close()
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
NATS.start do |nc|
nc.publish("time.us.east", '...')
nc.publish("time.us.east.atlanta", '...')
nc.publish("time.eu.east", '...')
nc.publish("time.eu.east.warsaw", '...')
nc.drain
end
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
nc.publish('time.us.east');
nc.publish('time.us.central');
nc.publish('time.us.mountain');
nc.publish('time.us.west');
```
{% endtab %}
{% endtabs %}