Connecting to NATS

Most client libraries provide several ways to connect to the NATS server, gnatsd. The server itself is identified by a standard URL with the nats protocol. Throughout these examples we will rely on a test server, provided by nats.io, at nats://demo.nats.io:4222, where 4222 is the default port for NATS.

Connecting to a Specific Server

For example, to connect to the demo server with a URL:

// If connecting to the default port, the URL can be simplified
// to just the hostname/IP.
// That is, the connect below is equivalent to:
// nats.Connect("nats://demo.nats.io:4222")
nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Do something with the connection

Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Do something with the connection

nc.close();
let nc = NATS.connect("nats://demo.nats.io:4222");
nc.on('connect', (c) => {
    // Do something with the connection
    doSomething();
    // When done close it
    nc.close();
});
nc.on('error', (err) => {
    failed(err);
});
nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])

# Do something with the connection

await nc.close()

require 'nats/client'

NATS.start(servers: ["nats://demo.nats.io:4222"]) do |nc|
   # Do something with the connection

   # Close the connection
   nc.close
end
// will throw an exception if connection fails
    let nc = await connect("nats://demo.nats.io:4222");
    // Do something with the connection

    // Close the connection
    nc.close();

Connecting to the Default Server

Some libraries also provide a special way to connect to a default url, which is general nats://localhost:4222:

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Do something with the connection

Connection nc = Nats.connect();

// Do something with the connection

nc.close();
let nc = NATS.connect();
nc.on('connect', (c) => {
    // Do something with the connection
    doSomething();
    // When done close it
    nc.close();
});
nc.on('error', (err) => {
    failed(err);
});
nc = NATS()
await nc.connect()

# Do something with the connection

await nc.close()

require 'nats/client'

NATS.start do |nc|
   # Do something with the connection

   # Close the connection
   nc.close
end
// will throw an exception if connection fails
let nc = await connect();
// Do something with the connection

// When done close it
nc.close();


// alternatively, you can use the Promise pattern
let nc1: Client;
connect()
    .then((c) => {
        nc1 = c;
        // Do something with the connection
        nc1.close();
    });
    // add a .catch/.finally

Setting a Connect Timeout

Each library has its own, language preferred way, to pass connection options. For example, to set the maximum time to connect to a server to 10 seconds:

nc, err := nats.Connect(nats.DefaultURL, nats.Timeout(10*time.Second))
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Do something with the connection

Options options = new Options.Builder().
                            server("nats://demo.nats.io:4222").
                            connectionTimeout(Duration.ofSeconds(10)). // Set timeout
                            build();
Connection nc = Nats.connect(options);

// Do something with the connection

nc.close();
// connection timeout is not supported on node-nats
nc = NATS()
await nc.connect(connect_timeout=2)

# Do something with the connection

await nc.close()

# There is currently no connect timeout as part of the Ruby NATS client API, but you can use a timer to mimic it.
require 'nats/client'

timer = EM.add_timer(5) do
  NATS.connect do |nc|
    # Do something with the connection

    # Close the connection
    nc.close
  end
end
EM.cancel_timer(timer)
let nc = await connect({
    url: "nats://demo.nats.io:4222",
    timeout: 1000
});

The available options are discussed more below, in other pages, and in the documentation for your client library.

Connecting to a Cluster

When connecting to a cluster, there are a few things to think about.

  • Passing a URL for each cluster member (semi-optional)
  • The connection algorithm
  • The reconnect algorithm (discussed later)
  • Server provided URLs

When a client connects to the server, the server may provide a list of URLs for additional known servers. This allows a client to connect to one server and still have other servers available during reconnect. However, the initial connection cannot depend on these additional servers. Rather, the additional connection will try to connect to each of the URLs provided in the connect call and will fail if it is unable to connect to any of them. Note, failure behavior is library dependent, please check the documentation for your client library on information about what happens if the connect fails.

servers := []string{"nats://localhost:1222",
	"nats://localhost:1223",
	"nats://localhost:1224",
}

nc, err := nats.Connect(strings.Join(servers, ","))
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Do something with the connection

Options options = new Options.Builder().
                            server("nats://localhost:1222").
                            server("nats://localhost:1223").
                            server("nats://localhost:1224").
                            build();
Connection nc = Nats.connect(options);

// Do something with the connection

nc.close();
let nc = NATS.connect({
    servers: [
        "nats://demo.nats.io:4222",
        "nats://localhost:4222"
    ]}
);

nc.on('connect', (c) => {
    // Do something with the connection
    doSomething();
    // When done close it
    nc.close();
});
nc.on('error', (err) => {
    failed(err);
});
nc = NATS()
await nc.connect(servers=[
   "nats://127.0.0.1:1222",
   "nats://127.0.0.1:1223",
   "nats://127.0.0.1:1224"
   ])

# Do something with the connection

await nc.close()

require 'nats/client'

NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"]) do |nc|
   # Do something with the connection

   # Close the connection
   nc.close
end
// will throw an exception if connection fails
let nc = await connect({
        servers: [
            "nats://demo.nats.io:4222",
            "nats://localhost:4222"
        ]
});
// Do something with the connection

// When done close it
nc.close();

Reconnecting

Most, if not all, of the client libraries will reconnect to the server if they are disconnected due to a network problem. The reconnect logic can differ by library, so check your client libraries. In general, the client will try to connect to all of the servers it knows about, either through the URLs provided in connect or the URLs provided by its most recent server. The library may have several options to help control reconnect behavior.

Disable Reconnect

For example, you can disable reconnect:

// Disable reconnect attempts
nc, err := nats.Connect("demo.nats.io", nats.NoReconnect())
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Do something with the connection

Options options = new Options.Builder().
                            server("nats://demo.nats.io:4222").
                            noReconnect(). // Disable reconnect attempts
                            build();
Connection nc = Nats.connect(options);

// Do something with the connection

nc.close();
let nc = NATS.connect({
    reconnect: false,
    servers: ["nats://demo.nats.io:4222"]
});
nc = NATS()
await nc.connect(
   servers=[
      "nats://demo.nats.io:1222",
      "nats://demo.nats.io:1223",
      "nats://demo.nats.io:1224"
      ],
   allow_reconnect=False,
   )

# Do something with the connection

await nc.close()

require 'nats/client'

NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"], reconnect: false) do |nc|
   # Do something with the connection

   # Close the connection
   nc.close
end
// will throw an exception if connection fails
let nc = await connect({
    reconnect: false,
    servers: ["nats://demo.nats.io:4222"]
});
nc.close();

Set the Number of Reconnect Attempts

Applications can set the maximum reconnect attempts. Generally, this will limit the actual number of attempts total, but check your library documentation. For example, in Java, if the client knows about 3 servers and the maximum reconnects is set to 2, it will not try all of the servers. On the other hand, if the maximum is set to 6 it will try all of the servers twice before considering the reconnect a failure and closing.

// Set max reconnects attempts
nc, err := nats.Connect("demo.nats.io", nats.MaxReconnects(10))
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Do something with the connection

Options options = new Options.Builder().
                            server("nats://demo.nats.io:4222").
                            maxReconnects(10). // Set max reconnect attempts
                            build();
Connection nc = Nats.connect(options);

// Do something with the connection

nc.close();
let nc = NATS.connect({
    maxReconnectAttempts: 10,
    servers: ["nats://demo.nats.io:4222"]
});
nc = NATS()
await nc.connect(
   servers=["nats://demo.nats.io:4222"],
   max_reconnect_attempts=10,
   )

# Do something with the connection

await nc.close()

require 'nats/client'

NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"], max_reconnect_attempts: 10) do |nc|
   # Do something with the connection

   # Close the connection
   nc.close
end
// will throw an exception if connection fails
let nc = await connect({
    maxReconnectAttempts: 10,
    servers: ["nats://demo.nats.io:4222"]
});
nc.close();

Pausing Between Reconnect Attempts

It doesn’t make much sense to try to connect to the same server over and over. To prevent this sort of thrashing, and wasted reconnect attempts, libraries provide a wait setting. This setting will pause the reconnect logic if the same server is being tried multiple times. In the previous example, if you have 3 servers and 6 attempts, the Java library would loop over the three servers. If none were connectable, it will then try all three again. However, the Java client doesn’t wait between each attempt, only when trying the same server again, so in that example the library may never wait. If on the other hand, you only provide a single server URL and 6 attempts, the library will wait between each attempt.

// Set reconnect interval to 10 seconds
nc, err := nats.Connect("demo.nats.io", nats.ReconnectWait(10*time.Second))
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Do something with the connection

Options options = new Options.Builder().
                            server("nats://demo.nats.io:4222").
                            reconnectWait(Duration.ofSeconds(10)).  // Set Reconnect Wait
                            build();
Connection nc = Nats.connect(options);

// Do something with the connection

nc.close();
let nc = NATS.connect({
    reconnectTimeWait: 10 * 1000, //10s
    servers: ["nats://demo.nats.io:4222"]
});
nc = NATS()
await nc.connect(
   servers=["nats://demo.nats.io:4222"],
   reconnect_time_wait=10,
   )

# Do something with the connection

await nc.close()

require 'nats/client'

NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"], reconnect_time_wait: 10) do |nc|
   # Do something with the connection

   # Close the connection
   nc.close
end
// will throw an exception if connection fails
let nc = await connect({
    reconnectTimeWait: 10*1000, //10s
    servers: ["nats://demo.nats.io:4222"]
});
nc.close();

Avoiding the Thundering Herd

When a server goes down, there is a possible anti-pattern called the Thundering Herd where all of the clients try to reconnect immediately creating a denial of service attack. In order to prevent this, most NATS client libraries randomize the servers they attempt to connect to. This setting has no effect if only a single server is used, but in the case of a cluster, randomization, or shuffling, will ensure that no one server bears the brunt of the client reconnect attempts.

servers := []string{"nats://localhost:1222",
	"nats://localhost:1223",
	"nats://localhost:1224",
}

nc, err := nats.Connect(strings.Join(servers, ","), nats.DontRandomize())
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Do something with the connection

Options options = new Options.Builder().
                            server("nats://demo.nats.io:4222").
                            noRandomize(). // Disable reconnect shuffle
                            build();
Connection nc = Nats.connect(options);

// Do something with the connection

nc.close();
let nc = NATS.connect({
    noRandomize: false,
    servers: ["nats://127.0.0.1:4443",
        "nats://demo.nats.io:4222"
    ]
});
nc = NATS()
await nc.connect(
   servers=[
      "nats://demo.nats.io:1222",
      "nats://demo.nats.io:1223",
      "nats://demo.nats.io:1224"
      ],
   dont_randomize=True,
   )

# Do something with the connection

await nc.close()

require 'nats/client'

NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"], dont_randomize_servers: true) do |nc|
   # Do something with the connection

   # Close the connection
   nc.close
end
// will throw an exception if connection fails
let nc = await connect({
    noRandomize: false,
    servers: ["nats://127.0.0.1:4443",
        "nats://demo.nats.io:4222"
    ]
});
nc.close();

Listening for Reconnect Events

Because reconnect is primarily under the covers many libraries provide an event listener you can use to be notified of reconnect events. This event can be especially important for applications sending a lot of messages.

// Connection event handlers are invoked asynchronously
// and the state of the connection may have changed when
// the callback is invoked.
nc, err := nats.Connect("demo.nats.io",
	nats.DisconnectHandler(func(nc *nats.Conn) {
		// handle disconnect event
	}),
	nats.ReconnectHandler(func(nc *nats.Conn) {
		// handle reconnect event
	}))
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Do something with the connection

Options options = new Options.Builder().
                            server("nats://demo.nats.io:4222").
                            connectionListener((conn, type) -> {
                                if (type == Events.RECONNECTED) {
                                    // handle reconnected
                                } else if (type == Events.DISCONNECTED) {
                                    // handle disconnected, wait for reconnect
                                }
                            }).
                            build();
Connection nc = Nats.connect(options);

// Do something with the connection

nc.close();
let nc = NATS.connect({
    maxReconnectAttempts: 10,
    servers: ["nats://demo.nats.io:4222"]
});

nc.on('reconnect', (c) => {
    console.log('reconnected');
});
nc = NATS()

async def disconnected_cb():
   print("Got disconnected!")

async def reconnected_cb():
   # See who we are connected to on reconnect.
   print("Got reconnected to {url}".format(url=nc.connected_url.netloc))

await nc.connect(
   servers=["nats://demo.nats.io:4222"],
   reconnect_time_wait=10,
   reconnected_cb=reconnected_cb,
   disconnected_cb=disconnected_cb,
   )

# Do something with the connection.

require 'nats/client'

NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"]) do |nc|
   # Do something with the connection
   nc.on_reconnect do
    puts "Got reconnected to #{nc.connected_server}"
  end

  nc.on_disconnect do |reason|
    puts "Got disconnected! #{reason}"
  end
end
// will throw an exception if connection fails
let nc = await connect({
    maxReconnectAttempts: 10,
    servers: ["nats://demo.nats.io:4222"]
});
// first argument is the connection (same as nc in this case)
// second argument is the url of the server where the client
// connected
nc.on('reconnect', (conn, server) => {
    console.log('reconnected to', server);
});
nc.close();

Buffering Messages During Reconnect Attempts

There is another setting that comes in to play during reconnection. This setting controls how much memory the client library will hold in the form of outgoing messages while it is disconnected. During a short reconnect, the client will generally allow applications to publish messages but because the server is offline, will be cached in the client. The library will then send those messages on reconnect. When the maximum reconnect buffer is reached, messages will no longer be publishable by the client.

// Set reconnect buffer size in bytes (5 MB)
nc, err := nats.Connect("demo.nats.io", nats.ReconnectBufSize(5*1024*1024))
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Do something with the connection

Options options = new Options.Builder().
                            server("nats://demo.nats.io:4222").
                            reconnectBufferSize(5 * 1024 * 1024).  // Set buffer in bytes
                            build();
Connection nc = Nats.connect(options);

// Do something with the connection

nc.close();
// Reconnect buffer size is not configurable on node-nats
# Asyncio NATS client currentply does not implement a reconnect buffer
# There is currently no reconnect pending buffer as part of the Ruby NATS client.
// Reconnect buffer size is not configurable on ts-nats

As mentioned throughout this document, each client library may behave slightly differently. Please check the documentation for the library you are using.

results matching ""

    No results matching ""