1
0
mirror of https://github.com/taigrr/nats.docs synced 2025-01-18 04:03:23 -08:00
2020-08-26 09:29:26 -05:00

158 lines
3.6 KiB
Markdown

# Asynchronous Subscriptions
Asynchronous subscriptions use callbacks of some form to notify an application when a message arrives. These subscriptions are usually easier to work with, but do represent some form of internal work and resource usage, i.e. threads, by the library. Check your library's documentation for any resource usage associated with asynchronous subscriptions.
_**Note: For a given subscription, messages are dispatched serially, one message at a time. If your application does not care about processing ordering and would prefer the messages to be dispatched concurrently, it is the application's responsibility to move them to some internal queue to be picked up by threads/go routines.**_
The following example subscribes to the subject `updates` and handles the incoming messages:
{% tabs %}
{% tab title="Go" %}
```go
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Use a WaitGroup to wait for a message to arrive
wg := sync.WaitGroup{}
wg.Add(1)
// Subscribe
if _, err := nc.Subscribe("updates", func(m *nats.Msg) {
wg.Done()
}); err != nil {
log.Fatal(err)
}
// Wait for a message to come in
wg.Wait()
```
{% endtab %}
{% tab title="Java" %}
```java
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for a message to arrive
CountDownLatch latch = new CountDownLatch(1);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
latch.countDown();
});
// Subscribe
d.subscribe("updates");
// Wait for a message to come in
latch.await();
// Close the connection
nc.close();
```
{% endtab %}
{% tab title="JavaScript" %}
```javascript
let nc = NATS.connect({
url: "nats://demo.nats.io:4222"
});
nc.subscribe("updates", (msg) => {
t.log(msg);
});
```
{% endtab %}
{% tab title="Python" %}
```python
nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
future = asyncio.Future()
async def cb(msg):
nonlocal future
future.set_result(msg)
await nc.subscribe("updates", cb=cb)
await nc.publish("updates", b'All is Well')
await nc.flush()
# Wait for message to come in
msg = await asyncio.wait_for(future, 1)
```
{% endtab %}
{% tab title="Ruby" %}
```ruby
require 'nats/client'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
nc.subscribe("updates") do |msg|
puts msg
nc.close
end
nc.publish("updates", "All is Well")
end
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
nc.subscribe("updates", (err, msg) => {
if(err) {
console.log('error', err);
} else {
t.log(msg.data);
}
});
```
{% endtab %}
{% tab title="C" %}
```c
static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Need to destroy the message!
natsMsg_Destroy(msg);
}
(...)
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
if (s == NATS_OK)
{
// Creates an asynchronous subscription on subject "foo".
// When a message is sent on subject "foo", the callback
// onMsg() will be invoked by the client library.
// You can pass a closure as the last argument.
s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
}
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
```
{% endtab %}
{% endtabs %}