1
0
mirror of https://github.com/taigrr/nats.docs synced 2025-01-18 04:03:23 -08:00

Added minimal doc for nats streaming based on training slides

This commit is contained in:
Stephen Asbury 2019-05-17 11:23:44 -07:00
parent f013ebb8d2
commit 37b46131da
10 changed files with 192 additions and 1 deletions

View File

@ -45,7 +45,7 @@
## Developing With NATS ## Developing With NATS
* [Intro](developer/README.md) * [Introduction](developer/README.md)
* [Concepts](developer/concepts/intro.md) * [Concepts](developer/concepts/intro.md)
* [Subject-Based Messaging](developer/concepts/subjects.md) * [Subject-Based Messaging](developer/concepts/subjects.md)
@ -106,6 +106,16 @@
* [Explore NATS Queueing](developer/tutorials/queues.md) * [Explore NATS Queueing](developer/tutorials/queues.md)
* [Advanced Connect and Custom Dialer in Go](developer/tutorials/custom_dialer.md) * [Advanced Connect and Custom Dialer in Go](developer/tutorials/custom_dialer.md)
## Developing With NATS Streaming
* [Introduction](developer/streaming/README.md)
* [Connecting to NATS Streaming](developer/streaming/connecting.md)
* [Publishing to a Channel](developer/streaming/publishing.md)
* [Receiving Messages from a Channel](developer/streaming/receiving.md)
* [Durable Subscriptions](developer/streaming/durables.md)
* [Queue Subscriptions](developer/streaming/queues.md)
* [Acknowledgements](developer/streaming/acks.md)
## NATS Protocol ## NATS Protocol
* [Protocol Demo](nats_protocol/nats-protocol-demo.md) * [Protocol Demo](nats_protocol/nats-protocol-demo.md)
* [Client Protocol](nats_protocol/nats-protocol.md) * [Client Protocol](nats_protocol/nats-protocol.md)

Binary file not shown.

After

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

View File

@ -0,0 +1,62 @@
# NATS Streaming
Where NATS provides at most once quality of service, streaming adds at least once. Streaming is implemented as a request-reply service on top of NATS. Streaming messages are encoded as protocol buffers, the streaming clients use NATS to talk to the streaming server. The streaming server organizes messages in channels and stores them in files and databases. ACKs are used to insure delivery in both directions.
> Sometimes the maintainers will refer to NATS as "nats core" and streaming as "stan" or "streaming."
Messages to the streaming service are opaque byte arrays, just as they are with NATS. However, the streaming server protocol uses protocol buffers to wrap these byte arrays. So if you listen to the NATS traffic the messages will appear as protocol buffers, while the actual data sent and received will simply be byte arrays.
NATS streaming uses the concept of a channel to represent an ordered collection of messages. Clients send to and receive from channels instead of subjects. The subjects used by the streaming libraries and server are managed internally. Channels do not currently support wildcard. Channels arent raw subjects. Streaming isnt raw NATS. The streaming libraries hide some of the differences.
Think of channels as a ring buffer. Messages are added until the configured limit is reached. Old messages are removed to make room for new ones. Old messages can expire, based on configuration. Subscriptions dont affect channel content. Subscriptions are like a cursor on the ring buffer.
![ring buffer](../resources/ring_buffer.png)
Positions in the channel are specified in multiple ways:
* Sequence number - counting from 1
* Time
* Time delta (converted to time on client)
New subscriptions can also specify last received to indicate they only want new messages. Sequence numbers are persistent, when message #1 goes away the oldest message is message #2. Trying to go to a position before the oldest message will be moved to the oldest message.
![starting positions](../resources/start_positions.png)
## Subscription Types
NATS Streaming supports several types of subscriptions:
* Regular
* Durable
* Queue
* Durable/Queue
Regular subscriptions pick the location of their channel position on creation and it is stored while the subscriber is active. Durable subscriptions store their position in the streaming server. Queue subscriptions share a channel position. Durable/Queue subscriptions share a channel position stored in the server. All subscriptions can be configured with a starting position, but only new durable subscriptions and new regular subscriptions respect the request.
All subscriptions define their position on creation. Regular subscriptions lose their position if the application crashes, the app disconnects or they unsubscribe. Durable subscriptions maintain their position through disconnect, subscriber close, but not through unsubscribe. The position on reconnect comes from the server not the options in both cases. Queue subscriptions share a position. Regular queue subscriptions lose their position on the last disconnect/unsubscribe. Durable queue subscriptions maintain their position through disconnect, but not through the last unsubscribe. Positions provided in options are ignored after the position is set.
## Acknowledgements
In order to implement at least once delivery NATS streaming uses ACK messages for publishers and subscribers. Each message sent from the streaming server to the client must be acknowledged or it will be re-delivered. Developers must switch their mind set. The same message can arrive more than once. Messages should be idempotent. The client libraries can help with ACKs. Subscriptions can use manual or automatic ACKs. Manual ACKs are safer, since the program controls when they happen. An ACK wait setting is used to define the timeout before an ACK is considered missing.
> Ack wait = 10s means that the server wont redeliver for at least 10s
Using ACKs for each message sent can be a performance hit - round trip per message. NATS streaming allows subscriptions to set a max in flight value. Max in flight determines how many unacknowledged messages can be sent to the client. Ack Wait is used to decide when the ACK for a message has failed and it needs to be redelivered. New and redelivered messages are sent upon availability, in order.
Messages are sent in order, when they are available:
* Max inflight = 2
* Send msg 1 and msg 2
* ACK 2
* Message 3 arrives at the server
* Send message 3 (since it is available)
* When Ack wait expires, msg 1 is available
* Send msg 1 (1 and 3 are in flight)
The streaming server sends available messages in order, but 1 isnt available until its Ack wait expires. If max in flight = 1 then only 1 message is on the wire at a time, it will be re-sent until it is acknowledged. Re-delivered messages will not come out of order in this situation.
Setting max in flight to a number greater than 1 requires some thought and foresight to deal with redelivery scenarios.
Max in flight is a per-subscription setting. In the case of queue subscribers, each client can set the value. Normally, each client will use the same value but this is not a requirement.
NATS streaming uses acknowledgements on the sending side as well as the subscribing side. The streaming server acknowledges messages it receives and has persisted. A maximum in flight setting is used for publishers. No more than max in flight can be on their way to the server at one time. The library may provide various mechanisms to handle publisher ACKs. **The application must manage redelivery to the server**.

View File

@ -0,0 +1,25 @@
# Acknowledgements
Subscribers can use auto-ack or manual-ack. Auto-ack is the default for most clients and is sent by the library when the message callback returns. Manual ack provides more control. The subscription options provide flags to:
* Set manual acks to true
* Set the ack wait used by the server for messages to this subscription
The ack wait is the time the server will wait before resending a message.
```go
sub, err := sc.Subscribe("foo",
func(m *stan.Msg) {
m.Ack()
}, stan.SetManualAckMode(), stan.AckWait(aw))
```
## Max In Flight
Subscribers can set max in flight to rate limit incoming messages. The server will send at most “max in flight” messages before receiving an acknowledgement. Setting max in flight to 1 insures every message is processed in order.
```go
sc.Subscribe("foo", func(m *stan.Msg) {...},
stan.SetManualAckMode(),
stan.MaxInflight(25))
```

View File

@ -0,0 +1,15 @@
# Connecting to NATS Streaming
NATS Streaming is a service on top of NATS. To connect to the service you first connect to NATS and then use the client library to communicate with the server over your NATS connection. Most of the libraries provide a convenience mechanism for connecting in a single step. These convenience methods will take some NATS options, like the server, and perform the NATS connection first, then then run the protocol to connect to the streaming server.
Connecting to a streaming server requires a cluster id, defined by the server configuration, and a client ID defined by the client.
```go
sc, err := stan.Connect(clusterID, clientID, stan.NatsURL(“nats://localhost:4222”))
```
Sometimes you may want to provide NATS settings that aren't available in the streaming libraries connect method. Or, you may want to reuse a NATS connection instead of creating a new one. In this case the libraries generally provide a way to connect to streaming with an existing NATS connection:
```go
sc, err := stan.Connect(clusterID, clientID, stan.NatsConn(nc))
```

View File

@ -0,0 +1,7 @@
# Durable Subscriptions
Regular subscriptions remember their position while the client is connected. If the client disconnects the position is lost. Durable subscriptions remember their position even if the client is disconnected. Durable subscriptions identify themselves with a name. Connect and disconnect wont affect the durable subscriptions position in the channel. Unsubscribe will clear the durable subscription.
```go
sc.Subscribe("foo", func(m *stan.Msg) {...}, stan.DurableName("my-durable"))
```

View File

@ -0,0 +1,15 @@
# Publishing to a Channel
The streaming client library can provide a method for publishing synchronously. .These publish methods block until the ACK is returned by the server. An error or exception is used to indicate a timeout or other error.
```go
err := sc.Publish("foo", []byte("Hello World"))
```
Streaming libraries can also provide a way to publish asynchronously. An ACK callback of some kind is required. The library will publish the message and notify the callback on ACK or timeout. The global id associated with the message being sent is returned from publish so that the application can identify it on callback.
```go
ackHandler := func(ackedNuid string, err error){ ... }
nuid, err := sc.PublishAsync("foo", []byte("Hello World"), ackHandler)
```

View File

@ -0,0 +1,11 @@
# Queue Subscriptions
Queue subscriptions are created like other subscriptions with the addition of a queue name. All subscriptions, across clients, share the queue based on this unique name. Other subscriptions can receive messages independently of the queue groups. Unsubscribe removes a client from a group, the last unsubscribe kills the group. Max in flight is per subscription.
```go
qsub1, _ := sc.QueueSubscribe(channelID,
queueName, func(m *stan.Msg) {...})
qsub2, _ := sc.QueueSubscribe(channelID,
queueName, func(m *stan.Msg) {...})
```

View File

@ -0,0 +1,46 @@
# Receiving Messages from a Channel
Clients subscribe to channels by name. Wildcards are not supported. Receiving messages is similar to core NATS. Messages in streaming use protocol buffers and will have a bit more structure than NATS opaque messages. Client messages are still presented and accepted as raw/opaque binary data. The use of protocol buffers is transparent.
Subscriptions come in several forms:
* Regular
* Durable
* Queue
* Queue/Durable
Subscriptions set their starting position on creation using position or time. For example, in Go you can start at:
* The last message received
```go
sub, err := sc.Subscribe("foo",
func(m *stan.Msg) {...},
stan.StartWithLastReceived())
```
* The beginning of the channel
```go
sub, err := sc.Subscribe("foo",
func(m *stan.Msg) {...},
stan.DeliverAllAvailable())
```
* A specific message, indexing starts at 1
```go
sub, err := sc.Subscribe("foo",
func(m *stan.Msg) {...},
stan.StartAtSequence(22))
```
* A specific time the message arrived in the channel
```go
var startTime time.Time
...
sub, err := sc.Subscribe("foo",
func(m *stan.Msg) {...},
stan.StartAtTime(startTime))
```