From 230970f41e8b214456f9d9f0b2f016b73218e4cc Mon Sep 17 00:00:00 2001
From: Stephen Asbury
+graph nats {
+ graph [splines=ortho, nodesep=1];
+
+ publisher [shape="record", label="{Application 1 |
+digraph nats_pub_sub {
+ rankdir=LR
+ publisher [shape=box, style="rounded", label="Publisher"];
+ subject [shape=circle, label="Subject"];
+ sub1 [shape=box, style="rounded", label="Subscriber"];
+ sub2 [shape=box, style="rounded", label="Subscriber"];
+ sub3 [shape=box, style="rounded", label="Subscriber"];
+
+ publisher -> subject [label="msg1"];
+ subject -> sub1 [label="msg1"];
+ subject -> sub2 [label="msg1"];
+ subject -> sub3 [label="msg1"];
+}
+
+digraph nats_request_reply {
+ rankdir=LR
+
+ subgraph {
+ publisher [shape=box, style="rounded", label="Publisher"];
+ }
+
+ subgraph {
+ subject [shape=circle, label="Subject"];
+ reply [shape=circle, label="Reply"];
+ {rank = same subject reply}
+ }
+
+ subgraph {
+ sub1 [shape=box, style="rounded", label="Subscriber"];
+ sub2 [shape=box, style="rounded", label="Subscriber"];
+ sub3 [shape=box, style="rounded", label="Subscriber"];
+ }
+
+ publisher -> subject [label="msg1"];
+ publisher -> reply [style="invis", weight=2];
+ reply -> sub3 [style="invis", weight=2];
+ subject -> sub1 [label="msg1", style="dotted"];
+ subject -> sub2 [label="msg1", style="dotted"];
+ subject -> sub3 [label="msg1"];
+ sub3 -> reply;
+ reply -> publisher;
+}
+
+digraph nats_queues {
+ rankdir=LR
+ publisher [shape=box, style="rounded", label="Publisher"];
+ subject [shape=circle, label="Queue"];
+ sub1 [shape=box, style="rounded", label="Subscriber"];
+ sub2 [shape=box, style="rounded", label="Subscriber"];
+ sub3 [shape=box, style="rounded", label="Subscriber"];
+
+ publisher -> subject [label="msgs 1,2,3"];
+ subject -> sub1 [label="msg 2"];
+ subject -> sub2 [label="msg 1"];
+ subject -> sub3 [label="msg 3"];
+}
+
+ nc, err := nats.Connect(nats.DefaultURL)
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ let nc = NATS.connect();
+nc.on('connect', (c) => {
+ // Do something with the connection
+ doSomething();
+ // When done close it
+ nc.close();
+});
+nc.on('error', (err) => {
+ failed(err);
+});
+
+ require 'nats/client'
+
+NATS.start do |nc|
+ # Do something with the connection
+
+ # Close the connection
+ nc.close
+end
+
+ // will throw an exception if connection fails
+let nc = await connect();
+// Do something with the connection
+
+// When done close it
+nc.close();
+
+
+// alternatively, you can use the Promise pattern
+let nc1: Client;
+connect()
+ .then((c) => {
+ nc1 = c;
+ // Do something with the connection
+ nc1.close();
+ });
+ // add a .catch/.finally
+
+ servers := []string{"nats://localhost:1222",
+ "nats://localhost:1223",
+ "nats://localhost:1224",
+}
+
+nc, err := nats.Connect(strings.Join(servers, ","))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://localhost:1222").
+ server("nats://localhost:1223").
+ server("nats://localhost:1224").
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ let nc = NATS.connect({
+ servers: [
+ "nats://demo.nats.io:4222",
+ "nats://localhost:4222"
+ ]}
+);
+
+nc.on('connect', (c) => {
+ // Do something with the connection
+ doSomething();
+ // When done close it
+ nc.close();
+});
+nc.on('error', (err) => {
+ failed(err);
+});
+
+ nc = NATS()
+await nc.connect(servers=[
+ "nats://127.0.0.1:1222",
+ "nats://127.0.0.1:1223",
+ "nats://127.0.0.1:1224"
+ ])
+
+# Do something with the connection
+
+await nc.close()
+
+
+ // Set a connection name
+nc, err := nats.Connect("demo.nats.io", nats.Name("my-connection"))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ connectionName("my-connection"). // Set a connection name
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"], name="my-connection")
+
+# Do something with the connection.
+
+
+ nc, err := nats.Connect(nats.DefaultURL, nats.Timeout(10*time.Second))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ connectionTimeout(Duration.ofSeconds(10)). // Set timeout
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ nc = NATS()
+await nc.connect(connect_timeout=2)
+
+# Do something with the connection
+
+await nc.close()
+
+
+ # There is currently no connect timeout as part of the Ruby NATS client API, but you can use a timer to mimic it.
+require 'nats/client'
+
+timer = EM.add_timer(5) do
+ NATS.connect do |nc|
+ # Do something with the connection
+
+ # Close the connection
+ nc.close
+ end
+end
+EM.cancel_timer(timer)
+
+ opts := nats.GetDefaultOptions()
+opts.Url = "demo.nats.io"
+// Turn on Pedantic
+opts.Pedantic = true
+nc, err := opts.Connect()
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ pedantic(). // Turn on pedantic
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"], pedantic=True)
+
+# Do something with the connection.
+
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+getStatusTxt := func(nc *nats.Conn) string {
+ switch nc.Status() {
+ case nats.CONNECTED:
+ return "Connected"
+ case nats.CLOSED:
+ return "Closed"
+ default:
+ return "Other"
+ }
+}
+log.Printf("The connection is %v\n", getStatusTxt(nc))
+
+nc.Close()
+
+log.Printf("The connection is %v\n", getStatusTxt(nc))
+
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+System.out.println("The Connection is: " + nc.getStatus());
+
+nc.close();
+
+System.out.println("The Connection is: " + nc.getStatus());
+
+ let nc = NATS.connect("nats://demo.nats.io:4222");
+
+// on node you *must* register an error listener. If not registered
+// the library emits an 'error' event, the node process will exit.
+nc.on('error', (err) => {
+ t.log('client got an error:', err);
+});
+
+if(nc.closed) {
+ t.log('client is closed');
+} else {
+ t.log('client is not closed');
+}
+
+ nc = NATS()
+
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ )
+
+# Do something with the connection.
+
+print("The connection is connected?", nc.is_connected)
+
+while True:
+ if nc.is_reconnecting:
+ print("Reconnecting to NATS...")
+ break
+ await asyncio.sleep(1)
+
+await nc.close()
+
+print("The connection is closed?", nc.is_closed)
+
+
+ NATS.start(max_reconnect_attempts: 2) do |nc|
+ puts "Connect is connected?: #{nc.connected?}"
+
+ timer = EM.add_periodic_timer(1) do
+ if nc.closing?
+ puts "Connection closed..."
+ EM.cancel_timer(timer)
+ NATS.stop
+ end
+
+ if nc.reconnecting?
+ puts "Reconnecting to NATS..."
+ next
+ end
+ end
+end
+
+
+ nc, err := nats.Connect("localhost",
+ nats.ClientCert("resources/certs/cert.pem", "resources/certs/key.pem"),
+ nats.RootCAs("resources/certs/ca.pem"))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ class SSLUtils {
+ public static String KEYSTORE_PATH = "src/main/resources/keystore.jks";
+ public static String TRUSTSTORE_PATH = "src/main/resources/cacerts";
+ public static String STORE_PASSWORD = "password";
+ public static String KEY_PASSWORD = "password";
+ public static String ALGORITHM = "SunX509";
+
+ public static KeyStore loadKeystore(String path) throws Exception {
+ KeyStore store = KeyStore.getInstance("JKS");
+ BufferedInputStream in = new BufferedInputStream(new FileInputStream(path));
+
+ try {
+ store.load(in, STORE_PASSWORD.toCharArray());
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
+
+ return store;
+ }
+
+ public static KeyManager[] createTestKeyManagers() throws Exception {
+ KeyStore store = loadKeystore(KEYSTORE_PATH);
+ KeyManagerFactory factory = KeyManagerFactory.getInstance(ALGORITHM);
+ factory.init(store, KEY_PASSWORD.toCharArray());
+ return factory.getKeyManagers();
+ }
+
+ public static TrustManager[] createTestTrustManagers() throws Exception {
+ KeyStore store = loadKeystore(TRUSTSTORE_PATH);
+ TrustManagerFactory factory = TrustManagerFactory.getInstance(ALGORITHM);
+ factory.init(store);
+ return factory.getTrustManagers();
+ }
+
+ public static SSLContext createSSLContext() throws Exception {
+ SSLContext ctx = SSLContext.getInstance(Options.DEFAULT_SSL_PROTOCOL);
+ ctx.init(createTestKeyManagers(), createTestTrustManagers(), new SecureRandom());
+ return ctx;
+ }
+}
+
+public class ConnectTLS {
+ public static void main(String[] args) {
+
+ try {
+ SSLContext ctx = SSLUtils.createSSLContext();
+ Options options = new Options.Builder().
+ server("nats://localhost:4222").
+ sslContext(ctx). // Set the SSL context
+ build();
+ Connection nc = Nats.connect(options);
+
+ // Do something with the connection
+
+ nc.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
+
+ let caCert = fs.readFileSync(caCertPath);
+let clientCert = fs.readFileSync(clientCertPath);
+let clientKey = fs.readFileSync(clientKeyPath);
+let nc = NATS.connect({
+ url: url,
+ tls: {
+ ca: [caCert],
+ key: [clientKey],
+ cert: [clientCert]
+ }
+});
+
+ nc = NATS()
+
+ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
+ssl_ctx.load_verify_locations('ca.pem')
+ssl_ctx.load_cert_chain(certfile='client-cert.pem',
+ keyfile='client-key.pem')
+await nc.connect(io_loop=loop, tls=ssl_ctx)
+
+await nc.connect(servers=["nats://demo.nats.io:4222"], tls=ssl_ctx)
+
+# Do something with the connection.
+
+
+ EM.run do
+
+ options = {
+ :servers => [
+ 'nats://localhost:4222',
+ ],
+ :tls => {
+ :private_key_file => './spec/configs/certs/key.pem',
+ :cert_chain_file => './spec/configs/certs/server.pem'
+ }
+ }
+
+ NATS.connect(options) do |nc|
+ puts "#{Time.now.to_f} - Connected to NATS at #{nc.connected_server}"
+
+ nc.subscribe("hello") do |msg|
+ puts "#{Time.now.to_f} - Received: #{msg}"
+ end
+
+ nc.flush do
+ nc.publish("hello", "world")
+ end
+
+ EM.add_periodic_timer(0.1) do
+ next unless nc.connected?
+ nc.publish("hello", "hello")
+ end
+
+ # Set default callbacks
+ nc.on_error do |e|
+ puts "#{Time.now.to_f } - Error: #{e}"
+ end
+
+ nc.on_disconnect do |reason|
+ puts "#{Time.now.to_f} - Disconnected: #{reason}"
+ end
+
+ nc.on_reconnect do |nc|
+ puts "#{Time.now.to_f} - Reconnected to NATS server at #{nc.connected_server}"
+ end
+
+ nc.on_close do
+ puts "#{Time.now.to_f} - Connection to NATS closed"
+ EM.stop
+ end
+ end
+end
+
+ nc, err := nats.Connect("localhost",
+ nats.Secure(),
+ nats.RootCAs("resources/certs/ca.pem")) // May need this if server is using self-signed certificate
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ class SSLUtils2 {
+ public static String KEYSTORE_PATH = "src/main/resources/keystore.jks";
+ public static String TRUSTSTORE_PATH = "src/main/resources/cacerts";
+ public static String STORE_PASSWORD = "password";
+ public static String KEY_PASSWORD = "password";
+ public static String ALGORITHM = "SunX509";
+
+ public static KeyStore loadKeystore(String path) throws Exception {
+ KeyStore store = KeyStore.getInstance("JKS");
+ BufferedInputStream in = new BufferedInputStream(new FileInputStream(path));
+
+ try {
+ store.load(in, STORE_PASSWORD.toCharArray());
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
+
+ return store;
+ }
+
+ public static KeyManager[] createTestKeyManagers() throws Exception {
+ KeyStore store = loadKeystore(KEYSTORE_PATH);
+ KeyManagerFactory factory = KeyManagerFactory.getInstance(ALGORITHM);
+ factory.init(store, KEY_PASSWORD.toCharArray());
+ return factory.getKeyManagers();
+ }
+
+ public static TrustManager[] createTestTrustManagers() throws Exception {
+ KeyStore store = loadKeystore(TRUSTSTORE_PATH);
+ TrustManagerFactory factory = TrustManagerFactory.getInstance(ALGORITHM);
+ factory.init(store);
+ return factory.getTrustManagers();
+ }
+
+ public static SSLContext createSSLContext() throws Exception {
+ SSLContext ctx = SSLContext.getInstance(Options.DEFAULT_SSL_PROTOCOL);
+ ctx.init(createTestKeyManagers(), createTestTrustManagers(), new SecureRandom());
+ return ctx;
+ }
+}
+
+public class ConnectTLSURL {
+ public static void main(String[] args) {
+
+ try {
+ SSLContext.setDefault(SSLUtils2.createSSLContext()); // Set the default context
+ Options options = new Options.Builder().
+ server("tls://localhost:4222"). // Use the TLS protocol
+ build();
+ Connection nc = Nats.connect(options);
+
+ // Do something with the connection
+
+ nc.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
+
+ import asyncio
+import ssl
+import certifi
+from nats.aio.client import Client as NATS
+from nats.aio.errors import ErrTimeout
+
+async def run(loop):
+ nc = NATS()
+
+ # If using Python 3.7 in OS X and getting SSL errors, run first:
+ #
+ # /Applications/Python\ 3.7/Install\ Certificates.command
+ #
+ # Setting the tls as the scheme will use same defaults as `ssl.create_default_context()`
+ #
+ await nc.connect("tls://demo.nats.io:4443", loop=loop)
+
+ async def message_handler(msg):
+ subject = msg.subject
+ reply = msg.reply
+ data = msg.data.decode()
+ print("Received a message on '{subject} {reply}': {data}".format(
+ subject=subject, reply=reply, data=data))
+
+ # Simple publisher and async subscriber via coroutine.
+ sid = await nc.subscribe("foo", cb=message_handler)
+ await nc.flush()
+
+ # Stop receiving after 2 messages.
+ await nc.auto_unsubscribe(sid, 2)
+ await nc.publish("foo", b'Hello')
+ await nc.publish("foo", b'World')
+ await nc.publish("foo", b'!!!!!')
+ await asyncio.sleep(1, loop=loop)
+ await nc.close()
+
+ // Set a token
+nc, err := nats.Connect("localhost", nats.Token("mytoken"))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://localhost:4222").
+ token("mytoken"). // Set a token
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ // Token in URL
+nc, err := nats.Connect("mytoken@localhost")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Connection nc = Nats.connect("nats://mytoken@localhost:4222");//Token in URL
+
+// Do something with the connection
+
+nc.close();
+
+ // If connecting to the default port, the URL can be simplified
+// to just the hostname/IP.
+// That is, the connect below is equivalent to:
+// nats.Connect("nats://demo.nats.io:4222")
+nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+// Do something with the connection
+
+nc.close();
+
+ let nc = NATS.connect("nats://demo.nats.io:4222");
+nc.on('connect', (c) => {
+ // Do something with the connection
+ doSomething();
+ // When done close it
+ nc.close();
+});
+nc.on('error', (err) => {
+ failed(err);
+});
+
+ nc = NATS()
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+# Do something with the connection
+
+await nc.close()
+
+
+ // Set a user and plain text password
+nc, err := nats.Connect("localhost", nats.UserInfo("myname", "password"))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://localhost:4222").
+ userInfo("myname","password"). // Set a user and plain text password
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://myname:password@demo.nats.io:4222"])
+
+# Do something with the connection.
+
+
+ require 'nats/client'
+
+NATS.start(servers:["nats://myname:password@127.0.0.1:4222"], name: "my-connection") do |nc|
+ nc.on_error do |e|
+ puts "Error: #{e}"
+ end
+
+ nc.on_reconnect do
+ puts "Got reconnected to #{nc.connected_server}"
+ end
+
+ nc.on_disconnect do |reason|
+ puts "Got disconnected! #{reason}"
+ end
+
+ nc.close
+end
+
+ // Set a user and plain text password
+nc, err := nats.Connect("myname:password@localhost")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Connection nc = Nats.connect("nats://myname:password@localhost:4222");
+
+// Do something with the connection
+
+nc.close();
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://myname:password@demo.nats.io:4222"])
+
+# Do something with the connection.
+
+
+ require 'nats/client'
+
+NATS.start(servers:["nats://myname:password@127.0.0.1:4222"], name: "my-connection") do |nc|
+ nc.on_error do |e|
+ puts "Error: #{e}"
+ end
+
+ nc.on_reconnect do
+ puts "Got reconnected to #{nc.connected_server}"
+ end
+
+ nc.on_disconnect do |reason|
+ puts "Got disconnected! #{reason}"
+ end
+
+ nc.close
+end
+
+ opts := nats.GetDefaultOptions()
+opts.Url = "demo.nats.io"
+// Turn on Verbose
+opts.Verbose = true
+nc, err := opts.Connect()
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ verbose(). // Turn on verbose
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"], verbose=True)
+
+# Do something with the connection.
+
+
+ // There is not a single listener for connection events in the NATS Go Client.
+// Instead, you can set individual event handlers using:
+
+DisconnectHandler(cb ConnHandler)
+ReconnectHandler(cb ConnHandler)
+ClosedHandler(cb ConnHandler)
+DiscoveredServersHandler(cb ConnHandler)
+ErrorHandler(cb ErrHandler)
+
+ class MyConnectionListener implements ConnectionListener {
+ public void connectionEvent(Connection natsConnection, Events event) {
+ System.out.println("Connection event - "+event);
+ }
+}
+
+public class SetConnectionListener {
+ public static void main(String[] args) {
+
+ try {
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ connectionListener(new MyConnectionListener()). // Set the listener
+ build();
+ Connection nc = Nats.connect(options);
+
+ // Do something with the connection
+
+ nc.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
+
+ let nc = NATS.connect("nats://demo.nats.io:4222");
+
+nc.on('error', (err) => {
+ t.log('error', err);
+});
+
+nc.on('connect', () => {
+ t.log('client connected');
+});
+
+nc.on('disconnect', () => {
+ t.log('client disconnected');
+});
+
+nc.on('reconnecting', () => {
+ t.log('client reconnecting');
+});
+
+nc.on('reconnect', () => {
+ t.log('client reconnected');
+});
+
+nc.on('close', () => {
+ t.log('client closed');
+});
+
+nc.on('permission_error', (err) => {
+ t.log('permission_error', err);
+});
+
+ # Asyncio NATS client can be defined a number of event callbacks
+async def disconnected_cb():
+ print("Got disconnected!")
+
+async def reconnected_cb():
+ # See who we are connected to on reconnect.
+ print("Got reconnected to {url}".format(url=nc.connected_url.netloc))
+
+async def error_cb(e):
+ print("There was an error: {}".format(e))
+
+async def closed_cb():
+ print("Connection is closed")
+
+# Setup callbacks to be notified on disconnects and reconnects
+options["disconnected_cb"] = disconnected_cb
+options["reconnected_cb"] = reconnected_cb
+
+# Setup callbacks to be notified when there is an error
+# or connection is closed.
+options["error_cb"] = error_cb
+options["closed_cb"] = closed_cb
+
+await nc.connect(**options)
+
+ # There is not a single listener for connection events in the Ruby NATS Client.
+# Instead, you can set individual event handlers using:
+
+NATS.on_disconnect do
+end
+
+NATS.on_reconnect do
+end
+
+NATS.on_close do
+end
+
+NATS.on_error do
+end
+
+ // connect will happen once - the first connect
+nc.on('connect', (nc) => {
+ // nc is the connection that connected
+ t.log('client connected');
+});
+
+nc.on('disconnect', (url) => {
+ // nc is the connection that reconnected
+ t.log('disconnected from', url);
+});
+
+nc.on('reconnecting', (url) => {
+ t.log('reconnecting to', url);
+});
+
+nc.on('reconnect', (nc, url) => {
+ // nc is the connection that reconnected
+ t.log('reconnected to', url);
+});
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ maxControlLine(2 * 1024). // Set the max control line to 2k
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ wg := sync.WaitGroup{}
+wg.Add(1)
+
+errCh := make(chan error, 1)
+
+// To simulate a timeout, you would set the DrainTimeout()
+// to a value less than the time spent in the message callback,
+// so say: nats.DrainTimeout(10*time.Millisecond).
+
+nc, err := nats.Connect("demo.nats.io",
+ nats.DrainTimeout(10*time.Second),
+ nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
+ errCh <- err
+ }),
+ nats.ClosedHandler(func(_ *nats.Conn) {
+ wg.Done()
+ }))
+if err != nil {
+ log.Fatal(err)
+}
+
+// Subscribe, but add some delay while processing.
+if _, err := nc.Subscribe("foo", func(_ *nats.Msg) {
+ time.Sleep(200 * time.Millisecond)
+}); err != nil {
+ log.Fatal(err)
+}
+
+// Publish a message
+if err := nc.Publish("foo", []byte("hello")); err != nil {
+ log.Fatal(err)
+}
+
+// Drain the connection, which will close it when done.
+if err := nc.Drain(); err != nil {
+ log.Fatal(err)
+}
+
+// Wait for the connection to be closed.
+wg.Wait()
+
+// Check if there was an error
+select {
+case e := <-errCh:
+ log.Fatal(e)
+default:
+}
+
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for a message to arrive
+CountDownLatch latch = new CountDownLatch(1);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg) -> {
+ String str = new String(msg.getData(), StandardCharsets.UTF_8);
+ System.out.println(str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("updates");
+
+// Wait for a message to come in
+latch.await();
+
+// Drain the connection, which will close it
+CompletableFuture<Boolean> drained = nc.drain(Duration.ofSeconds(10));
+
+// Wait for the drain to complete
+drained.get();
+
+ import asyncio
+from nats.aio.client import Client as NATS
+
+async def example(loop):
+ nc = NATS()
+
+ await nc.connect("nats://127.0.0.1:4222", loop=loop)
+
+ async def handler(msg):
+ print("[Received] ", msg)
+ await nc.publish(msg.reply, b'I can help')
+
+ # Can check whether client is in draining state
+ if nc.is_draining:
+ print("Connection is draining")
+
+ await nc.subscribe("help", "workers", cb=handler)
+ await nc.flush()
+
+ requests = []
+ for i in range(0, 10):
+ request = nc.request("help", b'help!', timeout=1)
+ requests.append(request)
+
+ # Wait for all the responses
+ responses = []
+ responses = await asyncio.gather(*requests)
+
+ # Gracefully close the connection.
+ await nc.drain()
+
+ print("Received {} responses".format(len(responses)))
+
+ NATS.start(drain_timeout: 1) do |nc|
+ NATS.subscribe('foo', queue: "workers") do |msg, reply, sub|
+ nc.publish(reply, "ACK:#{msg}")
+ end
+
+ NATS.subscribe('bar', queue: "workers") do |msg, reply, sub|
+ nc.publish(reply, "ACK:#{msg}")
+ end
+
+ NATS.subscribe('quux', queue: "workers") do |msg, reply, sub|
+ nc.publish(reply, "ACK:#{msg}")
+ end
+
+ EM.add_timer(2) do
+ next if NATS.draining?
+
+ # Drain gracefully closes the connection.
+ NATS.drain do
+ puts "Done draining. Connection is closed."
+ end
+ end
+end
+
+
+ nc, err := nats.Connect("demo.nats.io")
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer nc.Close()
+
+ done := sync.WaitGroup{}
+ done.Add(1)
+
+ count := 0
+ errCh := make(chan error, 1)
+
+ msgAfterDrain := "not this one"
+
+ // This callback will process each message slowly
+ sub, err := nc.Subscribe("updates", func(m *nats.Msg) {
+ if string(m.Data) == msgAfterDrain {
+ errCh <- fmt.Errorf("Should not have received this message")
+ return
+ }
+ time.Sleep(100 * time.Millisecond)
+ count++
+ if count == 2 {
+ done.Done()
+ }
+ })
+
+ // Send 2 messages
+ for i := 0; i < 2; i++ {
+ nc.Publish("updates", []byte("hello"))
+ }
+
+ // Call Drain on the subscription. It unsubscribes but
+ // wait for all pending messages to be processed.
+ if err := sub.Drain(); err != nil {
+ log.Fatal(err)
+ }
+
+ // Send one more message, this message should not be received
+ nc.Publish("updates", []byte(msgAfterDrain))
+
+ // Wait for the subscription to have processed the 2 messages.
+ done.Wait()
+
+ // Now check that the 3rd message was not received
+ select {
+ case e := <-errCh:
+ log.Fatal(e)
+ case <-time.After(200 * time.Millisecond):
+ // OK!
+ }
+
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for a message to arrive
+CountDownLatch latch = new CountDownLatch(1);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg) -> {
+ String str = new String(msg.getData(), StandardCharsets.UTF_8);
+ System.out.println(str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("updates");
+
+// Wait for a message to come in
+latch.await();
+
+// Messages that have arrived will be processed
+CompletableFuture<Boolean> drained = d.drain(Duration.ofSeconds(10));
+
+// Wait for the drain to complete
+drained.get();
+
+// Close the connection
+nc.close();
+
+ import asyncio
+from nats.aio.client import Client as NATS
+
+async def example(loop):
+ nc = NATS()
+
+ await nc.connect("nats://127.0.0.1:4222", loop=loop)
+
+ async def handler(msg):
+ print("[Received] ", msg)
+ await nc.publish(msg.reply, b'I can help')
+
+ # Can check whether client is in draining state
+ if nc.is_draining:
+ print("Connection is draining")
+
+ sid = await nc.subscribe("help", "workers", cb=handler)
+ await nc.flush()
+
+ # Gracefully unsubscribe the subscription
+ await nc.drain(sid)
+
+
+ // Set the callback that will be invoked when an asynchronous error occurs.
+nc, err := nats.Connect("demo.nats.io",
+ nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
+ log.Printf("Error: %v", err)
+ }))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ class MyErrorListener implements ErrorListener {
+ public void errorOccurred(Connection conn, String error)
+ {
+ System.out.println("The server notificed the client with: "+error);
+ }
+
+ public void exceptionOccurred(Connection conn, Exception exp) {
+ System.out.println("The connection handled an exception: "+exp.getLocalizedMessage());
+ }
+
+ public void slowConsumerDetected(Connection conn, Consumer consumer) {
+ System.out.println("A slow consumer was detected.");
+ }
+}
+
+public class SetErrorListener {
+ public static void main(String[] args) {
+
+ try {
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ errorListener(new MyErrorListener()). // Set the listener
+ build();
+ Connection nc = Nats.connect(options);
+
+ // Do something with the connection
+
+ nc.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
+
+ let nc = NATS.connect("nats://demo.nats.io:4222");
+
+// on node you *must* register an error listener. If not registered
+// the library emits an 'error' event, the node process will exit.
+nc.on('error', (err) => {
+ t.log('client got an error:', err);
+});
+
+ nc = NATS()
+
+async def error_cb(e):
+ print("Error: ", e)
+
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ reconnect_time_wait=10,
+ error_cb=error_cb,
+ )
+
+# Do something with the connection.
+
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+if err := nc.Publish("updates", []byte("All is Well")); err != nil {
+ log.Fatal(err)
+}
+// Sends a PING and wait for a PONG from the server, up to the given timeout.
+// This gives guarantee that the server has processed above message.
+if err := nc.FlushTimeout(time.Second); err != nil {
+ log.Fatal(err)
+}
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+nc.publish("updates", "All is Well".getBytes(StandardCharsets.UTF_8));
+nc.flush(Duration.ofSeconds(1)); // Flush the message queue
+
+nc.close();
+
+ let nc = NATS.connect({url: "nats://demo.nats.io:4222"});
+let start = Date.now();
+nc.flush(() => {
+ t.log('round trip completed in', Date.now() - start, 'ms');
+});
+
+nc.publish('foo');
+// function in flush is optional
+nc.flush();
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+await nc.publish("updates", b'All is Well')
+
+# Sends a PING and wait for a PONG from the server, up to the given timeout.
+# This gives guarantee that the server has processed above message.
+await nc.flush(timeout=1)
+
+
+ require 'nats/client'
+require 'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
+ nc.subscribe("updates") do |msg|
+ puts msg
+ end
+
+ nc.publish("updates", "All is Well")
+
+ nc.flush do
+ # Sends a PING and wait for a PONG from the server, up to the given timeout.
+ # This gives guarantee that the server has processed above message at this point.
+ end
+end
+
+
+ let nc = await connect({
+ url: "nats://demo.nats.io:4222"
+});
+
+// you can use flush to trigger a function in your
+// application once the round-trip to the server finishes
+let start = Date.now();
+nc.flush(() => {
+ t.log('round trip completed in', Date.now() - start, 'ms');
+});
+
+nc.publish('foo');
+
+// another way, simply wait for the promise to resolve
+await nc.flush();
+
+nc.close();
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+mp := nc.MaxPayload()
+log.Printf("Maximum payload is %v bytes", mp)
+
+// Do something with the max payload
+
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+long max = nc.getMaxPayload();
+// Do something with the max payload
+
+nc.close();
+
+ let nc = NATS.connect("nats://demo.nats.io:4222");
+
+// on node you *must* register an error listener. If not registered
+// the library emits an 'error' event, the node process will exit.
+nc.on('error', (err) => {
+ t.log('client got an error:', err);
+});
+nc.on('connect', () => {
+ t.log(nc.info.max_payload);
+});
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+print("Maximum payload is %d bytes" % nc.max_payload)
+
+# Do something with the max payload.
+
+
+ require 'nats/client'
+
+NATS.start(max_outstanding_pings: 5) do |nc|
+ nc.on_reconnect do
+ puts "Got reconnected to #{nc.connected_server}"
+ end
+
+ nc.on_disconnect do |reason|
+ puts "Got disconnected! #{reason}"
+ end
+
+ # Do something with the max_payload
+ puts "Maximum Payload is #{nc.server_info[:max_payload]} bytes"
+end
+
+ // Turn off echo
+nc, err := nats.Connect("demo.nats.io", nats.NoEcho())
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ noEcho(). // Turn off echo
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ ncA = NATS()
+ncB = NATS()
+
+await ncA.connect(no_echo=True)
+await ncB.connect()
+
+async def handler(msg):
+ # Messages sent by `ncA' will not be received.
+ print("[Received] ", msg)
+
+await ncA.subscribe("greetings", cb=handler)
+await ncA.flush()
+await ncA.publish("greetings", b'Hello World!')
+await ncB.publish("greetings", b'Hello World!')
+
+# Do something with the connection
+
+await asyncio.sleep(1)
+await ncA.drain()
+await ncB.drain()
+
+
+ // Set Ping Interval to 20 seconds
+nc, err := nats.Connect("demo.nats.io", nats.PingInterval(20*time.Second))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ pingInterval(Duration.ofSeconds(20)). // Set Ping Interval
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ nc = NATS()
+
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ # Set Ping Interval to 20 seconds
+ ping_interval=20,
+ )
+
+# Do something with the connection.
+
+
+ opts := nats.GetDefaultOptions()
+opts.Url = "demo.nats.io"
+// Set maximum number of PINGs out without getting a PONG back
+// before the connection will be disconnected as a stale connection.
+opts.MaxPingsOut = 5
+
+nc, err := opts.Connect()
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ maxPingsOut(5). // Set max pings in flight
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ nc = NATS()
+
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ # Set maximum number of PINGs out without getting a PONG back
+ # before the connection will be disconnected as a stale connection.
+ max_outstanding_pings=5,
+ ping_interval=1,
+ )
+
+# Do something with the connection.
+
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+if err := nc.Publish("updates", []byte("All is Well")); err != nil {
+ log.Fatal(err)
+}
+// Make sure the message goes through before we close
+nc.Flush()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+nc.publish("updates", "All is Well".getBytes(StandardCharsets.UTF_8));
+
+// Make sure the message goes through before we close
+nc.flush(Duration.ZERO);
+nc.close();
+
+ let nc = NATS.connect({url: "nats://demo.nats.io:4222", preserveBuffers: true});
+let buf = Buffer.allocUnsafe(12);
+buf.fill("All is well");
+nc.publish('updates', buf);
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+await nc.publish("updates", b'All is Well')
+
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
+if err != nil {
+ nc.Close()
+ log.Fatal(err)
+}
+defer ec.Close()
+
+// Define the object
+type stock struct {
+ Symbol string
+ Price int
+}
+
+// Publish the message
+if err := ec.Publish("updates", &stock{Symbol: "GOOG", Price: 1200}); err != nil {
+ log.Fatal(err)
+}
+// Make sure the message goes through before we close
+ec.Flush()
+
+ class StockForJsonPub {
+ public String symbol;
+ public float price;
+}
+
+public class PublishJSON {
+ public static void main(String[] args) {
+ try {
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+ // Create the data object
+ StockForJsonPub stk = new StockForJsonPub();
+ stk.symbol="GOOG";
+ stk.price=1200;
+
+ // use Gson to encode the object to JSON
+ GsonBuilder builder = new GsonBuilder();
+ Gson gson = builder.create();
+ String json = gson.toJson(stk);
+
+ // Publish the message
+ nc.publish("updates", json.getBytes(StandardCharsets.UTF_8));
+
+ // Make sure the message goes through before we close
+ nc.flush(Duration.ZERO);
+ nc.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
+
+ let nc = NATS.connect({url: "nats://demo.nats.io:4222", json: true});
+nc.publish('updates', {ticker: 'GOOG', price: 1200});
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode())
+
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Create a unique subject name
+uniqueReplyTo := nats.NewInbox()
+
+// Listen for a single response
+sub, err := nc.SubscribeSync(uniqueReplyTo)
+if err != nil {
+ log.Fatal(err)
+}
+
+// Send the request
+if err := nc.PublishRequest("time", uniqueReplyTo, nil); err != nil {
+ log.Fatal(err)
+}
+
+// Read the reply
+msg, err := sub.NextMsg(time.Second)
+if err != nil {
+ log.Fatal(err)
+}
+
+// Use the response
+log.Printf("Reply: %s", msg.Data)
+
+// Close the connection
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+// Create a unique subject name
+String uniqueReplyTo = NUID.nextGlobal();
+
+// Listen for a single response
+Subscription sub = nc.subscribe(uniqueReplyTo);
+sub.unsubscribe(1);
+
+// Send the request
+nc.publish("time", uniqueReplyTo, null);
+
+// Read the reply
+Message msg = sub.nextMessage(Duration.ofSeconds(1));
+
+// Use the response
+System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));
+
+// Close the connection
+nc.close();
+
+ // set up a subscription to process the request
+nc.subscribe('time', (msg, reply) => {
+ if(reply) {
+ nc.publish(reply, new Date().toLocaleTimeString());
+ }
+});
+
+// create a subscription subject that the responding send replies to
+let inbox = NATS.createInbox();
+nc.subscribe(inbox, {max: 1}, (msg) => {
+ t.log('the time is', msg);
+ nc.close();
+});
+
+nc.publish('time', "", inbox);
+
+ nc = NATS()
+
+future = asyncio.Future()
+
+async def sub(msg):
+ nonlocal future
+ future.set_result(msg)
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+await nc.subscribe("time", cb=sub)
+
+unique_reply_to = new_inbox()
+await nc.publish_request("time", unique_reply_to, b'')
+
+# Use the response
+msg = await asyncio.wait_for(future, 1)
+print("Reply:", msg)
+
+
+ require 'nats/client'
+require 'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
+ Fiber.new do
+ f = Fiber.current
+
+ nc.subscribe("time") do |msg, reply|
+ f.resume msg
+ end
+
+ nc.publish("time", 'example', NATS.create_inbox)
+
+ # Use the response
+ msg = Fiber.yield
+ puts "Reply: #{msg}"
+
+ end.resume
+end
+
+
+ // set up a subscription to process the request
+await nc.subscribe('time', (err, msg) => {
+ if (err) {
+ // this example is running inside of a promise
+ reject();
+ return;
+ }
+ if (msg.reply) {
+ nc.publish(msg.reply, new Date().toLocaleTimeString());
+ }
+});
+
+// create a subscription subject that the responding send replies to
+let inbox = createInbox();
+await nc.subscribe(inbox, (err, msg) => {
+ t.log('the time is', msg.data);
+ // this example is running inside of a promise
+ nc.close();
+ resolve();
+}, {max: 1});
+
+nc.publish('time', "", inbox);
+
+ // Set reconnect interval to 10 seconds
+nc, err := nats.Connect("demo.nats.io", nats.ReconnectWait(10*time.Second))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ reconnectWait(Duration.ofSeconds(10)). // Set Reconnect Wait
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ let nc = NATS.connect({
+ reconnectTimeWait: 10 * 1000, //10s
+ servers: ["nats://demo.nats.io:4222"]
+});
+
+ nc = NATS()
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ reconnect_time_wait=10,
+ )
+
+# Do something with the connection
+
+await nc.close()
+
+
+ // Set max reconnects attempts
+nc, err := nats.Connect("demo.nats.io", nats.MaxReconnects(10))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ maxReconnects(10). // Set max reconnect attempts
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ let nc = NATS.connect({
+ maxReconnectAttempts: 10,
+ servers: ["nats://demo.nats.io:4222"]
+});
+
+ nc = NATS()
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ max_reconnect_attempts=10,
+ )
+
+# Do something with the connection
+
+await nc.close()
+
+
+ // Set reconnect buffer size in bytes (5 MB)
+nc, err := nats.Connect("demo.nats.io", nats.ReconnectBufSize(5*1024*1024))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ // Connection event handlers are invoked asynchronously
+// and the state of the connection may have changed when
+// the callback is invoked.
+nc, err := nats.Connect("demo.nats.io",
+ nats.DisconnectHandler(func(nc *nats.Conn) {
+ // handle disconnect event
+ }),
+ nats.ReconnectHandler(func(nc *nats.Conn) {
+ // handle reconnect event
+ }))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ connectionListener((conn, type) -> {
+ if (type == Events.RECONNECTED) {
+ // handle reconnected
+ } else if (type == Events.DISCONNECTED) {
+ // handle disconnected, wait for reconnect
+ }
+ }).
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ let nc = NATS.connect({
+ maxReconnectAttempts: 10,
+ servers: ["nats://demo.nats.io:4222"]
+});
+
+nc.on('reconnect', (c) => {
+ console.log('reconnected');
+});
+
+ nc = NATS()
+
+async def disconnected_cb():
+ print("Got disconnected!")
+
+async def reconnected_cb():
+ # See who we are connected to on reconnect.
+ print("Got reconnected to {url}".format(url=nc.connected_url.netloc))
+
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ reconnect_time_wait=10,
+ reconnected_cb=reconnected_cb,
+ disconnected_cb=disconnected_cb,
+ )
+
+# Do something with the connection.
+
+
+ require 'nats/client'
+
+NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"]) do |nc|
+ # Do something with the connection
+ nc.on_reconnect do
+ puts "Got reconnected to #{nc.connected_server}"
+ end
+
+ nc.on_disconnect do |reason|
+ puts "Got disconnected! #{reason}"
+ end
+end
+
+ // will throw an exception if connection fails
+let nc = await connect({
+ maxReconnectAttempts: 10,
+ servers: ["nats://demo.nats.io:4222"]
+});
+// first argument is the connection (same as nc in this case)
+// second argument is the url of the server where the client
+// connected
+nc.on('reconnect', (conn, server) => {
+ console.log('reconnected to', server);
+});
+nc.close();
+
+ servers := []string{"nats://localhost:1222",
+ "nats://localhost:1223",
+ "nats://localhost:1224",
+}
+
+nc, err := nats.Connect(strings.Join(servers, ","), nats.DontRandomize())
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ noRandomize(). // Disable reconnect shuffle
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ let nc = NATS.connect({
+ noRandomize: false,
+ servers: ["nats://127.0.0.1:4443",
+ "nats://demo.nats.io:4222"
+ ]
+});
+
+ nc = NATS()
+await nc.connect(
+ servers=[
+ "nats://demo.nats.io:1222",
+ "nats://demo.nats.io:1223",
+ "nats://demo.nats.io:1224"
+ ],
+ dont_randomize=True,
+ )
+
+# Do something with the connection
+
+await nc.close()
+
+
+ // Disable reconnect attempts
+nc, err := nats.Connect("demo.nats.io", nats.NoReconnect())
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ noReconnect(). // Disable reconnect attempts
+ build();
+Connection nc = Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+ nc = NATS()
+await nc.connect(
+ servers=[
+ "nats://demo.nats.io:1222",
+ "nats://demo.nats.io:1223",
+ "nats://demo.nats.io:1224"
+ ],
+ allow_reconnect=False,
+ )
+
+# Do something with the connection
+
+await nc.close()
+
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Send the request
+msg, err := nc.Request("time", nil, time.Second)
+if err != nil {
+ log.Fatal(err)
+}
+
+// Use the response
+log.Printf("Reply: %s", msg.Data)
+
+// Close the connection
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+// Send the request
+Message msg = nc.request("time", null, Duration.ofSeconds(1));
+
+// Use the response
+System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));
+
+// Close the connection
+nc.close();
+
+ nc = NATS()
+
+async def sub(msg):
+ await nc.publish(msg.reply, b'response')
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+await nc.subscribe("time", cb=sub)
+
+# Send the request
+try:
+ msg = await nc.request("time", b'', timeout=1)
+ # Use the response
+ print("Reply:", msg)
+except asyncio.TimeoutError:
+ print("Timed out waiting for response")
+
+
+ // Be notified if a new server joins the cluster.
+// Print all the known servers and the only the ones that were discovered.
+nc, err := nats.Connect("demo.nats.io",
+ nats.DiscoveredServersHandler(func(nc *nats.Conn) {
+ log.Printf("Known servers: %v\n", nc.Servers())
+ log.Printf("Discovered servers: %v\n", nc.DiscoveredServers())
+ }))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ class ServersAddedListener implements ConnectionListener {
+ public void connectionEvent(Connection nc, Events event) {
+ if (event == Events.DISCOVERED_SERVERS) {
+ for (String server : nc.getServers()) {
+ System.out.println("Known server: "+server);
+ }
+ }
+ }
+}
+
+public class ListenForNewServers {
+ public static void main(String[] args) {
+
+ try {
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ connectionListener(new ServersAddedListener()). // Set the listener
+ build();
+ Connection nc = Nats.connect(options);
+
+ // Do something with the connection
+
+ nc.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
+
+ // Set the callback that will be invoked when an asynchronous error occurs.
+nc, err := nats.Connect("demo.nats.io",
+ nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
+ if err == nats.ErrSlowConsumer {
+ dropped, _ := sub.Dropped()
+ log.Printf("Slow consumer on subject %s dropped %d messages\n",
+ sub.Subject, dropped)
+ }
+ }))
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+ class SlowConsumerReporter implements ErrorListener {
+ public void errorOccurred(Connection conn, String error)
+ {
+ }
+
+ public void exceptionOccurred(Connection conn, Exception exp) {
+ }
+
+ // Detect slow consumers
+ public void slowConsumerDetected(Connection conn, Consumer consumer) {
+ // Get the dropped count
+ System.out.println("A slow consumer dropped messages: "+ consumer.getDroppedCount());
+ }
+}
+
+public class SlowConsumerListener {
+ public static void main(String[] args) {
+
+ try {
+ Options options = new Options.Builder().
+ server("nats://demo.nats.io:4222").
+ errorListener(new SlowConsumerReporter()). // Set the listener
+ build();
+ Connection nc = Nats.connect(options);
+
+ // Do something with the connection
+
+ nc.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
+
+
+ nc = NATS()
+
+ async def error_cb(e):
+ if type(e) is nats.aio.errors.ErrSlowConsumer:
+ print("Slow consumer error, unsubscribing from handling further messages...")
+ await nc.unsubscribe(e.sid)
+
+ await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ error_cb=error_cb,
+ )
+
+ msgs = []
+ future = asyncio.Future()
+ async def cb(msg):
+ nonlocal msgs
+ nonlocal future
+ print(msg)
+ msgs.append(msg)
+
+ if len(msgs) == 3:
+ # Head of line blocking on other messages caused
+ # by single message proccesing taking long...
+ await asyncio.sleep(1)
+
+ await nc.subscribe("updates", cb=cb, pending_msgs_limit=5)
+
+ for i in range(0, 10):
+ await nc.publish("updates", "msg #{}".format(i).encode())
+ await asyncio.sleep(0)
+
+ try:
+ await asyncio.wait_for(future, 1)
+ except asyncio.TimeoutError:
+ pass
+
+ for msg in msgs:
+ print("[Received]", msg)
+
+ await nc.close()
+
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Subscribe
+sub1, err := nc.Subscribe("updates", func(m *nats.Msg) {})
+if err != nil {
+ log.Fatal(err)
+}
+
+// Set limits of 1000 messages or 5MB, whichever comes first
+sub1.SetPendingLimits(1000, 5*1024*1024)
+
+// Subscribe
+sub2, err := nc.Subscribe("updates", func(m *nats.Msg) {})
+if err != nil {
+ log.Fatal(err)
+}
+
+// Set no limits for this subscription
+sub2.SetPendingLimits(-1, -1)
+
+// Close the connection
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+Dispatcher d = nc.createDispatcher((msg) -> {
+ // do something
+});
+
+d.subscribe("updates");
+
+d.setPendingLimits(1_000, 5 * 1024 * 1024); // Set limits on a dispatcher
+
+// Subscribe
+Subscription sub = nc.subscribe("updates");
+
+sub.setPendingLimits(1_000, 5 * 1024 * 1024); // Set limits on a subscription
+
+// Do something
+
+// Close the connection
+nc.close();
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+future = asyncio.Future()
+
+async def cb(msg):
+ nonlocal future
+ future.set_result(msg)
+
+# Set limits of 1000 messages or 5MB
+await nc.subscribe("updates", cb=cb, pending_bytes_limit=5*1024*1024, pending_msgs_limit=1000)
+
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Use a WaitGroup to wait for 4 messages to arrive
+wg := sync.WaitGroup{}
+wg.Add(4)
+
+// Subscribe
+if _, err := nc.Subscribe("time.>", func(m *nats.Msg) {
+ log.Printf("%s: %s", m.Subject, m.Data)
+ wg.Done()
+}); err != nil {
+ log.Fatal(err)
+}
+
+// Wait for the 4 messages to come in
+wg.Wait()
+
+// Close the connection
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for 4 messages to arrive
+CountDownLatch latch = new CountDownLatch(4);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg) -> {
+ String subject = msg.getSubject();
+ String str = new String(msg.getData(), StandardCharsets.UTF_8);
+ System.out.println(subject + ": " + str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("time.>");
+
+// Wait for messages to come in
+latch.await();
+
+// Close the connection
+nc.close();
+
+ nc.subscribe('time.>', (msg, reply, subject) => {
+ // converting timezones correctly in node requires a library
+ // this doesn't take into account *many* things.
+ let time = "";
+ switch (subject) {
+ case 'time.us.east':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
+ break;
+ case 'time.us.central':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
+ break;
+ case 'time.us.mountain':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
+ break;
+ case 'time.us.west':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
+ break;
+ default:
+ time = "I don't know what you are talking about Willis";
+ }
+ t.log(subject, time);
+});
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+# Use queue to wait for 4 messages to arrive
+queue = asyncio.Queue()
+async def cb(msg):
+ await queue.put(msg)
+
+await nc.subscribe("time.>", cb=cb)
+
+# Send 2 messages and wait for them to come in
+await nc.publish("time.A.east", b'A')
+await nc.publish("time.B.east", b'B')
+await nc.publish("time.C.west", b'C')
+await nc.publish("time.D.west", b'D')
+
+for i in range(0, 4):
+ msg = await queue.get()
+ print("Msg:", msg)
+
+await nc.close()
+
+
+ require 'nats/client'
+require 'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
+ Fiber.new do
+ f = Fiber.current
+
+ nc.subscribe("time.>") do |msg, reply|
+ f.resume Time.now.to_f
+ end
+
+ nc.publish("time.A.east", "A")
+ nc.publish("time.B.east", "B")
+ nc.publish("time.C.west", "C")
+ nc.publish("time.D.west", "D")
+
+ # Use the response
+ 4.times do
+ msg = Fiber.yield
+ puts "Msg: #{msg}"
+ end
+ end.resume
+end
+
+
+ await nc.subscribe('time.>', (err, msg) => {
+ // converting timezones correctly in node requires a library
+ // this doesn't take into account *many* things.
+ let time = "";
+ switch (msg.subject) {
+ case 'time.us.east':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
+ break;
+ case 'time.us.central':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
+ break;
+ case 'time.us.mountain':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
+ break;
+ case 'time.us.west':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
+ break;
+ default:
+ time = "I don't know what you are talking about Willis";
+ }
+ t.log(msg.subject, time);
+});
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Use a WaitGroup to wait for a message to arrive
+wg := sync.WaitGroup{}
+wg.Add(1)
+
+// Subscribe
+if _, err := nc.Subscribe("updates", func(m *nats.Msg) {
+ wg.Done()
+}); err != nil {
+ log.Fatal(err)
+}
+
+// Wait for a message to come in
+wg.Wait()
+
+// Close the connection
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for a message to arrive
+CountDownLatch latch = new CountDownLatch(1);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg) -> {
+ String str = new String(msg.getData(), StandardCharsets.UTF_8);
+ System.out.println(str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("updates");
+
+// Wait for a message to come in
+latch.await();
+
+// Close the connection
+nc.close();
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+future = asyncio.Future()
+
+async def cb(msg):
+ nonlocal future
+ future.set_result(msg)
+
+await nc.subscribe("updates", cb=cb)
+await nc.publish("updates", b'All is Well')
+await nc.flush()
+
+# Wait for message to come in
+msg = await asyncio.wait_for(future, 1)
+
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
+if err != nil {
+ log.Fatal(err)
+}
+defer ec.Close()
+
+// Define the object
+type stock struct {
+ Symbol string
+ Price int
+}
+
+wg := sync.WaitGroup{}
+wg.Add(1)
+
+// Subscribe
+if _, err := ec.Subscribe("updates", func(s *stock) {
+ log.Printf("Stock: %s - Price: %v", s.Symbol, s.Price)
+ wg.Done()
+}); err != nil {
+ log.Fatal(err)
+}
+
+// Wait for a message to come in
+wg.Wait()
+
+// Close the connection
+ec.Close()
+
+ class StockForJsonSub {
+ public String symbol;
+ public float price;
+
+ public String toString() {
+ return symbol + " is at " + price;
+ }
+}
+
+public class SubscribeJSON {
+ public static void main(String[] args) {
+
+ try {
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+ // Use a latch to wait for 10 messages to arrive
+ CountDownLatch latch = new CountDownLatch(10);
+
+ // Create a dispatcher and inline message handler
+ Dispatcher d = nc.createDispatcher((msg) -> {
+ Gson gson = new Gson();
+
+ String json = new String(msg.getData(), StandardCharsets.UTF_8);
+ StockForJsonSub stk = gson.fromJson(json, StockForJsonSub.class);
+
+ // Use the object
+ System.out.println(stk);
+
+ latch.countDown();
+ });
+
+ // Subscribe
+ d.subscribe("updates");
+
+ // Wait for a message to come in
+ latch.await();
+
+ // Close the connection
+ nc.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
+
+ let nc = NATS.connect({
+ url: "nats://demo.nats.io:4222",
+ json: true
+});
+
+nc.subscribe('updates', (msg) => {
+ if(msg && msg.ticker === 'TSLA') {
+ t.log('got message:', msg);
+ }
+});
+
+
+ import asyncio
+import json
+from nats.aio.client import Client as NATS
+from nats.aio.errors import ErrTimeout
+
+async def run(loop):
+ nc = NATS()
+
+ await nc.connect(servers=["nats://127.0.0.1:4222"], loop=loop)
+
+ async def message_handler(msg):
+ data = json.loads(msg.data.decode())
+ print(data)
+
+ sid = await nc.subscribe("updates", cb=message_handler)
+ await nc.flush()
+
+ await nc.auto_unsubscribe(sid, 2)
+ await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode())
+ await asyncio.sleep(1, loop=loop)
+ await nc.close()
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Use a WaitGroup to wait for 10 messages to arrive
+wg := sync.WaitGroup{}
+wg.Add(10)
+
+// Create a queue subscription on "updates" with queue name "workers"
+if _, err := nc.QueueSubscribe("updates", "worker", func(m *nats.Msg) {
+ wg.Done()
+}); err != nil {
+ log.Fatal(err)
+}
+
+// Wait for messages to come in
+wg.Wait()
+
+// Close the connection
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for 10 messages to arrive
+CountDownLatch latch = new CountDownLatch(10);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg) -> {
+ String str = new String(msg.getData(), StandardCharsets.UTF_8);
+ System.out.println(str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("updates", "workers");
+
+// Wait for a message to come in
+latch.await();
+
+// Close the connection
+nc.close();
+
+ nc.subscribe('updates', {queue: "workers"}, (msg) => {
+ t.log('worker got message', msg);
+});
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+future = asyncio.Future()
+
+async def cb(msg):
+ nonlocal future
+ future.set_result(msg)
+
+await nc.subscribe("updates", queue="workers", cb=cb)
+await nc.publish("updates", b'All is Well')
+
+msg = await asyncio.wait_for(future, 1)
+print("Msg", msg)
+
+
+ require 'nats/client'
+require 'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
+ Fiber.new do
+ f = Fiber.current
+
+ nc.subscribe("updates", queue: "worker") do |msg, reply|
+ f.resume Time.now
+ end
+
+ nc.publish("updates", "A")
+
+ # Use the response
+ msg = Fiber.yield
+ puts "Msg: #{msg}"
+ end.resume
+end
+
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Use a WaitGroup to wait for 2 messages to arrive
+wg := sync.WaitGroup{}
+wg.Add(2)
+
+// Subscribe
+if _, err := nc.Subscribe("time.*.east", func(m *nats.Msg) {
+ log.Printf("%s: %s", m.Subject, m.Data)
+ wg.Done()
+}); err != nil {
+ log.Fatal(err)
+}
+
+// Wait for the 2 messages to come in
+wg.Wait()
+
+// Close the connection
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for 2 messages to arrive
+CountDownLatch latch = new CountDownLatch(2);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg) -> {
+ String subject = msg.getSubject();
+ String str = new String(msg.getData(), StandardCharsets.UTF_8);
+ System.out.println(subject + ": " + str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("time.*.east");
+
+// Wait for messages to come in
+latch.await();
+
+// Close the connection
+nc.close();
+
+ nc.subscribe('time.us.*', (msg, reply, subject) => {
+ // converting timezones correctly in node requires a library
+ // this doesn't take into account *many* things.
+ let time = "";
+ switch (subject) {
+ case 'time.us.east':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
+ break;
+ case 'time.us.central':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
+ break;
+ case 'time.us.mountain':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
+ break;
+ case 'time.us.west':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
+ break;
+ default:
+ time = "I don't know what you are talking about Willis";
+ }
+ t.log(subject, time);
+});
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+# Use queue to wait for 2 messages to arrive
+queue = asyncio.Queue()
+async def cb(msg):
+ await queue.put_nowait(msg)
+
+await nc.subscribe("time.*.east", cb=cb)
+
+# Send 2 messages and wait for them to come in
+await nc.publish("time.A.east", b'A')
+await nc.publish("time.B.east", b'B')
+
+msg_A = await queue.get()
+msg_B = await queue.get()
+
+print("Msg A:", msg_A)
+print("Msg B:", msg_B)
+
+
+ require 'nats/client'
+require 'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
+ Fiber.new do
+ f = Fiber.current
+
+ nc.subscribe("time.*.east") do |msg, reply|
+ f.resume Time.now
+ end
+
+ nc.publish("time.A.east", "A")
+ nc.publish("time.B.east", "B")
+
+ # Use the response
+ msg_A = Fiber.yield
+ puts "Msg A: #{msg_A}"
+
+ msg_B = Fiber.yield
+ puts "Msg B: #{msg_B}"
+
+ end.resume
+end
+
+
+ await nc.subscribe('time.us.*', (err, msg) => {
+ // converting timezones correctly in node requires a library
+ // this doesn't take into account *many* things.
+ let time = "";
+ switch (msg.subject) {
+ case 'time.us.east':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
+ break;
+ case 'time.us.central':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
+ break;
+ case 'time.us.mountain':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
+ break;
+ case 'time.us.west':
+ time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
+ break;
+ default:
+ time = "I don't know what you are talking about Willis";
+ }
+ console.log(msg.subject, time);
+});
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Subscribe
+sub, err := nc.SubscribeSync("updates")
+if err != nil {
+ log.Fatal(err)
+}
+
+// Wait for a message
+msg, err := sub.NextMsg(10 * time.Second)
+if err != nil {
+ log.Fatal(err)
+}
+
+// Use the response
+log.Printf("Reply: %s", msg.Data)
+
+// Close the connection
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+// Subscribe
+Subscription sub = nc.subscribe("updates");
+
+// Read a message
+Message msg = sub.nextMessage(Duration.ZERO);
+
+String str = new String(msg.getData(), StandardCharsets.UTF_8);
+System.out.println(str);
+
+// Close the connection
+nc.close();
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Subscribe
+sub, err := nc.SubscribeSync("time")
+if err != nil {
+ log.Fatal(err)
+}
+
+// Read a message
+msg, err := sub.NextMsg(10 * time.Second)
+if err != nil {
+ log.Fatal(err)
+}
+
+// Get the time
+timeAsBytes := []byte(time.Now().String())
+
+// Send the time
+nc.Publish(msg.Reply, timeAsBytes)
+
+// Flush and close the connection
+nc.Flush()
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+
+// Subscribe
+Subscription sub = nc.subscribe("time");
+
+// Read a message
+Message msg = sub.nextMessage(Duration.ZERO);
+
+// Get the time
+Calendar cal = Calendar.getInstance();
+SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
+byte[] timeAsBytes = sdf.format(cal.getTime()).getBytes(StandardCharsets.UTF_8);
+
+// Send the time
+nc.publish(msg.getReplyTo(), timeAsBytes);
+
+// Flush and close the connection
+nc.flush(Duration.ZERO);
+nc.close();
+
+ // set up a subscription to process a request
+nc.subscribe('time', (msg, reply) => {
+ if (msg.reply) {
+ nc.publish(msg.reply, new Date().toLocaleTimeString());
+ }
+});
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+future = asyncio.Future()
+
+async def cb(msg):
+ nonlocal future
+ future.set_result(msg)
+
+await nc.subscribe("time", cb=cb)
+
+await nc.publish_request("time", new_inbox(), b'What is the time?')
+await nc.flush()
+
+# Read the message
+msg = await asyncio.wait_for(future, 1)
+
+# Send the time
+time_as_bytes = "{}".format(datetime.now()).encode()
+await nc.publish(msg.reply, time_as_bytes)
+
+
+ require 'nats/client'
+require 'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
+ Fiber.new do
+ f = Fiber.current
+
+ nc.subscribe("time") do |msg, reply|
+ f.resume Time.now
+ end
+
+ nc.publish("time", 'What is the time?', NATS.create_inbox)
+
+ # Use the response
+ msg = Fiber.yield
+ puts "Reply: #{msg}"
+
+ end.resume
+end
+
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Sync Subscription
+sub, err := nc.SubscribeSync("updates")
+if err != nil {
+ log.Fatal(err)
+}
+if err := sub.Unsubscribe(); err != nil {
+ log.Fatal(err)
+}
+
+// Async Subscription
+sub, err = nc.Subscribe("updates", func(_ *nats.Msg) {})
+if err != nil {
+ log.Fatal(err)
+}
+if err := sub.Unsubscribe(); err != nil {
+ log.Fatal(err)
+}
+
+// Close the connection
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+Dispatcher d = nc.createDispatcher((msg) -> {
+ String str = new String(msg.getData(), StandardCharsets.UTF_8);
+ System.out.println(str);
+});
+
+// Sync Subscription
+Subscription sub = nc.subscribe("updates");
+sub.unsubscribe();
+
+// Async Subscription
+d.subscribe("updates");
+d.unsubscribe("updates");
+
+// Close the connection
+nc.close();
+
+ // set up a subscription to process a request
+let sub = nc.subscribe(NATS.createInbox(), (msg, reply) => {
+ if (msg.reply) {
+ nc.publish(reply, new Date().toLocaleTimeString());
+ }
+});
+
+// without arguments the subscription will cancel when the server receives it
+// you can also specify how many messages are expected by the subscription
+nc.unsubscribe(sub);
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+future = asyncio.Future()
+
+async def cb(msg):
+ nonlocal future
+ future.set_result(msg)
+
+sid = await nc.subscribe("updates", cb=cb)
+await nc.publish("updates", b'All is Well')
+
+# Remove interest in subject
+await nc.unsubscribe(sid)
+
+# Won't be received...
+await nc.publish("updates", b'...')
+
+
+ require 'nats/client'
+require 'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
+ Fiber.new do
+ f = Fiber.current
+
+ sid = nc.subscribe("time") do |msg, reply|
+ f.resume Time.now
+ end
+
+ nc.publish("time", 'What is the time?', NATS.create_inbox)
+
+ # Use the response
+ msg = Fiber.yield
+ puts "Reply: #{msg}"
+
+ nc.unsubscribe(sid)
+
+ # Won't be received
+ nc.publish("time", 'What is the time?', NATS.create_inbox)
+
+ end.resume
+end
+
+
+ // set up a subscription to process a request
+let sub = await nc.subscribe(createInbox(), (err, msg) => {
+ if (msg.reply) {
+ nc.publish(msg.reply, new Date().toLocaleTimeString());
+ } else {
+ t.log('got a request for the time, but no reply subject was set.');
+ }
+});
+
+// without arguments the subscription will cancel when the server receives it
+// you can also specify how many messages are expected by the subscription
+sub.unsubscribe();
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Sync Subscription
+sub, err := nc.SubscribeSync("updates")
+if err != nil {
+ log.Fatal(err)
+}
+if err := sub.AutoUnsubscribe(1); err != nil {
+ log.Fatal(err)
+}
+
+// Async Subscription
+sub, err = nc.Subscribe("updates", func(_ *nats.Msg) {})
+if err != nil {
+ log.Fatal(err)
+}
+if err := sub.AutoUnsubscribe(1); err != nil {
+ log.Fatal(err)
+}
+
+// Close the connection
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+Dispatcher d = nc.createDispatcher((msg) -> {
+ String str = new String(msg.getData(), StandardCharsets.UTF_8);
+ System.out.println(str);
+});
+
+// Sync Subscription
+Subscription sub = nc.subscribe("updates");
+sub.unsubscribe(1);
+
+// Async Subscription
+d.subscribe("updates");
+d.unsubscribe("updates", 1);
+
+// Close the connection
+nc.close();
+
+ // `max` specifies the number of messages that the server will forward.
+// The server will auto-cancel.
+let opts = {max: 10};
+let sub = nc.subscribe(NATS.createInbox(), opts, (msg) => {
+ t.log(msg);
+});
+
+// another way after 10 messages
+let sub2 = nc.subscribe(NATS.createInbox(), (err, msg) => {
+ t.log(msg.data);
+});
+// if the subscription already received 10 messages, the handler
+// won't get any more messages
+nc.unsubscribe(sub2, 10);
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+async def cb(msg):
+ print(msg)
+
+sid = await nc.subscribe("updates", cb=cb)
+await nc.auto_unsubscribe(sid, 1)
+await nc.publish("updates", b'All is Well')
+
+# Won't be received...
+await nc.publish("updates", b'...')
+
+
+ require 'nats/client'
+require 'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
+ Fiber.new do
+ f = Fiber.current
+
+ nc.subscribe("time", max: 1) do |msg, reply|
+ f.resume Time.now
+ end
+
+ nc.publish("time", 'What is the time?', NATS.create_inbox)
+
+ # Use the response
+ msg = Fiber.yield
+ puts "Reply: #{msg}"
+
+ # Won't be received
+ nc.publish("time", 'What is the time?', NATS.create_inbox)
+
+ end.resume
+end
+
+
+ // `max` specifies the number of messages that the server will forward.
+// The server will auto-cancel.
+let opts = {max: 10};
+let sub = await nc.subscribe(createInbox(), (err, msg) => {
+ t.log(msg.data);
+}, opts);
+
+// another way after 10 messages
+let sub2 = await nc.subscribe(createInbox(), (err, msg) => {
+ t.log(msg.data);
+});
+// if the subscription already received 10 messages, the handler
+// won't get any more messages
+sub2.unsubscribe(10);
+
+ nc, err := nats.Connect("demo.nats.io")
+if err != nil {
+ log.Fatal(err)
+}
+defer nc.Close()
+
+zoneID, err := time.LoadLocation("America/New_York")
+if err != nil {
+ log.Fatal(err)
+}
+now := time.Now()
+zoneDateTime := now.In(zoneID)
+formatted := zoneDateTime.String()
+
+nc.Publish("time.us.east", []byte(formatted))
+nc.Publish("time.us.east.atlanta", []byte(formatted))
+
+zoneID, err = time.LoadLocation("Europe/Warsaw")
+if err != nil {
+ log.Fatal(err)
+}
+zoneDateTime = now.In(zoneID)
+formatted = zoneDateTime.String()
+
+nc.Publish("time.eu.east", []byte(formatted))
+nc.Publish("time.eu.east.warsaw", []byte(formatted))
+
+// Close the connection
+nc.Close()
+
+ Connection nc = Nats.connect("nats://demo.nats.io:4222");
+ZoneId zoneId = ZoneId.of("America/New_York");
+ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
+String formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
+
+nc.publish("time.us.east", formatted.getBytes(StandardCharsets.UTF_8));
+nc.publish("time.us.east.atlanta", formatted.getBytes(StandardCharsets.UTF_8));
+
+zoneId = ZoneId.of("Europe/Warsaw");
+zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
+formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
+nc.publish("time.eu.east", formatted.getBytes(StandardCharsets.UTF_8));
+nc.publish("time.eu.east.warsaw", formatted.getBytes(StandardCharsets.UTF_8));
+
+nc.flush(Duration.ZERO);
+nc.close();
+
+ nc.publish('time.us.east');
+nc.publish('time.us.central');
+nc.publish('time.us.mountain');
+nc.publish('time.us.west');
+
+ nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+await nc.publish("time.us.east", b'...')
+await nc.publish("time.us.east.atlanta", b'...')
+
+await nc.publish("time.eu.east", b'...')
+await nc.publish("time.eu.east.warsaw", b'...')
+
+await nc.close()
+
+