diff --git a/Makefile b/Makefile
index 0207e23..292d1ed 100644
--- a/Makefile
+++ b/Makefile
@@ -9,3 +9,7 @@ serve:
examples:
go run _tools/examplecompiler/main.go -o _examples -r _tools/examplecompiler/example_repos.json -t _tools/examplecompiler/example_template.tmp
+
+deploy: init examples
+ rm -rf docs
+ gitbook build . docs
\ No newline at end of file
diff --git a/docs/LICENSE b/docs/LICENSE
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/docs/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/docs/developer/concepts/acks.html b/docs/developer/concepts/acks.html
new file mode 100644
index 0000000..4111d69
--- /dev/null
+++ b/docs/developer/concepts/acks.html
@@ -0,0 +1,2358 @@
+
+
+
+
In a system with at-most-once semantics, there are times when messages are lost. If your application is doing request-reply then it can simply use timeouts to handle network and application failures. When you are using one-way messaging the easiest way to insure message delivery is to turn it into a request-reply with the concept of an acknowledgement message, or ACKS. In NATS an ACK can simply be an empty message, a message with no body.
Because the ACK can be empty it can take up very little network bandwidth, but the idea of the ACK turns a simple fire-and-forget into a fire-and-know world where the sender can be sure that the message was received by the other side, or with scatter-gather, several other sides.
NATS messaging involves the electronic exchange of data among computer applications and provides a layer between the application and the underlying physical network. Application data is encoded as a message and sent by a publisher. The message is received, decoded, and processed by one or more subscribers.
+
By providing a central, easily discovered broker, NATS makes it easy for programs to communicate across different environments, languages, and systems. All clients have to do is connect to the broker, subscribe or publish to a subject and process messages. With this simple design, NATS lets programs share common message-handling code, isolate resources and interdependencies, and scale by easily handling an increase in message volume.
NATS core offers an at most once quality of service. If a subscriber is not listening on the subject (no subject match), or is not active when the message is sent, the message is not received. This is the same level of guarantee that TCP/IP provides. By default, NATS is a fire-and-forget messaging system. If you need higher levels of service, you can either use NATS Streaming, or build the additional reliability into your client(s) yourself.
NATS implements a publish-subscribe message distribution model for one-to-many communication. A publisher sends a message on a subject and any active subscriber listening on that subject receives the message. Subscribers can also register interest in wildcard subjects that work a bit like a regular expression (but only a bit). This one-to-many pattern is sometimes called fan-out.
NATS provides a load balancing feature called queue subscriptions. Using queue subscribers will load balance message delivery across a group of subscribers which can be used to provide application fault tolerance and scale workload processing.
+
To create a queue subscription, subscribers register a queue name. All subscribers with the same queue name form the queue group. As messages on the registered subject are published, one member of the group is chosen randomly to receive the message. Although queue groups have multiple subscribers, each message is consumed by only one.
+
One of the great features of NATS is that queue groups are defined by the subscribers, not on the server. Applications can create new queue groups without any server change.
+
Queue subscribers are ideal for auto scaling as you can add or remove them anytime, without any configuration changes or restarting the server or clients.
NATS supports two flavors of request reply messaging: point-to-point or one-to-many. Point-to-point involves the fastest or first to respond. In a one-to-many exchange, you can set a limit on the number of responses the requestor may receive or use a timeout to limit on the speed of the response. One-to-many request reply is sometimes called scatter gather.
+
In a request-response exchange the publish request operation publishes a message with a reply subject expecting a response on that reply subject. Many libraries allow you to use a function that will automatically wait for a response with a timeout. You can also handle that waiting process yourself.
+
The common pattern used by the libraries is that the request creates a unique inbox and performs a request call with the inbox reply and returns the first reply received. This is optimized in the case of multiple responses by ignoring later responses automatically.
A common problem for one-to-many messages is that a message can get lost or dropped due to a network failure. A simple pattern for resolving this situation is to include a sequence id with the message. Receivers can check the sequence id to see if they miss anything.
In order to really leverage sequence ids there are a few things to keep in mind:
+
+
Each sender will have to use their own sequence
+
If possible, receivers should be able to ask for missing messages by id
+
+
With NATS you can embed sequence ids in the message, or you can include them in the subject. For example, a sender can send messages to updates.1, updates.2, etc... and the subscribers can listen to updates.* and optionally parse the subject to determine the sequence id.
Fundamentally NATS is about publishing and listening for messages. Both of these depend heavily on Subjects which scope messages into streams or topics. At its simplest, a subject is just a string of characters that form a name the publisher and subscriber can used to find each other.
The NATS server reserves a few characters as special, and the specification says that only "alpha-numeric" characters plus the "." should be used in subject names. Subjects are case-sensitive and can not contain whitespace. For safety across clients, ASCII characters should be used, although this is subject to change in the future.
+
Subject Hierarchies
+
The . character is used to create a subject hierarchy. For example, a world clock application might define the following to logically group related subjects:
NATS provides two wildcards that can take the place of one or more elements in a dot-separated subject. Subscribers can use these wildcards to listen to multiple subjects with a single subscription but Publishers will always use a fully specified subject, without the wildcard.
+
Matching A Single Token
+
The first wildcard is * which will match a single token. For example, if an application wanted to listen for eastern time zones, they could subscribe to time.*.east, which would match time.us.east and time.eu.east.
The second wildcard is > which will match one or more tokens, and can only appear at the end of the subject. For example, time.us.> will match time.us.east and time.us.east.atlanta, while time.us.* would only match time.us.east since it can't match more than one token.
Subject to your security configuration, wildcards can be used for monitoring by creating something sometimes called a wire tap. In the simplest case you can create a subscriber for >. This application will receive all messages, again subject to security settings, sent on your NATS cluster.
When connecting to a cluster, there are a few things to think about.
+
+
Passing a URL for each cluster member (semi-optional)
+
The connection algorithm
+
The reconnect algorithm (discussed later)
+
Server provided URLs
+
+
When a client library first tries to connect it will use the list of URLS provided to the connection options or function. These URLS are checked, usually in order, and the first successful connection is used.
+
After a client connects to the server, the server may provide a list of URLs for additional known servers. This allows a client to connect to one server and still have other servers available during reconnect.
+
To insure the initial connection, your code should include a list of reasonable front line servers. Those servers may know about other members of the cluster, and may tell the client about those members. But you don't have to configure the client to pass every valid member of the cluster in the connect method.
+
By providing the ability to pass multiple connect options NATS can handle the possibility of a machine going down or being unavailable to a client. By adding the ability of the server to feed clients a list of known servers as part of the client-server protocol the mesh created by a cluster can grow and change organically while the clients are running.
+
Note, failure behavior is library dependent, please check the documentation for your client library on information about what happens if the connect fails.
servers :=[]string{"nats://localhost:1222",
+ "nats://localhost:1223",
+ "nats://localhost:1224",
+}
+
+nc, err := nats.Connect(strings.Join(servers,","))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Options options =newOptions.Builder().
+ server("nats://localhost:1222").
+ server("nats://localhost:1223").
+ server("nats://localhost:1224").
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect({
+ servers:[
+ "nats://demo.nats.io:4222",
+ "nats://localhost:4222"
+ ]}
+);
+
+nc.on('connect',(c)=>{
+ // Do something with the connection
+ doSomething();
+ // When done close it
+ nc.close();
+});
+nc.on('error',(err)=>{
+ failed(err);
+});
+
+
+
+
+
nc = NATS()
+await nc.connect(servers=[
+ "nats://127.0.0.1:1222",
+ "nats://127.0.0.1:1223",
+ "nats://127.0.0.1:1224"
+ ])
+
+# Do something with the connection
+
+await nc.close()
+
+
+
+
+
+
require'nats/client'
+
+NATS.start(servers:["nats://127.0.0.1:1222","nats://127.0.0.1:1223","nats://127.0.0.1:1224"])do|nc|
+ # Do something with the connection
+
+ # Close the connection
+ nc.close
+end
+
+
+
+
+
// will throw an exception if connection fails
+let nc =awaitconnect({
+ servers:[
+ "nats://demo.nats.io:4222",
+ "nats://localhost:4222"
+ ]
+});
+// Do something with the connection
+
+// When done close it
+nc.close();
+
Each library has its own, language preferred way, to pass connection options. One of the most common options is a connection timeout. To set the maximum time to connect to a server to 10 seconds:
nc, err := nats.Connect(nats.DefaultURL, nats.Timeout(10*time.Second))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Options options =newOptions.Builder().
+ server("nats://demo.nats.io:4222").
+ connectionTimeout(Duration.ofSeconds(10)).// Set timeout
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
// connection timeout is not supported on node-nats
+
+
+
+
+
nc = NATS()
+await nc.connect(connect_timeout=2)
+
+# Do something with the connection
+
+await nc.close()
+
+
+
+
+
+
# There is currently no connect timeout as part of the Ruby NATS client API, but you can use a timer to mimic it.
+require'nats/client'
+
+timer =EM.add_timer(5)do
+ NATS.connect do|nc|
+ # Do something with the connection
+
+ # Close the connection
+ nc.close
+ end
+end
+EM.cancel_timer(timer)
+
+
+
+
+
let nc =awaitconnect({
+ url:"nats://demo.nats.io:4222",
+ timeout:1000
+});
+
+
nc, err := nats.Connect(nats.DefaultURL)
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Connection nc =Nats.connect();
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect();
+nc.on('connect',(c)=>{
+ // Do something with the connection
+ doSomething();
+ // When done close it
+ nc.close();
+});
+nc.on('error',(err)=>{
+ failed(err);
+});
+
+
+
+
+
nc = NATS()
+await nc.connect()
+
+# Do something with the connection
+
+await nc.close()
+
+
+
+
+
+
require'nats/client'
+
+NATS.start do|nc|
+ # Do something with the connection
+
+ # Close the connection
+ nc.close
+end
+
+
+
+
+
// will throw an exception if connection fails
+let nc =awaitconnect();
+// Do something with the connection
+
+// When done close it
+nc.close();
+
+
+// alternatively, you can use the Promise pattern
+let nc1: Client;
+connect()
+ .then((c)=>{
+ nc1 = c;
+ // Do something with the connection
+ nc1.close();
+ });
+ // add a .catch/.finally
+
Most client libraries provide several ways to connect to the NATS server. The server itself is identified by a standard URL with the nats protocol. Throughout these examples we will rely on a test server, provided by nats.io, at nats://demo.nats.io:4222, where 4222 is the default port for NATS.
+
NATS clients also support the tls protocol to indicate that the client wants to use TLS. So in the previous example we can replace nats with tls to get tls://demo.nats.io:4222.
+
The protocol requirement is being removed from many libraries, so that you can use demo.nats.io:4222 as the URL and let the client and server resolve whether or not TLS is required.
+
There are numerous options for a NATS connections ranging from timeouts to reconnect settings.
By default the server will echo messages. This means that if a publisher on a connection sends a message to a subject any subscribers on that same connection may receive the message. Turning off echo is a fairly new feature for the NATS server, but some of the clients already support it.
Keep in mind that each connection will have to turn off echo, and that it is per connection, not per application. Also, turning echo on and off can result in a major change to your applications communications protocol since messages will flow or stop flowing based on this setting and the subscribing code won't have any indication as to why.
The client and server use a simple PING/PONG protocol to check that they are both still connected. The client will ping the server on a regular, configured interval so that the server usually doesn't have to initiate the PING/PONG interaction.
If you have a connection that is going to be open a long time with few messages traveling on it, setting this PING interval can control how quickly the client will be notified of a problem. However on connections with a lot of traffic, the client will often figure out there is a problem between PINGS, and as a result the default PING interval is often on the order of minutes. To set the interval to 20s:
// Set Ping Interval to 20 seconds
+nc, err := nats.Connect("demo.nats.io", nats.PingInterval(20*time.Second))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Options options =newOptions.Builder().
+ server("nats://demo.nats.io:4222").
+ pingInterval(Duration.ofSeconds(20)).// Set Ping Interval
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect({
+ pingInterval:20*1000,//20s
+ url:"nats://demo.nats.io:4222"
+});
+
+
+
+
+
nc = NATS()
+
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ # Set Ping Interval to 20 seconds
+ ping_interval=20,
+ )
+
+# Do something with the connection.
+
+
+
+
+
+
require'nats/client'
+
+NATS.start(ping_interval:20)do|nc|
+ nc.on_reconnect do
+ puts "Got reconnected to #{nc.connected_server}"
+ end
+
+ nc.on_disconnect do|reason|
+ puts "Got disconnected! #{reason}"
+ end
+
+ # Do something with the connection
+end
+
+
+
+
+
// will throw an exception if connection fails
+let nc =awaitconnect({
+ pingInterval:20*2000,//20s
+ url:"nats://demo.nats.io:4222"
+});
+nc.close();
+
+
+
+
+
Limit Outgoing Pings
+
The PING/PONG interaction is also used by most of the clients as a way to flush the connection to the server. Clients that cache outgoing messages provide a flush call that will run a PING/PONG. The flush will wait for the PONG to return, telling it that all cached messages have been processed, including the PING. The number of cached PING requests can be limited in most clients to insure that traffic problems are identified early. This configuration for max outgoing pings or similar will usually default to a small number and should only be increased if you are worried about fast flush traffic, perhaps in multiple threads.
+
For example, to set the maximum number of outgoing pings to 5:
opts := nats.GetDefaultOptions()
+opts.Url ="demo.nats.io"
+// Set maximum number of PINGs out without getting a PONG back
+// before the connection will be disconnected as a stale connection.
+opts.MaxPingsOut =5
+
+nc, err := opts.Connect()
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Options options =newOptions.Builder().
+ server("nats://demo.nats.io:4222").
+ maxPingsOut(5).// Set max pings in flight
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect({
+ maxPingOut:5,
+ url:"nats://demo.nats.io:4222"
+});
+
+
+
+
+
nc = NATS()
+
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ # Set maximum number of PINGs out without getting a PONG back
+ # before the connection will be disconnected as a stale connection.
+ max_outstanding_pings=5,
+ ping_interval=1,
+ )
+
+# Do something with the connection.
+
+
+
+
+
+
require'nats/client'
+
+NATS.start(max_outstanding_pings:5)do|nc|
+ nc.on_reconnect do
+ puts "Got reconnected to #{nc.connected_server}"
+ end
+
+ nc.on_disconnect do|reason|
+ puts "Got disconnected! #{reason}"
+ end
+
+ # Do something with the connection
+end
+
+
+
+
+
// will throw an exception if connection fails
+let nc =awaitconnect({
+ maxPingOut:5,
+ url:"nats://demo.nats.io:4222"
+});
+nc.close();
+
The protocol between the client and the server is fairly simple and relies on a control line and sometimes a body. The control line contains the operations being sent, like PING or PONG, followed by a carriage return and line feed, CRLF or "\r\n". The server has a setting that can limit the maximum size of a control line. For PING and PONG this doesn't come into play, but for messages that contain subject names, the control line length can be important. The server is also configured with a maximum payload size, which limits the size of a message body. The server sends the maximum payload size to the client at connect time but doesn't currently tell the client the maximum control line size.
+
Set the Maximum Control Line Size
+
Some clients will try to limit the control line size internally to prevent an error from the server. These clients may or may not allow you to set the size being used, but if they do, the size should be set to match the server configuration.
+
For example, to set the maximum control line size to 2k:
Options options =newOptions.Builder().
+ server("nats://demo.nats.io:4222").
+ maxControlLine(2*1024).// Set the max control line to 2k
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
// set this option before creating a connection
+NATS.MAX_CONTROL_LINE_SIZE=1024*2;
+let nc =NATS.connect({
+ url:"nats://demo.nats.io:4222"
+});
+
+
+
+
+
+
# Asyncio NATS client does not allow custom control lines
+
+
+
+
+
# There is no need to customize this in the Ruby NATS client.
+
+
+
+
+
// control line size is not configurable on ts-nats
+
+
+
+
+
Get the Maximum Payload Size
+
While the client can't control the maximum payload size, clients may provide a way for applications to get the size after the connection is made. This will allow the application to chunk or limit data as needed to pass through the server.
nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+mp := nc.MaxPayload()
+log.Printf("Maximum payload is %v bytes", mp)
+
+// Do something with the max payload
+
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+long max = nc.getMaxPayload();
+// Do something with the max payload
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect("nats://demo.nats.io:4222");
+
+// on node you *must* register an error listener. If not registered
+// the library emits an 'error' event, the node process will exit.
+nc.on('error',(err)=>{
+ t.log('client got an error:', err);
+});
+nc.on('connect',()=>{
+ t.log(nc.info.max_payload);
+});
+
+
+
+
+
nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+print("Maximum payload is %d bytes"% nc.max_payload)
+
+# Do something with the max payload.
+
+
+
+
+
+
require'nats/client'
+
+NATS.start(max_outstanding_pings:5)do|nc|
+ nc.on_reconnect do
+ puts "Got reconnected to #{nc.connected_server}"
+ end
+
+ nc.on_disconnect do|reason|
+ puts "Got disconnected! #{reason}"
+ end
+
+ # Do something with the max_payload
+ puts "Maximum Payload is #{nc.server_info[:max_payload]} bytes"
+end
+
+
+
+
+
// connect will happen once - the first connect
+nc.on('connect',(nc: Client, url: string, options: ServerInfo)=>{
+ // nc is the connection that connected
+ t.log('client connected to', url);
+ t.log('max_payload', options.max_payload);
+});
+
+
+
+
+
Turn On Pedantic Mode
+
The NATS server provides a pedantic mode that does extra checks on the protocol. By default, this setting is off but you can turn it on:
opts := nats.GetDefaultOptions()
+opts.Url ="demo.nats.io"
+// Turn on Pedantic
+opts.Pedantic =true
+nc, err := opts.Connect()
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Options options =newOptions.Builder().
+ server("nats://demo.nats.io:4222").
+ pedantic().// Turn on pedantic
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect({
+ url:"nats://demo.nats.io:4222",
+ pedantic:true
+});
+
+
+
+
+
+
nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"], pedantic=True)
+
+# Do something with the connection.
+
+
+
+
+
+
require'nats/client'
+
+NATS.start(pedantic:true)do|nc|
+ nc.on_reconnect do
+ puts "Got reconnected to #{nc.connected_server}"
+ end
+
+ nc.on_disconnect do|reason|
+ puts "Got disconnected! #{reason}"
+ end
+
+ nc.close
+end
+
+
+
+
+
// will throw an exception if connection fails
+let nc =awaitconnect({
+ url:"nats://demo.nats.io:4222",
+ pedantic:true
+});
+
+nc.close();
+
+
+
+
+
Turn On/Off Verbose Mode
+
The NATS server also provide a verbose mode. By default, verbose mode is enabled and the server will reply to every message from the client with either a +OK or a -ERR. Most clients turn off verbose mode, which disables all of the +OK traffic. Errors are rarely subject to verbose mode and client libraries handle them as documented. To turn on verbose mode, likely for testing:
The NATS client libraries can take a full URL, nats://demo.nats.io:4222, to specify a specific server host and port to connect to.
+
Libraries are removing the requirement for an explicit protocol and may allow nats://demo.nats.io:4222 or just demo.nats.io:4222. Check with your specific client library's documentation to see what URL formats are supported.
+
For example, to connect to the demo server with a URL you can use:
// If connecting to the default port, the URL can be simplified
+// to just the hostname/IP.
+// That is, the connect below is equivalent to:
+// nats.Connect("nats://demo.nats.io:4222")
+nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect("nats://demo.nats.io:4222");
+nc.on('connect',(c)=>{
+ // Do something with the connection
+ doSomething();
+ // When done close it
+ nc.close();
+});
+nc.on('error',(err)=>{
+ failed(err);
+});
+
+
+
+
+
nc = NATS()
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+# Do something with the connection
+
+await nc.close()
+
+
+
+
+
+
require'nats/client'
+
+NATS.start(servers:["nats://demo.nats.io:4222"])do|nc|
+ # Do something with the connection
+
+ # Close the connection
+ nc.close
+end
+
+
+
+
+
// will throw an exception if connection fails
+ let nc =awaitconnect("nats://demo.nats.io:4222");
+ // Do something with the connection
+
+ // Close the connection
+ nc.close();
+
While the connection status is interesting, it is perhaps more interesting to know when the status changes. Most, if not all, of the NATS client libraries provide a way to listen for events related to the connection and its status.
+
The actual API for these listeners is language dependent, but the following examples show a few of the more common use cases. See the API documentation for the client library you are using for more specific instructions.
+
Connection events may include the connection being closed, disconnected or reconnected. Reconnecting involves a disconnect and connect, but depending on the library implementation may also include multiple disconnects as the client tries to find a server, or the server is rebooted.
// There is not a single listener for connection events in the NATS Go Client.
+// Instead, you can set individual event handlers using:
+
+DisconnectHandler(cb ConnHandler)
+ReconnectHandler(cb ConnHandler)
+ClosedHandler(cb ConnHandler)
+DiscoveredServersHandler(cb ConnHandler)
+ErrorHandler(cb ErrHandler)
+
# Asyncio NATS client can be defined a number of event callbacks
+asyncdefdisconnected_cb():
+ print("Got disconnected!")
+
+asyncdefreconnected_cb():
+ # See who we are connected to on reconnect.
+ print("Got reconnected to {url}".format(url=nc.connected_url.netloc))
+
+asyncdeferror_cb(e):
+ print("There was an error: {}".format(e))
+
+asyncdefclosed_cb():
+ print("Connection is closed")
+
+# Setup callbacks to be notified on disconnects and reconnects
+options["disconnected_cb"]= disconnected_cb
+options["reconnected_cb"]= reconnected_cb
+
+# Setup callbacks to be notified when there is an error
+# or connection is closed.
+options["error_cb"]= error_cb
+options["closed_cb"]= closed_cb
+
+await nc.connect(**options)
+
+
+
+
+
# There is not a single listener for connection events in the Ruby NATS Client.
+# Instead, you can set individual event handlers using:
+
+NATS.on_disconnect do
+end
+
+NATS.on_reconnect do
+end
+
+NATS.on_close do
+end
+
+NATS.on_error do
+end
+
+
+
+
+
// connect will happen once - the first connect
+nc.on('connect',(nc)=>{
+ // nc is the connection that connected
+ t.log('client connected');
+});
+
+nc.on('disconnect',(url)=>{
+ // nc is the connection that reconnected
+ t.log('disconnected from', url);
+});
+
+nc.on('reconnecting',(url)=>{
+ t.log('reconnecting to', url);
+});
+
+nc.on('reconnect',(nc, url)=>{
+ // nc is the connection that reconnected
+ t.log('reconnected to', url);
+});
+
+
+
+
+
Listen for New Servers
+
When working with a cluster, servers may be added or changed. Some of the clients allow you to listen for this notification:
// Be notified if a new server joins the cluster.
+// Print all the known servers and the only the ones that were discovered.
+nc, err := nats.Connect("demo.nats.io",
+ nats.DiscoveredServersHandler(func(nc *nats.Conn){
+ log.Printf("Known servers: %v\n", nc.Servers())
+ log.Printf("Discovered servers: %v\n", nc.DiscoveredServers())
+ }))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
The client library may separate server-to-client errors from events. Many server events are not handled by application code and result in the connection being closed. Listening for the errors can be very useful for debugging problems.
// Set the callback that will be invoked when an asynchronous error occurs.
+nc, err := nats.Connect("demo.nats.io",
+ nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error){
+ log.Printf("Error: %v", err)
+ }))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
let nc =NATS.connect("nats://demo.nats.io:4222");
+
+// on node you *must* register an error listener. If not registered
+// the library emits an 'error' event, the node process will exit.
+nc.on('error',(err)=>{
+ t.log('client got an error:', err);
+});
+
+
+
+
+
nc = NATS()
+
+asyncdeferror_cb(e):
+ print("Error: ", e)
+
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ reconnect_time_wait=10,
+ error_cb=error_cb,
+ )
+
+# Do something with the connection.
+
+
// on node you *must* register an error listener. If not registered
+// the library emits an 'error' event, the node process will exit.
+nc.on('error',(err)=>{
+ t.log('client got an out of band error:', err);
+});
+
Managing the interaction with the server is primarily the job of the client library but most of the libraries also provide some insight into what is happening under the covers.
+
For example, the client library may provide a mechanism to get the connection's current status:
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+System.out.println("The Connection is: "+ nc.getStatus());
+
+nc.close();
+
+System.out.println("The Connection is: "+ nc.getStatus());
+
+
+
+
+
let nc =NATS.connect("nats://demo.nats.io:4222");
+
+// on node you *must* register an error listener. If not registered
+// the library emits an 'error' event, the node process will exit.
+nc.on('error',(err)=>{
+ t.log('client got an error:', err);
+});
+
+if(nc.closed){
+ t.log('client is closed');
+}else{
+ t.log('client is not closed');
+}
+
+
+
+
+
nc = NATS()
+
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ )
+
+# Do something with the connection.
+
+print("The connection is connected?", nc.is_connected)
+
+whileTrue:
+ if nc.is_reconnecting:
+ print("Reconnecting to NATS...")
+ break
+ await asyncio.sleep(1)
+
+await nc.close()
+
+print("The connection is closed?", nc.is_closed)
+
+
+
+
+
+
NATS.start(max_reconnect_attempts:2)do|nc|
+ puts "Connect is connected?: #{nc.connected?}"
+
+ timer =EM.add_periodic_timer(1)do
+ if nc.closing?
+ puts "Connection closed..."
+ EM.cancel_timer(timer)
+ NATS.stop
+ end
+
+ if nc.reconnecting?
+ puts "Reconnecting to NATS..."
+ next
+ end
+ end
+end
+
+
+
+
+
+
if(nc.isClosed()){
+ t.log('the client is closed');
+}else{
+ t.log('the client is running');
+}
+
NATS is designed to move messages through the server quickly. As a result, NATS depends on the applications to consider and respond to changing message rates. The server will do a bit of impedance matching, but if a client is too slow the server will eventually cut them off. These cut off connections are called slow consumers.
+
One way some of the libraries deal with bursty message traffic is to cache incoming messages for a subscription. So if an application can handle 10 messages per second and sometimes receives 20 messages per second, the library may hold the extra 10 to give the application time to catch up. To the server, the application will appear to be handling the messages and consider the connection healthy. It is up to the client library to decide what to do when the cache is too big, but most client libraries will drop incoming messages.
+
Receiving and dropping messages from the server keeps the connection to the server healthy, but creates an application requirement. There are several common patterns:
+
+
Use request/reply to throttle the sender and prevent overloading the subscriber
+
Use a queue with multiple subscribers splitting the work
+
Persist messages with something like NATS streaming
+
+
Libraries that cache incoming messages may provide two controls on the incoming queue, or pending messages. These are useful if the problem is bursty publishers and not a continuous performance mismatch. Disabling these limits can be dangerous in production and although setting these limits to 0 may help find problems, it is also a dangerous proposition in production.
+
+
Check your libraries documentation for the default settings, and support for disabling these limits.
+
+
The incoming cache is usually per subscriber, but again, check the specific documentation for your client library.
+
Limiting Incoming/Pending Messages by Count and Bytes
+
The first way that the incoming queue can be limited is by message count. The second way to limit the incoming queue is by total size. For example, to limit the incoming cache to 1,000 messages or 5mb whichever comes first:
nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Subscribe
+sub1, err := nc.Subscribe("updates",func(m *nats.Msg){})
+if err !=nil{
+ log.Fatal(err)
+}
+
+// Set limits of 1000 messages or 5MB, whichever comes first
+sub1.SetPendingLimits(1000,5*1024*1024)
+
+// Subscribe
+sub2, err := nc.Subscribe("updates",func(m *nats.Msg){})
+if err !=nil{
+ log.Fatal(err)
+}
+
+// Set no limits for this subscription
+sub2.SetPendingLimits(-1,-1)
+
+// Close the connection
+nc.Close()
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+Dispatcher d = nc.createDispatcher((msg)->{
+ // do something
+});
+
+d.subscribe("updates");
+
+d.setPendingLimits(1_000,5*1024*1024);// Set limits on a dispatcher
+
+// Subscribe
+Subscription sub = nc.subscribe("updates");
+
+sub.setPendingLimits(1_000,5*1024*1024);// Set limits on a subscription
+
+// Do something
+
+// Close the connection
+nc.close();
+
+
+
+
+
// slow pending limits are not configurable on node-nats
+
# The Ruby NATS client currently does not have option to customize slow consumer limits per sub.
+
+
+
+
+
// slow pending limits are not configurable on ts-nats
+
+
+
+
+
Detect a Slow Consumer and Check for Dropped Messages
+
When a slow consumer is detected and messages are about to be dropped, the library may notify the application. This process may be similar to other errors or may involve a custom callback.
+
Some libraries, like Java, will not send this notification on every dropped message because that could be noisy. Rather the notification may be sent once per time the subscriber gets behind. Libraries may also provide a way to get a count of dropped messages so that applications can at least detect a problem is occurring.
Developing with NATS is a combination of distributed application techniques, common NATS features and library specific syntax. As well as using this book for guidance, some of the libraries contain language-familiar formats of their API. For example, the go library has go doc, and the Java library has javadoc.
Not all libraries contain this separate doc, depending on the language community, but be sure to check out the client libraries README for more information.
Asynchronous subscriptions use callbacks of some form to notify an application when a message arrives. These subscriptions are usually easier to work with, but do represent some form of internal work and resource usage, i.e. threads, by the library. Check your library's documentation for any resource usage associated with asynchronous subscriptions.
+
The following example subscribes to the subject updates and handles the incoming messages:
nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Use a WaitGroup to wait for a message to arrive
+wg := sync.WaitGroup{}
+wg.Add(1)
+
+// Subscribe
+if_, err := nc.Subscribe("updates",func(m *nats.Msg){
+ wg.Done()
+}); err !=nil{
+ log.Fatal(err)
+}
+
+// Wait for a message to come in
+wg.Wait()
+
+// Close the connection
+nc.Close()
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for a message to arrive
+CountDownLatch latch =newCountDownLatch(1);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg)->{
+ String str =newString(msg.getData(),StandardCharsets.UTF_8);
+ System.out.println(str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("updates");
+
+// Wait for a message to come in
+latch.await();
+
+// Close the connection
+nc.close();
+
A new feature being added across the NATS client libraries is the ability to drain connections or subscriptions. Closing a connection, or unsubscribing from a subscription are generally considered immediate requests. When you close or unsubscribe the library will halt messages in any pending queue or cache for subscribers. When you drain a subscription or connection, it will process any cached/pending messages before closing.
+
Drain provides clients that use queue subscriptions with a way to bring down applications without losing any messages. A client can bring up a new queue member, drain and shut down the old queue member, all without losing messages sent to the old client. Without drain, there is the possibility of lost messages due to queue timing.
+
The libraries can provide drain on a connection or on a subscriber, or both.
wg := sync.WaitGroup{}
+wg.Add(1)
+
+errCh :=make(chanerror,1)
+
+// To simulate a timeout, you would set the DrainTimeout()
+// to a value less than the time spent in the message callback,
+// so say: nats.DrainTimeout(10*time.Millisecond).
+
+nc, err := nats.Connect("demo.nats.io",
+ nats.DrainTimeout(10*time.Second),
+ nats.ErrorHandler(func(_*nats.Conn,_*nats.Subscription, err error){
+ errCh <- err
+ }),
+ nats.ClosedHandler(func(_*nats.Conn){
+ wg.Done()
+ }))
+if err !=nil{
+ log.Fatal(err)
+}
+
+// Subscribe, but add some delay while processing.
+if_, err := nc.Subscribe("foo",func(_*nats.Msg){
+ time.Sleep(200* time.Millisecond)
+}); err !=nil{
+ log.Fatal(err)
+}
+
+// Publish a message
+if err := nc.Publish("foo",[]byte("hello")); err !=nil{
+ log.Fatal(err)
+}
+
+// Drain the connection, which will close it when done.
+if err := nc.Drain(); err !=nil{
+ log.Fatal(err)
+}
+
+// Wait for the connection to be closed.
+wg.Wait()
+
+// Check if there was an error
+select{
+case e :=<-errCh:
+ log.Fatal(e)
+default:
+}
+
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for a message to arrive
+CountDownLatch latch =newCountDownLatch(1);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg)->{
+ String str =newString(msg.getData(),StandardCharsets.UTF_8);
+ System.out.println(str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("updates");
+
+// Wait for a message to come in
+latch.await();
+
+// Drain the connection, which will close it
+CompletableFuture<Boolean> drained = nc.drain(Duration.ofSeconds(10));
+
+// Wait for the drain to complete
+drained.get();
+
+
+
+
+
import asyncio
+from nats.aio.client import Client as NATS
+
+asyncdefexample(loop):
+ nc = NATS()
+
+ await nc.connect("nats://127.0.0.1:4222", loop=loop)
+
+ asyncdefhandler(msg):
+ print("[Received] ", msg)
+ await nc.publish(msg.reply,b'I can help')
+
+ # Can check whether client is in draining state
+ if nc.is_draining:
+ print("Connection is draining")
+
+ await nc.subscribe("help","workers", cb=handler)
+ await nc.flush()
+
+ requests =[]
+ for i inrange(0,10):
+ request = nc.request("help",b'help!', timeout=1)
+ requests.append(request)
+
+ # Wait for all the responses
+ responses =[]
+ responses =await asyncio.gather(*requests)
+
+ # Gracefully close the connection.
+ await nc.drain()
+
+ print("Received {} responses".format(len(responses)))
+
+
+
+
+
NATS.start(drain_timeout:1)do|nc|
+ NATS.subscribe('foo', queue:"workers")do|msg, reply, sub|
+ nc.publish(reply,"ACK:#{msg}")
+ end
+
+ NATS.subscribe('bar', queue:"workers")do|msg, reply, sub|
+ nc.publish(reply,"ACK:#{msg}")
+ end
+
+ NATS.subscribe('quux', queue:"workers")do|msg, reply, sub|
+ nc.publish(reply,"ACK:#{msg}")
+ end
+
+ EM.add_timer(2)do
+ nextifNATS.draining?
+
+ # Drain gracefully closes the connection.
+ NATS.drain do
+ puts "Done draining. Connection is closed."
+ end
+ end
+end
+
+
+
+
+
let sub =await nc.subscribe('updates',(err, msg)=>{
+ t.log('worker got message', msg.data);
+},{queue:"workers"});
+// [end drain_sub]
+nc.flush();
+
+await nc.drain();
+// client must close when the connection drain resolves
+nc.close();
+
+
+
+
+
The mechanics of drain for a subscription are simpler:
+
+
Unsubscribe at the server
+
Process known messages
+
Clean up
+
+
The API for drain can generally be used instead of unsubscribe:
+ nc, err := nats.Connect("demo.nats.io")
+ if err !=nil{
+ log.Fatal(err)
+ }
+ defer nc.Close()
+
+ done := sync.WaitGroup{}
+ done.Add(1)
+
+ count :=0
+ errCh :=make(chanerror,1)
+
+ msgAfterDrain :="not this one"
+
+ // This callback will process each message slowly
+ sub, err := nc.Subscribe("updates",func(m *nats.Msg){
+ ifstring(m.Data)== msgAfterDrain {
+ errCh <- fmt.Errorf("Should not have received this message")
+ return
+ }
+ time.Sleep(100* time.Millisecond)
+ count++
+ if count ==2{
+ done.Done()
+ }
+ })
+
+ // Send 2 messages
+ for i :=0; i <2; i++{
+ nc.Publish("updates",[]byte("hello"))
+ }
+
+ // Call Drain on the subscription. It unsubscribes but
+ // wait for all pending messages to be processed.
+ if err := sub.Drain(); err !=nil{
+ log.Fatal(err)
+ }
+
+ // Send one more message, this message should not be received
+ nc.Publish("updates",[]byte(msgAfterDrain))
+
+ // Wait for the subscription to have processed the 2 messages.
+ done.Wait()
+
+ // Now check that the 3rd message was not received
+ select{
+ case e :=<-errCh:
+ log.Fatal(e)
+ case<-time.After(200* time.Millisecond):
+ // OK!
+ }
+
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for a message to arrive
+CountDownLatch latch =newCountDownLatch(1);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg)->{
+ String str =newString(msg.getData(),StandardCharsets.UTF_8);
+ System.out.println(str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("updates");
+
+// Wait for a message to come in
+latch.await();
+
+// Messages that have arrived will be processed
+CompletableFuture<Boolean> drained = d.drain(Duration.ofSeconds(10));
+
+// Wait for the drain to complete
+drained.get();
+
+// Close the connection
+nc.close();
+
+
+
+
+
// Drain subscription is not supported.
+
+
+
+
+
import asyncio
+from nats.aio.client import Client as NATS
+
+asyncdefexample(loop):
+ nc = NATS()
+
+ await nc.connect("nats://127.0.0.1:4222", loop=loop)
+
+ asyncdefhandler(msg):
+ print("[Received] ", msg)
+ await nc.publish(msg.reply,b'I can help')
+
+ # Can check whether client is in draining state
+ if nc.is_draining:
+ print("Connection is draining")
+
+ sid =await nc.subscribe("help","workers", cb=handler)
+ await nc.flush()
+
+ # Gracefully unsubscribe the subscription
+ await nc.drain(sid)
+
+
+
+
+
+
# There is currently no API to drain a single subscription, the whole connection can be drained though via NATS.drain
+
+
+
+
+
let sub =await nc.subscribe('updates',(err, msg)=>{
+ t.log('worker got message', msg.data);
+},{queue:"workers"});
+
+
+
+
+
Because draining can involve messages flowing to the server, for a flush and asynchronous message processing, the timeout for drain should generally be higher than the timeout for a simple message request/reply or similar.
Receiving messages with NATS can be very library dependent.
+
Some languages, like Go or Java, provide synchronous and asynchronous APIs, while others may only support one type of subscription.
+
In all cases, the process of subscribing involves having the client library tell the NATS server that an application is interested in a particular subject.
+
Under the covers, the client library will assign a unique id to each subscription. This id is used when the server sends messages to a specific subscription. Each subscription gets a unique id, so if the same connection is used multiple times for the same subject, the server will send multiple copies of the same message. When an application is done with a subscription it unsubscribes which tells the server to stop sending messages.
Subscribing to a queue group is only slightly different than subscribing to a subject alone. The application simply includes a queue name with the subscription. The effect of including the group is fairly major, since the server will now load balance messages between the members of the queue group, but the code differences are minimal.
+
Keep in mind that the queue groups in NATS are dynamic and do not require any server configuration. You can almost think of a regular subscription as a queue group of 1, but it is probably not worth thinking too much about that.
nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Use a WaitGroup to wait for 10 messages to arrive
+wg := sync.WaitGroup{}
+wg.Add(10)
+
+// Create a queue subscription on "updates" with queue name "workers"
+if_, err := nc.QueueSubscribe("updates","worker",func(m *nats.Msg){
+ wg.Done()
+}); err !=nil{
+ log.Fatal(err)
+}
+
+// Wait for messages to come in
+wg.Wait()
+
+// Close the connection
+nc.Close()
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for 10 messages to arrive
+CountDownLatch latch =newCountDownLatch(10);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg)->{
+ String str =newString(msg.getData(),StandardCharsets.UTF_8);
+ System.out.println(str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("updates","workers");
+
+// Wait for a message to come in
+latch.await();
+
+// Close the connection
+nc.close();
+
If you run this example with the publish examples that send to updates, you will see that one of the instances gets a message while the others you run won't. But the instance that receives the message will change.
nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Subscribe
+sub, err := nc.SubscribeSync("time")
+if err !=nil{
+ log.Fatal(err)
+}
+
+// Read a message
+msg, err := sub.NextMsg(10* time.Second)
+if err !=nil{
+ log.Fatal(err)
+}
+
+// Get the time
+timeAsBytes :=[]byte(time.Now().String())
+
+// Send the time
+nc.Publish(msg.Reply, timeAsBytes)
+
+// Flush and close the connection
+nc.Flush()
+nc.Close()
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+// Subscribe
+Subscription sub = nc.subscribe("time");
+
+// Read a message
+Message msg = sub.nextMessage(Duration.ZERO);
+
+// Get the time
+Calendar cal =Calendar.getInstance();
+SimpleDateFormat sdf =newSimpleDateFormat("HH:mm:ss");
+byte[] timeAsBytes = sdf.format(cal.getTime()).getBytes(StandardCharsets.UTF_8);
+
+// Send the time
+nc.publish(msg.getReplyTo(), timeAsBytes);
+
+// Flush and close the connection
+nc.flush(Duration.ZERO);
+nc.close();
+
+
+
+
+
// set up a subscription to process a request
+nc.subscribe('time',(msg, reply)=>{
+ if(msg.reply){
+ nc.publish(msg.reply,newDate().toLocaleTimeString());
+ }
+});
+
+
+
+
+
nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+future = asyncio.Future()
+
+asyncdefcb(msg):
+ nonlocal future
+ future.set_result(msg)
+
+await nc.subscribe("time", cb=cb)
+
+await nc.publish_request("time", new_inbox(),b'What is the time?')
+await nc.flush()
+
+# Read the message
+msg =await asyncio.wait_for(future,1)
+
+# Send the time
+time_as_bytes ="{}".format(datetime.now()).encode()
+await nc.publish(msg.reply, time_as_bytes)
+
+
+
+
+
+
require'nats/client'
+require'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"])do|nc|
+ Fiber.newdo
+ f =Fiber.current
+
+ nc.subscribe("time")do|msg, reply|
+ f.resume Time.now
+ end
+
+ nc.publish("time",'What is the time?',NATS.create_inbox)
+
+ # Use the response
+ msg =Fiber.yield
+ puts "Reply: #{msg}"
+
+ end.resume
+end
+
+
+
+
+
+
// set up a subscription to process a request
+await nc.subscribe('time',(err, msg)=>{
+ if(msg.reply){
+ nc.publish(msg.reply,newDate().toLocaleTimeString());
+ }else{
+ t.log('got a request for the time, but no reply subject was set.');
+ }
+});
+
Client libraries may provide tools to help receive structured data, like JSON. The core traffic to the NATS server will always be byte arrays. For libraries that don't provide helpers, you can always encode and decode data before sending the associated bytes to the NATS client.
Synchronous subscriptions require the application to poll for messages. This type of subscription is easy to set-up and use, but requires the application to deal with looping if multiple messages are expected. For situations where a single message is expected, synchronous subscriptions are sometimes easier to manage, depending on the language.
+
For example, to subscribe to the subject updates and receive a single message you could do:
Unsubscribing After a Specified Number of Messages
+
NATS provides a special form of unsubscribe that is configured with a message count and takes effect when that many messages are sent to a subscriber. This mechanism is very useful if only a single message is expected.
+
The message count you provide is the total message count for a subscriber. So if you unsubscribe with a count of 1, the server will stop sending messages to that subscription after it has received one message. If the subscriber has already received one or more messages, the unsubscribe will be immediate. This action based on history can be confusing if you try to auto unsubscribe on a long running subscription, but is logical for a new one.
+
+
Auto unsubscribe is based on the total messages sent to a subscriber, not just the new ones.
+
+
Auto unsubscribe can also result in some tricky edge cases if a server cluster is used. The client will tell the server of the unsubscribe count when the application requests it. But if the client disconnects before the count is reached, it may have to tell another server of the remaining count. This dance between previous server notifications and new notifications on reconnect can result in unplanned behavior.
+
Finally, most of the client libraries also track the max message count after an auto unsubscribe request. Which means that the client will stop allowing messages to flow even if the server has miscounted due to reconnects or some other failure in the client library.
+
The following example shows unsubscribe after a single message:
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+Dispatcher d = nc.createDispatcher((msg)->{
+ String str =newString(msg.getData(),StandardCharsets.UTF_8);
+ System.out.println(str);
+});
+
+// Sync Subscription
+Subscription sub = nc.subscribe("updates");
+sub.unsubscribe(1);
+
+// Async Subscription
+d.subscribe("updates");
+d.unsubscribe("updates",1);
+
+// Close the connection
+nc.close();
+
+
+
+
+
// `max` specifies the number of messages that the server will forward.
+// The server will auto-cancel.
+let opts ={max:10};
+let sub = nc.subscribe(NATS.createInbox(), opts,(msg)=>{
+ t.log(msg);
+});
+
+// another way after 10 messages
+let sub2 = nc.subscribe(NATS.createInbox(),(err, msg)=>{
+ t.log(msg.data);
+});
+// if the subscription already received 10 messages, the handler
+// won't get any more messages
+nc.unsubscribe(sub2,10);
+
require'nats/client'
+require'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"])do|nc|
+ Fiber.newdo
+ f =Fiber.current
+
+ nc.subscribe("time", max:1)do|msg, reply|
+ f.resume Time.now
+ end
+
+ nc.publish("time",'What is the time?',NATS.create_inbox)
+
+ # Use the response
+ msg =Fiber.yield
+ puts "Reply: #{msg}"
+
+ # Won't be received
+ nc.publish("time",'What is the time?',NATS.create_inbox)
+
+ end.resume
+end
+
+
+
+
+
+
// `max` specifies the number of messages that the server will forward.
+// The server will auto-cancel.
+let opts ={max:10};
+let sub =await nc.subscribe(createInbox(),(err, msg)=>{
+ t.log(msg.data);
+}, opts);
+
+// another way after 10 messages
+let sub2 =await nc.subscribe(createInbox(),(err, msg)=>{
+ t.log(msg.data);
+});
+// if the subscription already received 10 messages, the handler
+// won't get any more messages
+sub2.unsubscribe(10);
+
The client libraries provide a means to unsubscribe a previous subscription request.
+
This process requires an interaction with the server, so for an asynchronous subscription there may be a small window of time where a message comes through as the unsubscribe is processed by the library. Ignoring that slight edge case, the client library will clean up any outstanding messages and tell the server that the subscription is no longer used.
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+Dispatcher d = nc.createDispatcher((msg)->{
+ String str =newString(msg.getData(),StandardCharsets.UTF_8);
+ System.out.println(str);
+});
+
+// Sync Subscription
+Subscription sub = nc.subscribe("updates");
+sub.unsubscribe();
+
+// Async Subscription
+d.subscribe("updates");
+d.unsubscribe("updates");
+
+// Close the connection
+nc.close();
+
+
+
+
+
// set up a subscription to process a request
+let sub = nc.subscribe(NATS.createInbox(),(msg, reply)=>{
+ if(msg.reply){
+ nc.publish(reply,newDate().toLocaleTimeString());
+ }
+});
+
+// without arguments the subscription will cancel when the server receives it
+// you can also specify how many messages are expected by the subscription
+nc.unsubscribe(sub);
+
require'nats/client'
+require'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"])do|nc|
+ Fiber.newdo
+ f =Fiber.current
+
+ sid = nc.subscribe("time")do|msg, reply|
+ f.resume Time.now
+ end
+
+ nc.publish("time",'What is the time?',NATS.create_inbox)
+
+ # Use the response
+ msg =Fiber.yield
+ puts "Reply: #{msg}"
+
+ nc.unsubscribe(sid)
+
+ # Won't be received
+ nc.publish("time",'What is the time?',NATS.create_inbox)
+
+ end.resume
+end
+
+
+
+
+
+
// set up a subscription to process a request
+let sub =await nc.subscribe(createInbox(),(err, msg)=>{
+ if(msg.reply){
+ nc.publish(msg.reply,newDate().toLocaleTimeString());
+ }else{
+ t.log('got a request for the time, but no reply subject was set.');
+ }
+});
+
+// without arguments the subscription will cancel when the server receives it
+// you can also specify how many messages are expected by the subscription
+sub.unsubscribe();
+
There is no special code to subscribe with a wildcard subject. Wildcards are a normal part of the subject name.
+
However, there is a common technique that may come in to play when you use wildcards. This technique is to use the subject provided with the incoming message to determine what to do with the message.
+
For example, you can subscribe using * and then act based on the actual subject.
nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Use a WaitGroup to wait for 2 messages to arrive
+wg := sync.WaitGroup{}
+wg.Add(2)
+
+// Subscribe
+if_, err := nc.Subscribe("time.*.east",func(m *nats.Msg){
+ log.Printf("%s: %s", m.Subject, m.Data)
+ wg.Done()
+}); err !=nil{
+ log.Fatal(err)
+}
+
+// Wait for the 2 messages to come in
+wg.Wait()
+
+// Close the connection
+nc.Close()
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for 2 messages to arrive
+CountDownLatch latch =newCountDownLatch(2);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg)->{
+ String subject = msg.getSubject();
+ String str =newString(msg.getData(),StandardCharsets.UTF_8);
+ System.out.println(subject +": "+ str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("time.*.east");
+
+// Wait for messages to come in
+latch.await();
+
+// Close the connection
+nc.close();
+
+
+
+
+
nc.subscribe('time.us.*',(msg, reply, subject)=>{
+ // converting timezones correctly in node requires a library
+ // this doesn't take into account *many* things.
+ let time ="";
+ switch(subject){
+ case'time.us.east':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/New_York"});
+ break;
+ case'time.us.central':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Chicago"});
+ break;
+ case'time.us.mountain':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Denver"});
+ break;
+ case'time.us.west':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Los_Angeles"});
+ break;
+ default:
+ time ="I don't know what you are talking about Willis";
+ }
+ t.log(subject, time);
+});
+
+
+
+
+
nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+# Use queue to wait for 2 messages to arrive
+queue = asyncio.Queue()
+asyncdefcb(msg):
+ await queue.put_nowait(msg)
+
+await nc.subscribe("time.*.east", cb=cb)
+
+# Send 2 messages and wait for them to come in
+await nc.publish("time.A.east",b'A')
+await nc.publish("time.B.east",b'B')
+
+msg_A =await queue.get()
+msg_B =await queue.get()
+
+print("Msg A:", msg_A)
+print("Msg B:", msg_B)
+
+
await nc.subscribe('time.us.*',(err, msg)=>{
+ // converting timezones correctly in node requires a library
+ // this doesn't take into account *many* things.
+ let time ="";
+ switch(msg.subject){
+ case'time.us.east':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/New_York"});
+ break;
+ case'time.us.central':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Chicago"});
+ break;
+ case'time.us.mountain':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Denver"});
+ break;
+ case'time.us.west':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Los_Angeles"});
+ break;
+ default:
+ time ="I don't know what you are talking about Willis";
+ }
+ console.log(msg.subject, time);
+});
+
nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Use a WaitGroup to wait for 4 messages to arrive
+wg := sync.WaitGroup{}
+wg.Add(4)
+
+// Subscribe
+if_, err := nc.Subscribe("time.>",func(m *nats.Msg){
+ log.Printf("%s: %s", m.Subject, m.Data)
+ wg.Done()
+}); err !=nil{
+ log.Fatal(err)
+}
+
+// Wait for the 4 messages to come in
+wg.Wait()
+
+// Close the connection
+nc.Close()
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+// Use a latch to wait for 4 messages to arrive
+CountDownLatch latch =newCountDownLatch(4);
+
+// Create a dispatcher and inline message handler
+Dispatcher d = nc.createDispatcher((msg)->{
+ String subject = msg.getSubject();
+ String str =newString(msg.getData(),StandardCharsets.UTF_8);
+ System.out.println(subject +": "+ str);
+ latch.countDown();
+});
+
+// Subscribe
+d.subscribe("time.>");
+
+// Wait for messages to come in
+latch.await();
+
+// Close the connection
+nc.close();
+
+
+
+
+
nc.subscribe('time.>',(msg, reply, subject)=>{
+ // converting timezones correctly in node requires a library
+ // this doesn't take into account *many* things.
+ let time ="";
+ switch(subject){
+ case'time.us.east':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/New_York"});
+ break;
+ case'time.us.central':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Chicago"});
+ break;
+ case'time.us.mountain':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Denver"});
+ break;
+ case'time.us.west':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Los_Angeles"});
+ break;
+ default:
+ time ="I don't know what you are talking about Willis";
+ }
+ t.log(subject, time);
+});
+
+
+
+
+
nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+# Use queue to wait for 4 messages to arrive
+queue = asyncio.Queue()
+asyncdefcb(msg):
+ await queue.put(msg)
+
+await nc.subscribe("time.>", cb=cb)
+
+# Send 2 messages and wait for them to come in
+await nc.publish("time.A.east",b'A')
+await nc.publish("time.B.east",b'B')
+await nc.publish("time.C.west",b'C')
+await nc.publish("time.D.west",b'D')
+
+for i inrange(0,4):
+ msg =await queue.get()
+ print("Msg:", msg)
+
+await nc.close()
+
+
+
+
+
+
require'nats/client'
+require'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"])do|nc|
+ Fiber.newdo
+ f =Fiber.current
+
+ nc.subscribe("time.>")do|msg, reply|
+ f.resume Time.now.to_f
+ end
+
+ nc.publish("time.A.east","A")
+ nc.publish("time.B.east","B")
+ nc.publish("time.C.west","C")
+ nc.publish("time.D.west","D")
+
+ # Use the response
+ 4.times do
+ msg =Fiber.yield
+ puts "Msg: #{msg}"
+ end
+ end.resume
+end
+
+
+
+
+
+
await nc.subscribe('time.>',(err, msg)=>{
+ // converting timezones correctly in node requires a library
+ // this doesn't take into account *many* things.
+ let time ="";
+ switch(msg.subject){
+ case'time.us.east':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/New_York"});
+ break;
+ case'time.us.central':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Chicago"});
+ break;
+ case'time.us.mountain':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Denver"});
+ break;
+ case'time.us.west':
+ time =newDate().toLocaleTimeString("en-us",{timeZone:"America/Los_Angeles"});
+ break;
+ default:
+ time ="I don't know what you are talking about Willis";
+ }
+ t.log(msg.subject, time);
+});
+
+
+
+
+
The following example can be used to test these two subscribers. The * subscriber should receive at most 2 messages, while the > subscriber receives 4. More importantly the time.*.east subscriber won't receive on time.us.east.atlanta because that won't match.
The NATS client libraries, try as much as possible to be fire and forget. One of the features that may be included in the library you are using is the ability to buffer outgoing messages when the connection is down.
+
During a short reconnect, these client can allow applications to publish messages that, because the server is offline, will be cached in the client. The library will then send those messages on reconnect. When the maximum reconnect buffer is reached, messages will no longer be publishable by the client.
+
Be aware, while the message appears to be sent to the application it is possible that it is never sent because the connection is never remade. Your applications should use patterns like acknowledgements to insure delivery.
+
For clients that support this feature, you are able to configure the size of this buffer with bytes, messages or both.
// Set reconnect buffer size in bytes (5 MB)
+nc, err := nats.Connect("demo.nats.io", nats.ReconnectBufSize(5*1024*1024))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Options options =newOptions.Builder().
+ server("nats://demo.nats.io:4222").
+ reconnectBufferSize(5*1024*1024).// Set buffer in bytes
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
// Reconnect buffer size is not configurable on node-nats
+
+
+
+
+
# Asyncio NATS client currentply does not implement a reconnect buffer
+
+
+
+
+
# There is currently no reconnect pending buffer as part of the Ruby NATS client.
+
+
+
+
+
// Reconnect buffer size is not configurable on ts-nats
+
+
+
+
+
+
As mentioned throughout this document, each client library may behave slightly differently. Please check the documentation for the library you are using.
require'nats/client'
+
+NATS.start(servers:["nats://127.0.0.1:1222","nats://127.0.0.1:1223","nats://127.0.0.1:1224"], reconnect:false)do|nc|
+ # Do something with the connection
+
+ # Close the connection
+ nc.close
+end
+
+
+
+
+
// will throw an exception if connection fails
+let nc =awaitconnect({
+ reconnect:false,
+ servers:["nats://demo.nats.io:4222"]
+});
+nc.close();
+
Because reconnect is primarily under the covers many libraries provide an event listener you can use to be notified of reconnect events. This event can be especially important for applications sending a lot of messages.
// Connection event handlers are invoked asynchronously
+// and the state of the connection may have changed when
+// the callback is invoked.
+nc, err := nats.Connect("demo.nats.io",
+ nats.DisconnectHandler(func(nc *nats.Conn){
+ // handle disconnect event
+ }),
+ nats.ReconnectHandler(func(nc *nats.Conn){
+ // handle reconnect event
+ }))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
nc = NATS()
+
+asyncdefdisconnected_cb():
+ print("Got disconnected!")
+
+asyncdefreconnected_cb():
+ # See who we are connected to on reconnect.
+ print("Got reconnected to {url}".format(url=nc.connected_url.netloc))
+
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ reconnect_time_wait=10,
+ reconnected_cb=reconnected_cb,
+ disconnected_cb=disconnected_cb,
+ )
+
+# Do something with the connection.
+
+
+
+
+
+
require'nats/client'
+
+NATS.start(servers:["nats://127.0.0.1:1222","nats://127.0.0.1:1223","nats://127.0.0.1:1224"])do|nc|
+ # Do something with the connection
+ nc.on_reconnect do
+ puts "Got reconnected to #{nc.connected_server}"
+ end
+
+ nc.on_disconnect do|reason|
+ puts "Got disconnected! #{reason}"
+ end
+end
+
+
+
+
+
// will throw an exception if connection fails
+let nc =awaitconnect({
+ maxReconnectAttempts:10,
+ servers:["nats://demo.nats.io:4222"]
+});
+// first argument is the connection (same as nc in this case)
+// second argument is the url of the server where the client
+// connected
+nc.on('reconnect',(conn, server)=>{
+ console.log('reconnected to', server);
+});
+nc.close();
+
Most, if not all, of the client libraries will reconnect to the server if they are disconnected due to a network problem. The reconnect logic can differ by library, so check your client library's documentation.
+
In general, the client will try to connect to all of the servers it knows about, either through the URLs provided in connect or the URLs provided by its most recent server. The library may have several options to help control reconnect behavior.
+
The list of servers used during reconnect is library dependent, but generally is constructed from the list of servers passed to the connect function/options and the list of servers provided by the most recent connected server.
+
One, sometimes important, detail is that the server URLS provided to clients by servers will use addresses, while the URLS provided to the connect function will usually be host names. As a result, it is possible, on reconnect, for the same server to be tried multiple times without the client knowing about the match.
Applications can set the maximum reconnect attempts. Generally, this will limit the actual number of attempts total, but check your library documentation. For example, in Java, if the client knows about 3 servers and the maximum reconnects is set to 2, it will not try all of the servers. On the other hand, if the maximum is set to 6 it will try all of the servers twice before considering the reconnect a failure and closing.
// Set max reconnects attempts
+nc, err := nats.Connect("demo.nats.io", nats.MaxReconnects(10))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Options options =newOptions.Builder().
+ server("nats://demo.nats.io:4222").
+ maxReconnects(10).// Set max reconnect attempts
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect({
+ maxReconnectAttempts:10,
+ servers:["nats://demo.nats.io:4222"]
+});
+
+
+
+
+
nc = NATS()
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ max_reconnect_attempts=10,
+ )
+
+# Do something with the connection
+
+await nc.close()
+
+
+
+
+
+
require'nats/client'
+
+NATS.start(servers:["nats://127.0.0.1:1222","nats://127.0.0.1:1223","nats://127.0.0.1:1224"], max_reconnect_attempts:10)do|nc|
+ # Do something with the connection
+
+ # Close the connection
+ nc.close
+end
+
+
+
+
+
// will throw an exception if connection fails
+let nc =awaitconnect({
+ maxReconnectAttempts:10,
+ servers:["nats://demo.nats.io:4222"]
+});
+nc.close();
+
When a server goes down, there is a possible anti-pattern called the Thundering Herd where all of the clients try to reconnect immediately creating a denial of service attack. In order to prevent this, most NATS client libraries randomize the servers they attempt to connect to. This setting has no effect if only a single server is used, but in the case of a cluster, randomization, or shuffling, will ensure that no one server bears the brunt of the client reconnect attempts.
+
However, if you want to disable the randomization process, so that servers are always checked in the same order, you can do that in most libraries with a connection options:
require'nats/client'
+
+NATS.start(servers:["nats://127.0.0.1:1222","nats://127.0.0.1:1223","nats://127.0.0.1:1224"], dont_randomize_servers:true)do|nc|
+ # Do something with the connection
+
+ # Close the connection
+ nc.close
+end
+
+
+
+
+
// will throw an exception if connection fails
+let nc =awaitconnect({
+ noRandomize:false,
+ servers:["nats://127.0.0.1:4443",
+ "nats://demo.nats.io:4222"
+ ]
+});
+nc.close();
+
It doesn’t make much sense to try to connect to the same server over and over. To prevent this sort of thrashing, and wasted reconnect attempts, libraries provide a wait setting. This setting will pause the reconnect logic if the same server is being tried multiple times in a row. In the previous example, if you have 3 servers and 6 attempts, the Java library would loop over the three servers. If none were connectable, it will then try all three again. However, the Java client doesn’t wait between each attempt, only when trying the same server again, so in that example the library may never wait. If on the other hand, you only provide a single server URL and 6 attempts, the library will wait between each attempt.
// Set reconnect interval to 10 seconds
+nc, err := nats.Connect("demo.nats.io", nats.ReconnectWait(10*time.Second))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Options options =newOptions.Builder().
+ server("nats://demo.nats.io:4222").
+ reconnectWait(Duration.ofSeconds(10)).// Set Reconnect Wait
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect({
+ reconnectTimeWait:10*1000,//10s
+ servers:["nats://demo.nats.io:4222"]
+});
+
+
+
+
+
nc = NATS()
+await nc.connect(
+ servers=["nats://demo.nats.io:4222"],
+ reconnect_time_wait=10,
+ )
+
+# Do something with the connection
+
+await nc.close()
+
+
+
+
+
+
require'nats/client'
+
+NATS.start(servers:["nats://127.0.0.1:1222","nats://127.0.0.1:1223","nats://127.0.0.1:1224"], reconnect_time_wait:10)do|nc|
+ # Do something with the connection
+
+ # Close the connection
+ nc.close
+end
+
+
+
+
+
// will throw an exception if connection fails
+let nc =awaitconnect({
+ reconnectTimeWait:10*1000,//10s
+ servers:["nats://demo.nats.io:4222"]
+});
+nc.close();
+
The 2.0 version of NATS server introduced the idea of JWT-based authentication. Clients interact with this new scheme using a user JWT and the private key from an NKey pair. To help make connecting with a JWT easier, the client libraries support the concept of a credentials file. This file contains both the private key and the JWT and can be generated with the nsc tool. The contents will look like the following and should be protected because it contains a private key. This creds file is unused and only for example purposes.
+
-----BEGIN NATS USER JWT-----
+eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5In0.eyJqdGkiOiJUVlNNTEtTWkJBN01VWDNYQUxNUVQzTjRISUw1UkZGQU9YNUtaUFhEU0oyWlAzNkVMNVJBIiwiaWF0IjoxNTU4MDQ1NTYyLCJpc3MiOiJBQlZTQk0zVTQ1REdZRVVFQ0tYUVM3QkVOSFdHN0tGUVVEUlRFSEFKQVNPUlBWV0JaNEhPSUtDSCIsIm5hbWUiOiJvbWVnYSIsInN1YiI6IlVEWEIyVk1MWFBBU0FKN1pEVEtZTlE3UU9DRldTR0I0Rk9NWVFRMjVIUVdTQUY3WlFKRUJTUVNXIiwidHlwZSI6InVzZXIiLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e319fQ.6TQ2ilCDb6m2ZDiJuj_D_OePGXFyN3Ap2DEm3ipcU5AhrWrNvneJryWrpgi_yuVWKo1UoD5s8bxlmwypWVGFAA
+------END NATS USER JWT------
+
+************************* IMPORTANT *************************
+NKEY Seed printed below can be used to sign and prove identity.
+NKEYs are sensitive and should be treated as secrets.
+
+-----BEGIN USER NKEY SEED-----
+SUAOY5JZ2WJKVR4UO2KJ2P3SW6FZFNWEOIMAXF4WZEUNVQXXUOKGM55CYE
+------END USER NKEY SEED------
+
+*************************************************************
+
+
Given a creds file, a client can authenticate as a specific user belonging to a specific account:
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
nc, err := nats.Connect("localhost", nats.UserCredentials("path_to_creds_file"))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Options options =newOptions.Builder().
+ server("nats://localhost:4222").
+ authHandler(Nats.credentials("path_to_creds_file")).
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
NATS provides several forms of security for your messages. First, you can turn on authorization which limits access to the NATS server. Second, access to specific subjects can be controlled. Third, you can use TLS to encrypt traffic between clients and the server. Finally, TLS can be used to verify client identities using certificates. By combining all of these methods you can protect access to data and data in motion.
+
The client doesn't have control over access controls, but clients do provide the configurations required to authenticate with the server and to turn on TLS.
The 2.0 version of NATS server introduces a new challenge response authentication option. This challenge response is based on a wrapper we call NKeys which uses ED25519 signing. The server can use these keys in several ways for authentication. The simplest is for the server to be configured with a list of known public keys and for the clients to respond to the challenge by signing it with its private key. This challenge-response insures security by insuring that the client has the private key, but also protects the private key from the server which never has to actually see it.
+
Handling challenge response may require more than just a setting in the connection options, depending on the client library.
While authentication limits which clients can connect, TLS can be used to check the server’s identity and the client’s identity and will encrypt the traffic between the two. The most secure version of TLS with NATS is to use verified client certificates. In this mode, the client can check that it trusts the certificate sent by gnatsd but the server will also check that it trusts the certificate sent by the client. From an applications perspective connecting to a server that does not verify client certificates may appear identical. Under the covers, disabling TLS verification removes the server side check on the client’s certificate. When started in TLS mode, gnatsd will require all clients to connect with TLS. Moreover, if configured to connect with TLS, client libraries will fail to connect to a server without TLS.
Some clients may support the tls protocol as well as a manual setting to turn on TLS. However, in that case there is likely some form of default or environmental settings to allow the TLS libraries to find certificate and trust stores.
nc, err := nats.Connect("localhost",
+ nats.Secure(),
+ nats.RootCAs("resources/certs/ca.pem"))// May need this if server is using self-signed certificate
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
let nc =NATS.connect({
+ url:"tls://demo.nats.io:4443",
+ tls:true
+});
+
+
+
+
+
import asyncio
+import ssl
+import certifi
+from nats.aio.client import Client as NATS
+from nats.aio.errors import ErrTimeout
+
+asyncdefrun(loop):
+ nc = NATS()
+
+ # If using Python 3.7 in OS X and getting SSL errors, run first:
+ #
+ # /Applications/Python\ 3.7/Install\ Certificates.command
+ #
+ # Setting the tls as the scheme will use same defaults as `ssl.create_default_context()`
+ #
+ await nc.connect("tls://demo.nats.io:4443", loop=loop)
+
+ asyncdefmessage_handler(msg):
+ subject = msg.subject
+ reply = msg.reply
+ data = msg.data.decode()
+ print("Received a message on '{subject} {reply}': {data}".format(
+ subject=subject, reply=reply, data=data))
+
+ # Simple publisher and async subscriber via coroutine.
+ sid =await nc.subscribe("foo", cb=message_handler)
+ await nc.flush()
+
+ # Stop receiving after 2 messages.
+ await nc.auto_unsubscribe(sid,2)
+ await nc.publish("foo",b'Hello')
+ await nc.publish("foo",b'World')
+ await nc.publish("foo",b'!!!!!')
+ await asyncio.sleep(1, loop=loop)
+ await nc.close()
+
+
+
+
+
EM.run do
+
+ # In order to use TLS with the Ruby NATS client, use the :tls option
+ # when customizing the connection with an empty block.
+ options ={
+ :servers=>[
+ 'nats://demo.nats.io:4443',
+ ],
+ :tls=>{}
+ }
+
+ NATS.connect(options)do|nc|
+ puts :connected
+ end
+end
+
+
+
+
+
// will throw an exception if connection fails
+let nc =awaitconnect({
+ url:"tls://demo.nats.io:4443"
+});
+
+nc.close();
+
Tokens are basically random strings, much like a password, and can provide a simple authentication mechanism in some situations. However, tokens are only as safe as they are secret so other authentication schemes can provide more security in large installations.
+
For this example, start the server using:
+
> gnatsd --auth mytoken
+
+
The code uses localhost:4222 so that you can start the server on your machine to try them out.
// Set a token
+nc, err := nats.Connect("localhost", nats.Token("mytoken"))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Options options =newOptions.Builder().
+ server("nats://localhost:4222").
+ token("mytoken").// Set a token
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect({url:`nats://127.0.0.1:${port}`, token:"mytoken!"});
+
+
+
+
+
nc = NATS()
+
+await nc.connect(servers=["nats://mytoken@demo.nats.io:4222"])
+
+# Do something with the connection.
+
+
+
+
+
+
NATS.start(token:"deadbeef")do|nc|
+ puts "Connected using token"
+end
+
+
+
+
+
let nc =awaitconnect({url: server.nats, token:"mytoken"});
+
+
+
+
+
Connecting with a Token in the URL
+
Some client libraries will allow you to pass the token as part of the server URL using the form:
+
+
nats://token@server:port
+
+
Again, once you construct this URL you can connect as if this was a normal URL.
// Set a user and plain text password
+nc, err := nats.Connect("localhost", nats.UserInfo("myname","password"))
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Options options =newOptions.Builder().
+ server("nats://localhost:4222").
+ userInfo("myname","password").// Set a user and plain text password
+ build();
+Connection nc =Nats.connect(options);
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect({url: server.nats, user:"myname", pass:"password"});
+
+
+
+
+
nc = NATS()
+
+await nc.connect(servers=["nats://myname:password@demo.nats.io:4222"])
+
+# Do something with the connection.
+
+
+
+
+
+
require'nats/client'
+
+NATS.start(servers:["nats://myname:password@127.0.0.1:4222"], name:"my-connection")do|nc|
+ nc.on_error do|e|
+ puts "Error: #{e}"
+ end
+
+ nc.on_reconnect do
+ puts "Got reconnected to #{nc.connected_server}"
+ end
+
+ nc.on_disconnect do|reason|
+ puts "Got disconnected! #{reason}"
+ end
+
+ nc.close
+end
+
+
+
+
+
let nc =awaitconnect({url: server.nats, user:"myname", pass:"password"});
+
+
+
+
+
Connecting with a User/Password in the URL
+
Most clients make it easy to pass the user name and password by accepting them in the URL for the server. This standard format is:
+
+
nats://user:password@server:port
+
+
Using this format, you can connect to a server using authentication as easily as you connected with a URL:
// Set a user and plain text password
+nc, err := nats.Connect("myname:password@localhost")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Do something with the connection
+
+
+
+
+
+
Connection nc =Nats.connect("nats://myname:password@localhost:4222");
+
+// Do something with the connection
+
+nc.close();
+
+
+
+
+
let url =`nats://myname:password@127.0.0.1:${port}`;
+let nc =NATS.connect(url);
+
+
+
+
+
nc = NATS()
+
+await nc.connect(servers=["nats://myname:password@demo.nats.io:4222"])
+
+# Do something with the connection.
+
+
+
+
+
+
require'nats/client'
+
+NATS.start(servers:["nats://myname:password@127.0.0.1:4222"], name:"my-connection")do|nc|
+ nc.on_error do|e|
+ puts "Error: #{e}"
+ end
+
+ nc.on_reconnect do
+ puts "Got reconnected to #{nc.connected_server}"
+ end
+
+ nc.on_disconnect do|reason|
+ puts "Got disconnected! #{reason}"
+ end
+
+ nc.close
+end
+
+
+
+
+
let url =`nats://myname:password@127.0.0.1:${port}`;
+let nc =awaitconnect({url: url});
+
For performance reasons, most if not all, of the client libraries will cache outgoing data so that bigger chunks can be written to the network at one time. This may be as simple as a byte buffer that stores up a few messages before being pushed to the network.
+
These buffers do not hold messages forever, generally they are designed to hold messages in high throughput scenarios, while still providing good latency in low throughput situations.
+
It is the libraries job to make sure messages flow in a high performance manner. But there may be times when an application needs to know that a message has "hit the wire." In this case, applications can use a flush call to tell the library to move data through the system.
nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+if err := nc.Publish("updates",[]byte("All is Well")); err !=nil{
+ log.Fatal(err)
+}
+// Sends a PING and wait for a PONG from the server, up to the given timeout.
+// This gives guarantee that the server has processed above message.
+if err := nc.FlushTimeout(time.Second); err !=nil{
+ log.Fatal(err)
+}
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+nc.publish("updates","All is Well".getBytes(StandardCharsets.UTF_8));
+nc.flush(Duration.ofSeconds(1));// Flush the message queue
+
+nc.close();
+
+
+
+
+
let nc =NATS.connect({url:"nats://demo.nats.io:4222"});
+let start = Date.now();
+nc.flush(()=>{
+ t.log('round trip completed in', Date.now()- start,'ms');
+});
+
+nc.publish('foo');
+// function in flush is optional
+nc.flush();
+
+
+
+
+
nc = NATS()
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+
+await nc.publish("updates",b'All is Well')
+
+# Sends a PING and wait for a PONG from the server, up to the given timeout.
+# This gives guarantee that the server has processed above message.
+await nc.flush(timeout=1)
+
+
+
+
+
+
require'nats/client'
+require'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"])do|nc|
+ nc.subscribe("updates")do|msg|
+ puts msg
+ end
+
+ nc.publish("updates","All is Well")
+
+ nc.flush do
+ # Sends a PING and wait for a PONG from the server, up to the given timeout.
+ # This gives guarantee that the server has processed above message at this point.
+ end
+end
+
+
+
+
+
+
let nc =awaitconnect({
+ url:"nats://demo.nats.io:4222"
+});
+
+// you can use flush to trigger a function in your
+// application once the round-trip to the server finishes
+let start = Date.now();
+nc.flush(()=>{
+ t.log('round trip completed in', Date.now()- start,'ms');
+});
+
+nc.publish('foo');
+
+// another way, simply wait for the promise to resolve
+await nc.flush();
+
+nc.close();
+
+
+
+
+
Flush and Ping/Pong
+
Many of the client libraries use the PING/PONG interaction built into the NATS protocol to insure that flush pushed all of the cached messages to the server. When an application calls flush most libraries will put a PING on the outgoing queue of messages, and wait for the server to send PONG before saying that the flush was successful.
NATS sends and receives messages using a protocol that includes a target subject, an optional reply subject and an array of bytes. Some libraries may provide helpers to convert other data formats to and from bytes, but the NATS server will treat all messages as opaque byte arrays.
+
All of the NATS clients are designed to make sending a message simple. For example, to send the string “All is Well” to the “updates” subject as a UTF-8 string of bytes you would do:
nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+if err := nc.Publish("updates",[]byte("All is Well")); err !=nil{
+ log.Fatal(err)
+}
+// Make sure the message goes through before we close
+nc.Flush()
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+nc.publish("updates","All is Well".getBytes(StandardCharsets.UTF_8));
+
+// Make sure the message goes through before we close
+nc.flush(Duration.ZERO);
+nc.close();
+
+
+
+
+
let nc =NATS.connect({url:"nats://demo.nats.io:4222", preserveBuffers:true});
+let buf = Buffer.allocUnsafe(12);
+buf.fill("All is well");
+nc.publish('updates', buf);
+
The optional reply-to field when publishing a message can be used on the receiving side to respond. The reply-to subject is often called an inbox, and some libraries may provide a method for generating unique inbox subjects. For example to send a request to the subject time, with no content for the messages, you might:
nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Create a unique subject name
+uniqueReplyTo := nats.NewInbox()
+
+// Listen for a single response
+sub, err := nc.SubscribeSync(uniqueReplyTo)
+if err !=nil{
+ log.Fatal(err)
+}
+
+// Send the request
+if err := nc.PublishRequest("time", uniqueReplyTo,nil); err !=nil{
+ log.Fatal(err)
+}
+
+// Read the reply
+msg, err := sub.NextMsg(time.Second)
+if err !=nil{
+ log.Fatal(err)
+}
+
+// Use the response
+log.Printf("Reply: %s", msg.Data)
+
+// Close the connection
+nc.Close()
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+// Create a unique subject name
+String uniqueReplyTo = NUID.nextGlobal();
+
+// Listen for a single response
+Subscription sub = nc.subscribe(uniqueReplyTo);
+sub.unsubscribe(1);
+
+// Send the request
+nc.publish("time", uniqueReplyTo,null);
+
+// Read the reply
+Message msg = sub.nextMessage(Duration.ofSeconds(1));
+
+// Use the response
+System.out.println(newString(msg.getData(),StandardCharsets.UTF_8));
+
+// Close the connection
+nc.close();
+
+
+
+
+
// set up a subscription to process the request
+nc.subscribe('time',(msg, reply)=>{
+ if(reply){
+ nc.publish(reply,newDate().toLocaleTimeString());
+ }
+});
+
+// create a subscription subject that the responding send replies to
+let inbox =NATS.createInbox();
+nc.subscribe(inbox,{max:1},(msg)=>{
+ t.log('the time is', msg);
+ nc.close();
+});
+
+nc.publish('time',"", inbox);
+
require'nats/client'
+require'fiber'
+
+NATS.start(servers:["nats://127.0.0.1:4222"])do|nc|
+ Fiber.newdo
+ f =Fiber.current
+
+ nc.subscribe("time")do|msg, reply|
+ f.resume msg
+ end
+
+ nc.publish("time",'example',NATS.create_inbox)
+
+ # Use the response
+ msg =Fiber.yield
+ puts "Reply: #{msg}"
+
+ end.resume
+end
+
+
+
+
+
+
// set up a subscription to process the request
+await nc.subscribe('time',(err, msg)=>{
+ if(err){
+ // this example is running inside of a promise
+ reject();
+ return;
+ }
+ if(msg.reply){
+ nc.publish(msg.reply,newDate().toLocaleTimeString());
+ }
+});
+
+// create a subscription subject that the responding send replies to
+let inbox =createInbox();
+await nc.subscribe(inbox,(err, msg)=>{
+ t.log('the time is', msg.data);
+ // this example is running inside of a promise
+ nc.close();
+ resolve();
+},{max:1});
+
+nc.publish('time',"", inbox);
+
The pattern of sending a message and receiving a response is encapsulated in most client libraries into a request method. Under the covers this method will publish a message with a unique reply-to subject and wait for the response before returning.
+
In the older versions of some libraries a completely new reply-to subject is created each time. In newer versions, a subject hierarchy is used so that a single subscriber in the client library listens for a wildcard, and requests are sent with a unique child subject of a single subject.
+
The primary difference between the request method and publishing with a reply-to is that the library is only going to accept one response, and in most libraries the request will be treated as a synchronous action. The library may even provide a way to set the timeout.
+
For example, updating the previous publish example we may request time with a one second timeout:
nc, err := nats.Connect("demo.nats.io")
+if err !=nil{
+ log.Fatal(err)
+}
+defer nc.Close()
+
+// Send the request
+msg, err := nc.Request("time",nil, time.Second)
+if err !=nil{
+ log.Fatal(err)
+}
+
+// Use the response
+log.Printf("Reply: %s", msg.Data)
+
+// Close the connection
+nc.Close()
+
+
+
+
+
Connection nc =Nats.connect("nats://demo.nats.io:4222");
+
+// Send the request
+Message msg = nc.request("time",null,Duration.ofSeconds(1));
+
+// Use the response
+System.out.println(newString(msg.getData(),StandardCharsets.UTF_8));
+
+// Close the connection
+nc.close();
+
+
+
+
+
nc.requestOne('time',(msg)=>{
+ t.log('the time is', msg);
+ nc.close();
+});
+
+
+
+
+
nc = NATS()
+
+asyncdefsub(msg):
+ await nc.publish(msg.reply,b'response')
+
+await nc.connect(servers=["nats://demo.nats.io:4222"])
+await nc.subscribe("time", cb=sub)
+
+# Send the request
+try:
+ msg =await nc.request("time",b'', timeout=1)
+ # Use the response
+ print("Reply:", msg)
+except asyncio.TimeoutError:
+ print("Timed out waiting for response")
+
+
let msg =await nc.request('time',1000);
+t.log('the time is', msg.data);
+nc.close();
+
+
+
+
+
You can think of request-reply in the library as a subscribe, get one message, unsubscribe pattern. In Go this might look something like:
+
sub, err := nc.SubscribeSync(replyTo)
+if err !=nil{
+ log.Fatal(err)
+}
+nc.Flush()
+
+// Send the request
+nc.PublishRequest(subject, replyTo,[]byte(input))
+
+// Wait for a single response
+for{
+ msg, err := sub.NextMsg(1* time.Second)
+ if err !=nil{
+ log.Fatal(err)
+ }
+
+ response =string(msg.Data)
+ break
+}
+sub.Unsubscribe()
+
+
Scatter-Gather
+
You can expand the request-reply pattern into something often called scatter-gather. To receive multiple messages, with a timeout, you could do something like the following, where the loop getting messages is using time as the limitation, not the receipt of a single message:
+
sub, err := nc.SubscribeSync(replyTo)
+if err !=nil{
+ log.Fatal(err)
+}
+nc.Flush()
+
+// Send the request
+nc.PublishRequest(subject, replyTo,[]byte(input))
+
+// Wait for a single response
+max :=100* time.Millisecond
+start := time.Now()
+for time.Now().Sub(start)< max {
+ msg, err := sub.NextMsg(1* time.Second)
+ if err !=nil{
+ break
+ }
+
+ responses =append(responses,string(msg.Data))
+}
+sub.Unsubscribe()
+
+
Or, you can loop on a counter and a timeout to try to get at least N responses:
Some client libraries provide helpers to send structured data while others depend on the application to perform any encoding and decoding and just take byte arrays for sending. The following example shows how to send JSON but this could easily be altered to send a protocol buffer, YAML or some other format. JSON is a text format so we also have to encode the string in most languages to bytes. We are using UTF-8, the JSON standard encoding.
+
Take a simple stock ticker that sends the symbol and price of each stock:
Subscribers can use auto-ack or manual-ack. Auto-ack is the default for most clients and is sent by the library when the message callback returns. Manual ack provides more control. The subscription options provide flags to:
+
+
Set manual acks to true
+
Set the ack wait used by the server for messages to this subscription
+
+
The ack wait is the time the server will wait before resending a message.
Subscribers can set max in flight to rate limit incoming messages. The server will send at most “max in flight” messages before receiving an acknowledgement. Setting max in flight to 1 insures every message is processed in order.
NATS Streaming is a service on top of NATS. To connect to the service you first connect to NATS and then use the client library to communicate with the server over your NATS connection. Most of the libraries provide a convenience mechanism for connecting in a single step. These convenience methods will take some NATS options, like the server, and perform the NATS connection first, then then run the protocol to connect to the streaming server.
+
Connecting to a streaming server requires a cluster id, defined by the server configuration, and a client ID defined by the client.
Sometimes you may want to provide NATS settings that aren't available in the streaming libraries connect method. Or, you may want to reuse a NATS connection instead of creating a new one. In this case the libraries generally provide a way to connect to streaming with an existing NATS connection:
Regular subscriptions remember their position while the client is connected. If the client disconnects the position is lost. Durable subscriptions remember their position even if the client is disconnected. Durable subscriptions identify themselves with a name. Connect and disconnect won’t affect the durable subscriptions position in the channel. Unsubscribe will clear the durable subscription.
Where NATS provides at most once quality of service, streaming adds at least once. Streaming is implemented as a request-reply service on top of NATS. Streaming messages are encoded as protocol buffers, the streaming clients use NATS to talk to the streaming server. The streaming server organizes messages in channels and stores them in files and databases. ACKs are used to insure delivery in both directions.
+
+
Sometimes the maintainers will refer to NATS as "nats core" and streaming as "stan" or "streaming."
+
+
Messages to the streaming service are opaque byte arrays, just as they are with NATS. However, the streaming server protocol uses protocol buffers to wrap these byte arrays. So if you listen to the NATS traffic the messages will appear as protocol buffers, while the actual data sent and received will simply be byte arrays.
+
NATS streaming uses the concept of a channel to represent an ordered collection of messages. Clients send to and receive from channels instead of subjects. The subjects used by the streaming libraries and server are managed internally. Channels do not currently support wildcard. Channels aren’t raw subjects. Streaming isn’t raw NATS. The streaming libraries hide some of the differences.
+
Think of channels as a ring buffer. Messages are added until the configured limit is reached. Old messages are removed to make room for new ones. Old messages can expire, based on configuration. Subscriptions don’t affect channel content. Subscriptions are like a cursor on the ring buffer.
+
+
Positions in the channel are specified in multiple ways:
+
+
Sequence number - counting from 1
+
Time
+
Time delta (converted to time on client)
+
+
New subscriptions can also specify last received to indicate they only want new messages. Sequence numbers are persistent, when message #1 goes away the oldest message is message #2. Trying to go to a position before the oldest message will be moved to the oldest message.
+
+
Subscription Types
+
NATS Streaming supports several types of subscriptions:
+
+
Regular
+
Durable
+
Queue
+
Durable/Queue
+
+
Regular subscriptions pick the location of their channel position on creation and it is stored while the subscriber is active. Durable subscriptions store their position in the streaming server. Queue subscriptions share a channel position. Durable/Queue subscriptions share a channel position stored in the server. All subscriptions can be configured with a starting position, but only new durable subscriptions and new regular subscriptions respect the request.
+
All subscriptions define their position on creation. Regular subscriptions lose their position if the application crashes, the app disconnects or they unsubscribe. Durable subscriptions maintain their position through disconnect, subscriber close, but not through unsubscribe. The position on reconnect comes from the server not the options in both cases. Queue subscriptions share a position. Regular queue subscriptions lose their position on the last disconnect/unsubscribe. Durable queue subscriptions maintain their position through disconnect, but not through the last unsubscribe. Positions provided in options are ignored after the position is set.
+
Acknowledgements
+
In order to implement at least once delivery NATS streaming uses ACK messages for publishers and subscribers. Each message sent from the streaming server to the client must be acknowledged or it will be re-delivered. Developers must switch their mind set. The same message can arrive more than once. Messages should be idempotent. The client libraries can help with ACKs. Subscriptions can use manual or automatic ACKs. Manual ACKs are safer, since the program controls when they happen. An ACK wait setting is used to define the timeout before an ACK is considered missing.
+
+
Ack wait = 10s means that the server won’t redeliver for at least 10s
+
+
Using ACKs for each message sent can be a performance hit - round trip per message. NATS streaming allows subscriptions to set a max in flight value. Max in flight determines how many unacknowledged messages can be sent to the client. Ack Wait is used to decide when the ACK for a message has failed and it needs to be redelivered. New and redelivered messages are sent upon availability, in order.
+
Messages are sent in order, when they are available:
+
+
Max inflight = 2
+
Send msg 1 and msg 2
+
ACK 2
+
Message 3 arrives at the server
+
Send message 3 (since it is available)
+
When Ack wait expires, msg 1 is available
+
Send msg 1 (1 and 3 are in flight)
+
+
The streaming server sends available messages in order, but 1 isn’t available until its Ack wait expires. If max in flight = 1 then only 1 message is on the wire at a time, it will be re-sent until it is acknowledged. Re-delivered messages will not come out of order in this situation.
+
Setting max in flight to a number greater than 1 requires some thought and foresight to deal with redelivery scenarios.
+
Max in flight is a per-subscription setting. In the case of queue subscribers, each client can set the value. Normally, each client will use the same value but this is not a requirement.
+
NATS streaming uses acknowledgements on the sending side as well as the subscribing side. The streaming server acknowledges messages it receives and has persisted. A maximum in flight setting is used for publishers. No more than max in flight can be on their way to the server at one time. The library may provide various mechanisms to handle publisher ACKs. The application must manage redelivery to the server.
The streaming client library can provide a method for publishing synchronously. .These publish methods block until the ACK is returned by the server. An error or exception is used to indicate a timeout or other error.
+
err := sc.Publish("foo",[]byte("Hello World"))
+
+
Streaming libraries can also provide a way to publish asynchronously. An ACK callback of some kind is required. The library will publish the message and notify the callback on ACK or timeout. The global id associated with the message being sent is returned from publish so that the application can identify it on callback.
Queue subscriptions are created like other subscriptions with the addition of a queue name. All subscriptions, across clients, share the queue based on this unique name. Other subscriptions can receive messages independently of the queue groups. Unsubscribe removes a client from a group, the last unsubscribe kills the group. Max in flight is per subscription.
Clients subscribe to channels by name. Wildcards are not supported. Receiving messages is similar to core NATS. Messages in streaming use protocol buffers and will have a bit more structure than NATS opaque messages. Client messages are still presented and accepted as raw/opaque binary data. The use of protocol buffers is transparent.
+
Subscriptions come in several forms:
+
+
Regular
+
Durable
+
Queue
+
Queue/Durable
+
+
Subscriptions set their starting position on creation using position or time. For example, in Go you can start at:
The Go NATS client features a CustomDialer option which allows you to customize
+the connection logic against the NATS server without having to modify the internals
+of the client. For example, let's say that you want to make the client use the context
+package to use DialContext and be able to cancel connecting to NATS altogether with a deadline,
+you could then do define a Dialer implementation as follows:
With the dialer implementation above, the NATS client will retry a number of times to connect
+to the NATS server until the context is no longer valid:
+
funcmain(){
+ // Parent context cancels connecting/reconnecting altogether.
+ ctx, cancel := context.WithCancel(context.Background())
+ defercancel()
+
+ var err error
+ var nc *nats.Conn
+ cd :=&customDialer{
+ ctx: ctx,
+ connectTimeout:10* time.Second,
+ connectTimeWait:1* time.Second,
+ }
+ opts :=[]nats.Option{
+ nats.SetCustomDialer(cd),
+ nats.ReconnectWait(2* time.Second),
+ nats.ReconnectHandler(func(c *nats.Conn){
+ log.Println("Reconnected to", c.ConnectedUrl())
+ }),
+ nats.DisconnectHandler(func(c *nats.Conn){
+ log.Println("Disconnected from NATS")
+ }),
+ nats.ClosedHandler(func(c *nats.Conn){
+ log.Println("NATS connection is closed.")
+ }),
+ nats.NoReconnect(),
+ }
+ gofunc(){
+ nc, err = nats.Connect("127.0.0.1:4222", opts...)
+ }()
+
+WaitForEstablishedConnection:
+ for{
+ if err !=nil{
+ log.Fatal(err)
+ }
+
+ // Wait for context to be canceled either by timeout
+ // or because of establishing a connection...
+ select{
+ case<-ctx.Done():
+ break WaitForEstablishedConnection
+ default:
+ }
+
+ if nc ==nil||!nc.IsConnected(){
+ log.Println("Connection not ready")
+ time.Sleep(200* time.Millisecond)
+ continue
+ }
+ break WaitForEstablishedConnection
+ }
+ if ctx.Err()!=nil{
+ log.Fatal(ctx.Err())
+ }
+
+ for{
+ if nc.IsClosed(){
+ break
+ }
+ if err := nc.Publish("hello",[]byte("world")); err !=nil{
+ log.Println(err)
+ time.Sleep(1* time.Second)
+ continue
+ }
+ log.Println("Published message")
+ time.Sleep(1* time.Second)
+ }
+
+ // Disconnect and flush pending messages
+ if err := nc.Drain(); err !=nil{
+ log.Println(err)
+ }
+ log.Println("Disconnected")
+}
+
NATS is a publish subscribe messaging system. Subscribers listening on a subject name receive messages on that subject. If the subscriber is not actively listening on the subject, the message is not received. Subscribers can use the wildcard subjects * to match a single token to match the tail of a subject.
Where <subject> is the subject name and <message> is the text to publish.
+
For example:
+
% go run nats-pub.go msg.test hello
+
+
or
+
% go run nats-pub.go msg.test "NATS MESSAGE"
+
+
8. Verify message publication and receipt
+
You should see that the publisher sends the message: Published [msg.test] : 'NATS MESSAGE'
+
And that the subscriber receives the message: [#1] Received on [msg.test]: 'NATS MESSAGE'
+
Note that if the receiver does not get the message, check that you are using the same subject name for the publisher and the subscriber.
+
9. Publish another message
+
% go run nats-pub.go msg.test "NATS MESSAGE 2"
+
+
You should see that the subscriber receive message 2. Note that the message count is incremented each time your subscribing client receives a message on that subject:
+
10. Start another shell or command prompt session
+
You will use this session to run a second NATS subscriber.
NATS supports a form of load balancing using queue groups. Subscribers register a queue group name. A single subscriber in the group is randomly selected to receive the message.
+
Prerequisites
+
Go and the NATS server should be installed.
+
1. Start the NATS server
+
nats-server
+
+
2. Clone the repositories for each client examples
+
go get github.com/nats-io/go-nats
+git clone https://github.com/nats-io/node-nats.git
+git clone https://github.com/nats-io/ruby-nats.git
+
+
3. Run the Go client subscriber with queue group name
+
cd$GOPATH/src/github.com/nats-io/go-nats/examples
+go run nats-qsub.go foo my-queue
+
+
4. Install and run the Node client subscriber with queue group name
5. Install and run the Ruby client subscriber with queue group name
+
gem install nats
+nats-queue foo my-queue &
+
+
*6. Run another Go client subscriber without* the queue group.
+
cd$GOPATH/src/github.com/nats-io/go-nats/examples
+go run nats-sub.go foo
+
+
7. Publish a NATS message using the Go client
+
cd$GOPATH/src/github.com/nats-io/go-nats/examples
+go run nats-pub.go foo "Hello NATS!"
+
+
8. Verify message publication and receipt
+
You should see that the publisher sends the message: Published [foo] : 'Hello NATS!'
+
You should see that only one of the my-queue group subscribers receives the message. In addition, the Go client subscriber not in the my-queue group should also receive the message.
+
9. Publish another message
+
go run nats-pub.go foo "Hello NATS Again!"
+
+
You should see that a different queue group subscriber receives the message this time, chosen at random among the 3 queue group members.
You should see the message Receiver is listening, and that the NATS receiver client is listening on the "help.please" subject. The reply client acts as a receiver, listening for message requests. In NATS, the receiver is a subscriber.
+
5. In the other terminal, run the request client
+
% go run nats-req.go foo "request payload"
+
+
The NATS requestor client makes a request by sending the message "some message" on the “help.please” subject.
+
The NATS receiver client receives the message, formulates the reply ("OK, I CAN HELP!!!), and sends it to the inbox of the requester.
One difference is that instead of routes you specify gateways. As expected self-gateway connections are ignored, so you can share gateway configurations with minimal fuzz.
+
Starting a server:
+
> nats-server -c A.conf
+[85803] 2019/05/07 10:50:55.902474 [INF] Starting nats-server version 2.0.0-RC11
+[85803] 2019/05/07 10:50:55.902547 [INF] Git commit [not set]
+[85803] 2019/05/07 10:50:55.903669 [INF] Gateway name is A
+[85803] 2019/05/07 10:50:55.903684 [INF] Listening for gateways connections on localhost:7222
+[85803] 2019/05/07 10:50:55.903696 [INF] Address for gateway "A" is localhost:7222
+[85803] 2019/05/07 10:50:55.903909 [INF] Listening for client connections on 0.0.0.0:4222
+[85803] 2019/05/07 10:50:55.903914 [INF] Server id is NBHUDBF3TVJSWCDPG2HSKI4I2SBSPDTNYEXEMOFAZUZYXVA2IYRUGPZU
+[85803] 2019/05/07 10:50:55.903917 [INF] Server is ready
+[85803] 2019/05/07 10:50:56.830669 [INF] 127.0.0.1:50892 - gid:2 - Processing inbound gateway connection
+[85803] 2019/05/07 10:50:56.830673 [INF] 127.0.0.1:50891 - gid:1 - Processing inbound gateway connection
+[85803] 2019/05/07 10:50:56.831079 [INF] 127.0.0.1:50892 - gid:2 - Inbound gateway connection from "C" (NBHWDFO3KHANNI6UCEUL27VNWL7NWD2MC4BI4L2C7VVLFBSMZ3CRD7HE) registered
+[85803] 2019/05/07 10:50:56.831211 [INF] 127.0.0.1:50891 - gid:1 - Inbound gateway connection from "B" (ND2UJB3GFUHXOQ2UUMZQGOCL4QVR2LRJODPZH7MIPGLWCQRARJBU27C3) registered
+[85803] 2019/05/07 10:50:56.906103 [INF] Connecting to explicit gateway "B" (localhost:7333) at 127.0.0.1:7333
+[85803] 2019/05/07 10:50:56.906104 [INF] Connecting to explicit gateway "C" (localhost:7444) at 127.0.0.1:7444
+[85803] 2019/05/07 10:50:56.906404 [INF] 127.0.0.1:7333 - gid:3 - Creating outbound gateway connection to "B"
+[85803] 2019/05/07 10:50:56.906444 [INF] 127.0.0.1:7444 - gid:4 - Creating outbound gateway connection to "C"
+[85803] 2019/05/07 10:50:56.906647 [INF] 127.0.0.1:7444 - gid:4 - Outbound gateway connection to "C" (NBHWDFO3KHANNI6UCEUL27VNWL7NWD2MC4BI4L2C7VVLFBSMZ3CRD7HE) registered
+[85803] 2019/05/07 10:50:56.906772 [INF] 127.0.0.1:7333 - gid:3 - Outbound gateway connection to "B" (ND2UJB3GFUHXOQ2UUMZQGOCL4QVR2LRJODPZH7MIPGLWCQRARJBU27C3) registered
+
+
Once all the gateways are up, these clusters of one will forward messages as expected:
+
> nats-pub -s localhost:4444 foo bar
+Published [foo] : 'bar'
+
+# On a different session...
+> nats-sub -s localhost:4333 ">"
+Listening on [>]
+[#1] Received on [foo]: 'bar'
+
+
Gateway Configuration Block
+
+
+
+
Property
+
Description
+
+
+
+
+
advertise
+
Hostport <host>:<port> to advertise to other gateways.
+
+
+
authorization
+
Authorization block (same as other nats-server authorization configuration).
+
+
+
connect_retries
+
Number of times the server will try to connect to a discovered gateway.
+
+
+
gateways
+
List of Gateway entries - see below.
+
+
+
host
+
Interface where the gateway will listen for incomming gateway connections.
+
+
+
listen
+
Combines host and port as <host>:<port>
+
+
+
name
+
Name for this cluster, all gateways belonging to the same cluster, should specify the same name.
+
+
+
port
+
Port where the gateway will listen for incomming gateway connections.
+
+
+
reject_unknown
+
If true, gateway will reject connections from gateways that are not configured in gateways.
Gateways enable connecting one or more clusters together; they allow the formation of super clusters from smaller clusters. Cluster and Gateway protocols listen in different ports. Clustering is used for adjacent servers; gateways are for joining clusters together. Typically all cluster nodes will also be gateway nodes, but this is not a requirement.
+
Gateway configuration is similar to clustering:
+
+
gateways have a dedicated port where they listen for gateway requests
+
gateways gossip gateway members and remote discovered gateways
+
+
Unlike clusters, gateways:
+
+
don't form a full mesh
+
are bound by uni-directional connections
+
+
Gateways exist to:
+
+
reduce the number of connections required between servers
+
optimize the interest graph propagation
+
+
Gateway Connections
+
A nats-server in a gateway role will specify a port where it will accept gateways connections. If the configuration specifies other externalgateways, the gateway will create one outbound gateway connection for each gateway in its configuration. It will also gossip other gateways it knows or discovered.
+
If the local cluster has three gateway nodes, this means there will be three outbound connections to each external gateway.
+
+
+
In the example above cluster A has configured gateway connections for B (solid lines). B has discovered gateway connections to A (dotted lines). Note that the number of outgoing connections always matches the number of gateways with the same name.
+
+
+
+
In this second example, again configured connections are shown with solid lines and discovered gateway connections are shown using dotted lines. Gateways A and C were both discovered via gossiping; B discovered A and A discovered C.
+
+
A key point in the description above is that each node in the cluster will make a connection to a single node in the remote cluster — a difference from the clustering protocol, where every node is directly connected to all other nodes.
+
For those mathematically inclined, cluster connections are N(N-1)/2 where N is the number of nodes in the cluster. On gateway configurations, outbound connections are the summation of Ni(M-1) where Ni is the number of nodes in a gateway i, and M is the total number of gateways. Inbound connections are the summation of U-Ni where U is the sum of all gateway nodes in all gateways, and N is the number of nodes in a gateway i. It works out that both inbound and outbound connection counts are the same.
+
The number of connections required to join clusters using clustering vs. gateways is apparent very quickly. For 3 clusters, with N nodes:
+
+
+
+
Nodes per Cluster
+
Full Mesh Conns
+
Gateway Conns
+
+
+
+
+
1
+
3
+
6
+
+
+
2
+
15
+
12
+
+
+
3
+
36
+
18
+
+
+
4
+
66
+
24
+
+
+
5
+
105
+
30
+
+
+
30
+
4005
+
180
+
+
+
+
Interest Propagation
+
Gateways propagate interest using three different mechanisms:
+
+
Optimistic Mode
+
Interest-only Mode
+
Queue Subscriptions
+
+
Optimistic Mode
+
When a publisher in A publishes "foo", the A gateway will check if cluster B has registered no interest in "foo". If not, it forwards "foo" to B. If upon receiving "foo", B has no subscribers on "foo", B will send a gateway protocol message to A expressing that it has no interest on "foo", preventing future messages on "foo" from being forwarded.
+
Should a subscriber on B create a subscription to "foo", B knowing that it had previously rejected interest on foo, will send a gateway protocol message to cancel its previous no interest on "foo" in A.
+
Interest-only Mode
+
When a gateway on A sends many messages on various subjects for which B has no interest. B sends a gateway protocol message for A to stop sending optimistically, and instead send if there's known interest in the subject. As subscriptions come and go on B, B will update its subject interest with A.
+
Queue Subscriptions
+
When a queue subscriber creates a new subscription, the gateway propagates the subscription interest to other gateways. The subscription interest is only propagated once per Account and subject. When the last queue subscriber is gone, the cluster interest is removed.
+
Queue subscriptions work on Interest-only Mode to honor NATS' queue semantics across the Super Cluster. For each queue group, a message is only delivered to a single queue subscriber. Only when a local queue group member is not found, is a message forwarded to a different interested cluster; gateways will always try to serve local queue subscribers first and only failover when a local queue subscriber is not found.