1
0
mirror of https://github.com/taigrr/nats.docs synced 2025-01-18 04:03:23 -08:00

Add C Client examples

This commit is contained in:
ainsley
2020-08-26 09:29:26 -05:00
parent 2c9b2f91fc
commit 4382b309d9
34 changed files with 1165 additions and 0 deletions

View File

@@ -114,5 +114,44 @@ nc.subscribe("updates", (err, msg) => {
});
```
{% endtab %}
{% tab title="C" %}
```c
static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Need to destroy the message!
natsMsg_Destroy(msg);
}
(...)
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
if (s == NATS_OK)
{
// Creates an asynchronous subscription on subject "foo".
// When a message is sent on subject "foo", the callback
// onMsg() will be invoked by the client library.
// You can pass a closure as the last argument.
s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
}
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
```
{% endtab %}
{% endtabs %}

View File

@@ -202,6 +202,73 @@ await nc.drain();
nc.close();
```
{% endtab %}
{% tab title="C" %}
```c
static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Add some delay while processing
nats_Sleep(200);
// Need to destroy the message!
natsMsg_Destroy(msg);
}
static void
closeHandler(natsConnection *conn, void *closure)
{
cond_variable cv = (cond_variable) closure;
notify_cond_variable(cv);
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsStatus s = NATS_OK;
cond_variable cv = new_cond_variable(); // some fictuous way to notify between threads.
s = natsOptions_Create(&opts);
if (s == NATS_OK)
// Setup a close handler and pass a reference to our condition variable.
s = natsOptions_SetClosedCB(opts, closeHandler, (void*) cv);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
// Subscribe
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
// Publish a message
if (s == NATS_OK)
s = natsConnection_PublishString(conn, "foo", "hello");
// Drain the connection, which will close it when done.
if (s == NATS_OK)
s = natsConnection_Drain(conn);
// Wait for the connection to be closed
if (s == NATS_OK)
cond_variable_wait(cv);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);
```
{% endtab %}
{% endtabs %}
The mechanics of drain for a subscription are simpler:
@@ -366,6 +433,41 @@ let sub = await nc.subscribe('updates', (err, msg) => {
}, {queue: "workers"});
```
{% endtab %}
{% tab title="C" %}
```c
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
// Publish 2 messages
if (s == NATS_OK)
{
int i;
for (i=0; (s == NATS_OK) && (i<2); i++)
{
s = natsConnection_PublishString(conn, "foo", "hello");
}
}
// Call Drain on the subscription. It unsubscribes but
// wait for all pending messages to be processed.
if (s == NATS_OK)
s = natsSubscription_Drain(sub);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
```
{% endtab %}
{% endtabs %}
Because draining can involve messages flowing to the server, for a flush and asynchronous message processing, the timeout for drain should generally be higher than the timeout for a simple message request/reply or similar.

View File

@@ -119,6 +119,42 @@ await nc.subscribe('updates', (err, msg) => {
}, {queue: "workers"});
```
{% endtab %}
{% tab title="C" %}
```c
static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Need to destroy the message!
natsMsg_Destroy(msg);
}
(...)
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Create a queue subscription on "updates" with queue name "workers"
if (s == NATS_OK)
s = natsConnection_QueueSubscribe(&sub, conn, "updates", "workers", onMsg, NULL);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
```
{% endtab %}
{% endtabs %}
If you run this example with the publish examples that send to `updates`, you will see that one of the instances gets a message while the others you run won't. But the instance that receives the message will change.

View File

@@ -134,5 +134,43 @@ await nc.subscribe('time', (err, msg) => {
});
```
{% endtab %}
{% tab title="C" %}
```c
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_SubscribeSync(&sub, conn, "time");
// Wait for messages
if (s == NATS_OK)
s = natsSubscription_NextMsg(&msg, sub, 10000);
if (s == NATS_OK)
{
char buf[64];
snprintf(buf, sizeof(buf), "%lld", nats_Now());
// Send the time as a response
s = natsConnection_Publish(conn, natsMsg_GetReply(msg), buf, (int) strlen(buf));
// Destroy message that was received
natsMsg_Destroy(msg);
}
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
```
{% endtab %}
{% endtabs %}

View File

@@ -168,5 +168,11 @@ nc.subscribe('updates', (err, msg) => {
});
```
{% endtab %}
{% tab title="C" %}
```c
// Structured data is not configurable in C NATS Client.
```
{% endtab %}
{% endtabs %}

View File

@@ -71,5 +71,41 @@ nc.close();
/ Typescript NATS subscriptions are always async.
```
{% endtab %}
{% tab title="C" %}
```c
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_SubscribeSync(&sub, conn, "updates");
// Wait for messages
if (s == NATS_OK)
s = natsSubscription_NextMsg(&msg, sub, 10000);
if (s == NATS_OK)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Destroy message that was received
natsMsg_Destroy(msg);
}
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
```
{% endtab %}
{% endtabs %}

View File

@@ -143,5 +143,45 @@ let sub2 = await nc.subscribe(createInbox(), (err, msg) => {
sub2.unsubscribe(10);
```
{% endtab %}
{% tab title="C" %}
```c
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_SubscribeSync(&sub, conn, "updates");
// Unsubscribe after 1 message is received
if (s == NATS_OK)
s = natsSubscription_AutoUnsubscribe(sub, 1);
// Wait for messages
if (s == NATS_OK)
s = natsSubscription_NextMsg(&msg, sub, 10000);
if (s == NATS_OK)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Destroy message that was received
natsMsg_Destroy(msg);
}
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
```
{% endtab %}
{% endtabs %}

View File

@@ -140,5 +140,29 @@ let sub = await nc.subscribe(createInbox(), (err, msg) => {
sub.unsubscribe();
```
{% endtab %}
{% tab title="C" %}
```c
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_SubscribeSync(&sub, conn, "updates");
// Unsubscribe
if (s == NATS_OK)
s = natsSubscription_Unsubscribe(sub);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
```
{% endtab %}
{% endtabs %}

View File

@@ -165,6 +165,40 @@ await nc.subscribe('time.us.*', (err, msg) => {
});
```
{% endtab %}
{% tab title="C" %}
```c
static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Need to destroy the message!
natsMsg_Destroy(msg);
}
(...)
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub, conn, "time.*.east", onMsg, NULL);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
```
{% endtab %}
{% endtabs %}
or do something similar with `>`:
@@ -335,6 +369,40 @@ await nc.subscribe('time.>', (err, msg) => {
});
```
{% endtab %}
{% tab title="C" %}
```c
static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Need to destroy the message!
natsMsg_Destroy(msg);
}
(...)
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub, conn, "time.>", onMsg, NULL);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
```
{% endtab %}
{% endtabs %}
The following example can be used to test these two subscribers. The `*` subscriber should receive at most 2 messages, while the `>` subscriber receives 4. More importantly the `time.*.east` subscriber won't receive on `time.us.east.atlanta` because that won't match.