Add in snapshot and restore JSApi

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-05-27 19:54:59 -07:00
parent 8727315eb9
commit fa59cff105
9 changed files with 489 additions and 34 deletions

View File

@@ -1289,6 +1289,27 @@ func (a *Account) internalClient() *client {
return a.ic
}
// Internal account scoped subscriptions.
// subscribeInternal creates an internal, account-scoped subscription on the
// given subject, dispatching matching messages to cb. It allocates a unique
// internal sid from the account's counter.
func (a *Account) subscribeInternal(c *client, subject string, cb msgHandler) (*subscription, error) {
	// This will happen in parsing when the account has not been properly setup.
	// Check before taking the lock so we do not consume a sid for a doomed call.
	if c == nil {
		return nil, fmt.Errorf("no internal account client")
	}
	a.mu.Lock()
	sid := strconv.FormatUint(a.isid+1, 10)
	a.isid++
	a.mu.Unlock()

	sub, err := c.processSub([]byte(subject+" "+sid), true)
	if err != nil {
		return nil, err
	}
	// Register the internal callback that will receive delivered messages.
	sub.icb = cb
	return sub, nil
}
// This will add an account subscription that matches the "from" from a service import entry.
func (a *Account) addServiceImportSub(si *serviceImport) error {
a.mu.Lock()

View File

@@ -2740,13 +2740,8 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte, gwrply b
client.outBytes += msgSize
// Check for internal subscriptions.
if client.kind == SYSTEM || client.kind == JETSTREAM || client.kind == ACCOUNT {
s := client.srv
if sub.icb != nil || client.kind == SYSTEM || client.kind == JETSTREAM || client.kind == ACCOUNT {
client.mu.Unlock()
if sub.icb == nil {
s.Debugf("Received internal callback with no registered handler")
return false
}
// Internal account clients are for service imports and need the
// complete raw msg with '\r\n'.
if client.kind == ACCOUNT {

View File

@@ -1945,7 +1945,7 @@ func (fs *fileStore) Stop() error {
const errFile = "errors.txt"
// Stream our snapshot through gzip and tar.
func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) {
func (fs *fileStore) streamSnapshot(w io.WriteCloser, blks []*msgBlock, includeConsumers bool) {
defer w.Close()
gzw := gzip.NewWriter(w)
@@ -2011,7 +2011,6 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) {
// Now do messages themselves.
fs.mu.Lock()
lmb := fs.lmb
blks := fs.blks
fs.mu.Unlock()
// Can't use join path here, zip only recognizes relative paths with forward slashes.
@@ -2094,7 +2093,7 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) {
}
// Create a snapshot of this stream and its consumer's state along with messages.
func (fs *fileStore) Snapshot(deadline time.Duration, includeConsumers bool) (io.ReadCloser, error) {
func (fs *fileStore) Snapshot(deadline time.Duration, includeConsumers bool) (*SnapshotResult, error) {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
@@ -2107,15 +2106,19 @@ func (fs *fileStore) Snapshot(deadline time.Duration, includeConsumers bool) (io
}
// Mark us as snapshotting
fs.sips += 1
blks := fs.blks
blkSize := int(fs.fcfg.BlockSize)
fs.mu.Unlock()
pr, pw := net.Pipe()
// Set a write deadline here to protect ourselves.
pw.SetWriteDeadline(time.Now().Add(deadline))
if deadline > 0 {
pw.SetWriteDeadline(time.Now().Add(deadline))
}
// Stream in separate Go routine.
go fs.streamSnapshot(pw, includeConsumers)
go fs.streamSnapshot(pw, blks, includeConsumers)
return pr, nil
return &SnapshotResult{pr, blkSize, len(blks)}, nil
}
////////////////////////////////////////////////////////////////////////////////

View File

@@ -1120,7 +1120,7 @@ func TestFileStoreSnapshot(t *testing.T) {
if err != nil {
t.Fatalf("Error creating snapshot")
}
snapshot, err := ioutil.ReadAll(r)
snapshot, err := ioutil.ReadAll(r.Reader)
if err != nil {
t.Fatalf("Error reading snapshot")
}
@@ -1212,7 +1212,7 @@ func TestFileStoreSnapshot(t *testing.T) {
// Now check to make sure that we get the correct error when trying to delete or erase
// a message when a snapshot is in progress and that closing the reader releases that condition.
r, err := fs.Snapshot(5*time.Second, true)
sr, err := fs.Snapshot(5*time.Second, true)
if err != nil {
t.Fatalf("Error creating snapshot")
}
@@ -1224,7 +1224,7 @@ func TestFileStoreSnapshot(t *testing.T) {
}
// Now make sure we can do these when we close the reader and release the snapshot condition.
r.Close()
sr.Reader.Close()
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if _, err := fs.RemoveMsg(122); err != nil {
return fmt.Errorf("Got an error on remove after snapshot: %v", err)
@@ -1233,19 +1233,19 @@ func TestFileStoreSnapshot(t *testing.T) {
})
// Make sure if we do not read properly then it will close the writer and report an error.
r, err = fs.Snapshot(10*time.Millisecond, false)
sr, err = fs.Snapshot(10*time.Millisecond, false)
if err != nil {
t.Fatalf("Error creating snapshot")
}
var buf [32]byte
if n, err := r.Read(buf[:]); err != nil || n == 0 {
if n, err := sr.Reader.Read(buf[:]); err != nil || n == 0 {
t.Fatalf("Expected to read beginning, got %v and %d", err, n)
}
// Cause snapshot to timeout.
time.Sleep(20 * time.Millisecond)
// Read again should fail
if _, err := r.Read(buf[:]); err != io.EOF {
if _, err := sr.Reader.Read(buf[:]); err != io.EOF {
t.Fatalf("Expected read to produce an error, got none")
}
}

View File

@@ -17,7 +17,9 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"os"
"sort"
"strconv"
"strings"
@@ -32,7 +34,7 @@ const (
// Will return JSON response.
JSApiAccountInfo = "$JS.API.INFO"
// JSApiCreateTemplate is the endpoint to create new stream templates.
// JSApiTemplateCreate is the endpoint to create new stream templates.
// Will return JSON response.
JSApiTemplateCreate = "$JS.API.STREAM.TEMPLATE.CREATE.*"
JSApiTemplateCreateT = "$JS.API.STREAM.TEMPLATE.CREATE.%s"
@@ -46,17 +48,17 @@ const (
JSApiTemplateInfo = "$JS.API.STREAM.TEMPLATE.INFO.*"
JSApiTemplateInfoT = "$JS.API.STREAM.TEMPLATE.INFO.%s"
// JSApiDeleteTemplate is the endpoint to delete stream templates.
// JSApiTemplateDelete is the endpoint to delete stream templates.
// Will return JSON response.
JSApiTemplateDelete = "$JS.API.STREAM.TEMPLATE.DELETE.*"
JSApiTemplateDeleteT = "$JS.API.STREAM.TEMPLATE.DELETE.%s"
// JSApiCreateStream is the endpoint to create new streams.
// JSApiStreamCreate is the endpoint to create new streams.
// Will return JSON response.
JSApiStreamCreate = "$JS.API.STREAM.CREATE.*"
JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
// JSApiUpdateStream is the endpoint to update existing streams.
// JSApiStreamUpdate is the endpoint to update existing streams.
// Will return JSON response.
JSApiStreamUpdate = "$JS.API.STREAM.UPDATE.*"
JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
@@ -72,7 +74,7 @@ const (
JSApiStreamInfo = "$JS.API.STREAM.INFO.*"
JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"
// JSApiDeleteStream is the endpoint to delete streams.
// JSApiStreamDelete is the endpoint to delete streams.
// Will return JSON response.
JSApiStreamDelete = "$JS.API.STREAM.DELETE.*"
JSApiStreamDeleteT = "$JS.API.STREAM.DELETE.%s"
@@ -82,6 +84,18 @@ const (
JSApiStreamPurge = "$JS.API.STREAM.PURGE.*"
JSApiStreamPurgeT = "$JS.API.STREAM.PURGE.%s"
// JSApiStreamSnapshot is the endpoint to snapshot streams.
// Will return a stream of chunks with a nil chunk as EOF to
// the deliver subject. Caller should respond to each chunk
// with a nil body response for ack flow.
JSApiStreamSnapshot = "$JS.API.STREAM.SNAPSHOT.*"
JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"
// JSApiStreamRestore is the endpoint to restore a stream from a snapshot.
// Caller should respond to each chunk with a nil body response.
JSApiStreamRestore = "$JS.API.STREAM.RESTORE.*"
JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"
// JSApiDeleteMsg is the endpoint to delete messages from a stream.
// Will return JSON response.
JSApiMsgDelete = "$JS.API.STREAM.MSG.DELETE.*"
@@ -287,6 +301,35 @@ type JSApiMsgDeleteResponse struct {
const JSApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"
type JSApiStreamSnapshotRequest struct {
// Subject to deliver the chunks to for the snapshot.
DeliverSubject string `json:"deliver_subject"`
// Do not include consumers in the snapshot.
NoConsumers bool `json:"no_consumers,omitempty"`
// Optional chunk size preference. Otherwise server selects.
ChunkSize int `json:"chunk_size,omitempty"`
}
// JSApiStreamSnapshotResponse is the direct response to the snapshot request.
type JSApiStreamSnapshotResponse struct {
ApiResponse
// Estimate of number of blocks for the messages.
NumBlks int `json:"num_blks"`
// Block size limit as specified by the stream.
BlkSize int `json:"blk_size"`
}
const JSApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"
// JSApiStreamRestoreResponse is the direct response to the restore request.
type JSApiStreamRestoreResponse struct {
ApiResponse
// Subject to deliver the chunks to for the snapshot restore.
DeliverSubject string `json:"deliver_subject"`
}
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
// JSApiMsgGetRequest get a message request.
type JSApiMsgGetRequest struct {
Seq uint64 `json:"seq"`
@@ -406,6 +449,8 @@ var allJsExports = []string{
JSApiStreamInfo,
JSApiStreamDelete,
JSApiStreamPurge,
JSApiStreamSnapshot,
JSApiStreamRestore,
JSApiMsgDelete,
JSApiMsgGet,
JSApiConsumerCreate,
@@ -433,6 +478,8 @@ func (s *Server) setJetStreamExportSubs() error {
{JSApiStreamInfo, s.jsStreamInfoRequest},
{JSApiStreamDelete, s.jsStreamDeleteRequest},
{JSApiStreamPurge, s.jsStreamPurgeRequest},
{JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
{JSApiStreamRestore, s.jsStreamRestoreRequest},
{JSApiMsgDelete, s.jsMsgDeleteRequest},
{JSApiMsgGet, s.jsMsgGetRequest},
{JSApiConsumerCreate, s.jsConsumerCreateRequest},
@@ -1033,6 +1080,193 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp))
}
// Request to restore a stream.
// jsStreamRestoreRequest handles a request to restore a stream from a
// snapshot. On success it responds with a deliver subject; the caller then
// publishes snapshot chunks to that subject, terminated by an empty message.
func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
	if c.acc == nil {
		return
	}
	acc := c.acc

	var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
	if !acc.JetStreamEnabled() {
		resp.Error = jsNotEnabledErr
		s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	if !isEmptyRequest(msg) {
		resp.Error = jsNotEmptyRequestErr
		s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	stream := streamNameFromSubject(subject)
	// Restoring over an existing stream is an error.
	if _, err := acc.LookupStream(stream); err == nil {
		resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("stream [%q] already exists", stream)}
		s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	// Buffer the incoming snapshot chunks on disk until we see EOF.
	tfile, err := ioutil.TempFile("", "jetstream-restore-")
	if err != nil {
		resp.Error = &ApiError{Code: 500, Description: "jetstream unable to open temp storage for restore"}
		s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	// Create our internal subscription to accept the snapshot.
	restoreSubj := fmt.Sprintf("_restore_.%s", nuid.Next())

	// FIXME(dlc) - Can't recover well here if something goes wrong. Could use channels and at least time
	// things out. Note that this is tied to the requesting client, so if it is a tool this goes away when
	// the client does. Only thing leaking here is the sub on strange failure.
	_, err = acc.subscribeInternal(c, restoreSubj, func(sub *subscription, c *client, _, _ string, msg []byte) {
		// An empty message signals EOF for the snapshot transfer.
		if len(msg) == 0 {
			tfile.Seek(0, 0)
			// TODO(dlc) - no way right now to communicate back.
			acc.RestoreStream(stream, tfile)
			tfile.Close()
			os.Remove(tfile.Name())
			c.processUnsub(sub.sid)
			return
		}
		// Append chunk to temp file.
		tfile.Write(msg)
	})
	if err != nil {
		// Without the subscription the caller would publish chunks into the
		// void, so fail the request and clean up the temp file now.
		tfile.Close()
		os.Remove(tfile.Name())
		resp.Error = &ApiError{Code: 500, Description: "jetstream unable to subscribe for restore"}
		s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	resp.DeliverSubject = restoreSubj
	s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp))
}
// Process a snapshot request.
// jsStreamSnapshotRequest handles a request to snapshot a stream. It validates
// the request, sends the direct JSON response with size estimates, then
// streams the snapshot chunks to the requested deliver subject in a separate
// go routine.
func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
	if c.acc == nil {
		return
	}
	acc := c.acc

	var resp = JSApiStreamSnapshotResponse{ApiResponse: ApiResponse{Type: JSApiStreamSnapshotResponseType}}
	if !acc.JetStreamEnabled() {
		resp.Error = jsNotEnabledErr
		s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// A snapshot request must carry a body (at minimum the deliver subject).
	if isEmptyRequest(msg) {
		resp.Error = jsBadRequestErr
		s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	stream := streamNameFromSubject(subject)
	mset, err := acc.LookupStream(stream)
	if err != nil {
		resp.Error = jsNotFoundError(err)
		s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	var req JSApiStreamSnapshotRequest
	if err := json.Unmarshal(msg, &req); err != nil {
		resp.Error = jsInvalidJSONErr
		s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// Reject an unusable deliver subject, and reject a negative chunk size up
	// front — a negative size would later panic slicing the read buffer in
	// streamSnapshot.
	if !IsValidSubject(req.DeliverSubject) || req.ChunkSize < 0 {
		resp.Error = jsBadRequestErr
		s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}
	// No deadline here; pacing is handled by the ack flow in streamSnapshot.
	sr, err := mset.Snapshot(0, !req.NoConsumers)
	if err != nil {
		resp.Error = jsError(err)
		s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
		return
	}

	resp.NumBlks = sr.NumBlks
	resp.BlkSize = sr.BlkSize

	s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp))

	// Now do the real streaming in a separate go routine.
	go s.streamSnapshot(c, mset, sr, &req)
}
// Chunk size used when the requester does not specify one.
const defaultSnapshotChunkSize = 64 * 1024

// streamSnapshot will stream out our snapshot to the reply subject in chunks,
// using a simple one-outstanding-ack scheme for flow control. A nil chunk is
// sent as EOF. Runs in its own go routine; closes the snapshot reader when done.
func (s *Server) streamSnapshot(c *client, mset *Stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
	chunkSize := req.ChunkSize
	// Guard against zero or negative sizes so slicing below stays valid.
	if chunkSize <= 0 {
		chunkSize = defaultSnapshotChunkSize
	}
	// Setup for the chunk stream.
	acc := c.acc
	reply := req.DeliverSubject
	r := sr.Reader
	defer r.Close()

	// Check interest for the snapshot deliver subject.
	inch := make(chan bool, 1)
	acc.sl.RegisterNotification(req.DeliverSubject, inch)
	defer acc.sl.ClearNotification(req.DeliverSubject, inch)
	hasInterest := <-inch
	if !hasInterest {
		// Allow 2 seconds or so for interest to show up.
		select {
		case <-inch:
		case <-time.After(2 * time.Second):
		}
	}

	// Create our ack flow handler.
	// This is very simple for now.
	acks := make(chan struct{}, 1)
	acks <- struct{}{}

	ackSubj := fmt.Sprintf("_snapshot_.%s", nuid.Next())
	ackSub, err := mset.subscribeInternalUnlocked(ackSubj, func(_ *subscription, _ *client, _, _ string, _ []byte) {
		acks <- struct{}{}
	})
	if err != nil {
		// Without the ack subscription we have no flow control; abort rather
		// than unsubscribe a nil sub later.
		s.Debugf("Failed to subscribe for snapshot acks: %v", err)
		return
	}
	defer mset.unsubscribeUnlocked(ackSub)

	// TODO(dlc) - Add in NATS-Chunked-Sequence header

	// Since this is a pipe we will gather up reads and buffer internally before sending.
	// bufio will not help here.
	var frag [512]byte
	fsize := len(frag)
	if chunkSize < fsize {
		fsize = chunkSize
	}
	chunk := make([]byte, 0, chunkSize)

	for {
		n, err := r.Read(frag[:fsize])
		// Treat all errors the same for now, just break out.
		// TODO(dlc) - when we use headers do error if not io.EOF
		if err != nil {
			break
		}
		// Check if we should send.
		if len(chunk)+n > chunkSize {
			// Wait on acks for flow control.
			// Wait up to 10ms for now if none received.
			select {
			case <-acks:
			case <-inch: // Lost interest
				goto done
			case <-time.After(10 * time.Millisecond):
			}
			mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackSubj, chunk, nil, 0}
			// Can't reuse; the send queue still references the old slice.
			chunk = make([]byte, 0, chunkSize)
		}
		chunk = append(chunk, frag[:n]...)
	}

done:
	// Send last chunk and nil as EOF
	mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, chunk, nil, 0}
	mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, 0}
}
// Request to create a durable consumer.
func (s *Server) jsDurableCreateRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
s.jsConsumerCreate(sub, c, subject, reply, msg, true)

View File

@@ -15,7 +15,6 @@ package server
import (
"fmt"
"io"
"math/rand"
"sort"
"sync"
@@ -382,7 +381,7 @@ func (ms *memStore) ConsumerStore(_ string, _ *ConsumerConfig) (ConsumerStore, e
return &consumerMemStore{ms}, nil
}
func (ms *memStore) Snapshot(_ time.Duration, _ bool) (io.ReadCloser, error) {
// Snapshot is not supported by the memory-based store; it always returns an
// error. Use the file store if snapshots are required.
func (ms *memStore) Snapshot(_ time.Duration, _ bool) (*SnapshotResult, error) {
	return nil, fmt.Errorf("no impl")
}

View File

@@ -63,7 +63,7 @@ type StreamStore interface {
Delete() error
Stop() error
ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error)
Snapshot(deadline time.Duration, includeConsumers bool) (io.ReadCloser, error)
Snapshot(deadline time.Duration, includeConsumers bool) (*SnapshotResult, error)
}
// RetentionPolicy determines how messages in a set are retained.
@@ -101,6 +101,13 @@ type StreamState struct {
Consumers int `json:"consumer_count"`
}
// SnapshotResult contains information about the snapshot.
// Note: field order is significant — callers construct this with a positional
// composite literal.
type SnapshotResult struct {
	// Reader streams the snapshot data; the caller is responsible for closing it.
	Reader io.ReadCloser
	// BlkSize is the message block size limit of the stream's store.
	BlkSize int
	// NumBlks is an estimate of the number of message blocks in the snapshot.
	NumBlks int
}
// ConsumerStore stores state on consumers for streams.
type ConsumerStore interface {
State() (*ConsumerState, error)

View File

@@ -521,6 +521,13 @@ func (mset *Stream) subscribeInternal(subject string, cb msgHandler) (*subscript
return sub, nil
}
// Helper for unlocked stream.
// subscribeInternalUnlocked is a convenience for callers that do not already
// hold the stream lock; it acquires the lock and delegates to subscribeInternal.
func (mset *Stream) subscribeInternalUnlocked(subject string, cb msgHandler) (*subscription, error) {
	mset.mu.Lock()
	sub, err := mset.subscribeInternal(subject, cb)
	mset.mu.Unlock()
	return sub, err
}
// This will unsubscribe us from the exact subject given.
// We do not currently track the subs so do not have the sid.
// This should be called only on an update.
@@ -559,6 +566,12 @@ func (mset *Stream) unsubscribe(sub *subscription) {
mset.client.unsubscribe(mset.client.acc, sub, true, true)
}
// unsubscribeUnlocked removes the given subscription, acquiring the stream
// lock on behalf of the caller.
func (mset *Stream) unsubscribeUnlocked(sub *subscription) {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	mset.unsubscribe(sub)
}
func (mset *Stream) setupStore(fsCfg *FileStoreConfig) error {
mset.mu.Lock()
defer mset.mu.Unlock()
@@ -922,8 +935,8 @@ func (mset *Stream) ackMsg(obs *Consumer, seq uint64) {
}
}
// Snapshot creates a snapshot for the stream.
func (mset *Stream) Snapshot(deadline time.Duration, includeConsumers bool) (io.ReadCloser, error) {
// Snapshot creates a snapshot for the stream and possibly consumers.
func (mset *Stream) Snapshot(deadline time.Duration, includeConsumers bool) (*SnapshotResult, error) {
mset.mu.Lock()
if mset.client == nil || mset.store == nil {
mset.mu.Unlock()
@@ -947,7 +960,7 @@ func (mset *Stream) Snapshot(deadline time.Duration, includeConsumers bool) (io.
const snapsDir = "__snapshots__"
// RestoreStream will restore a stream from a snapshot.
func (a *Account) RestoreStream(r io.Reader) (*Stream, error) {
func (a *Account) RestoreStream(stream string, r io.Reader) (*Stream, error) {
_, jsa, err := a.checkForJetStream()
if err != nil {
return nil, err
@@ -1009,6 +1022,11 @@ func (a *Account) RestoreStream(r io.Reader) (*Stream, error) {
if err := json.Unmarshal(b, &cfg); err != nil {
return nil, err
}
// See if names match
if cfg.Name != stream {
return nil, fmt.Errorf("stream name [%q] does not match snapshot stream [%q]", stream, cfg.Name)
}
// See if this stream already exists.
if _, err := a.LookupStream(cfg.Name); err == nil {
return nil, fmt.Errorf("stream [%q] already exists", cfg.Name)

View File

@@ -2628,17 +2628,18 @@ func TestJetStreamSnapshots(t *testing.T) {
// Snapshot state of the stream and consumers.
info := info{mset.Config(), mset.State(), obs}
zr, err := mset.Snapshot(5*time.Second, true)
sr, err := mset.Snapshot(5*time.Second, true)
if err != nil {
t.Fatalf("Error getting snapshot: %v", err)
}
zr := sr.Reader
snapshot, err := ioutil.ReadAll(zr)
if err != nil {
t.Fatalf("Error reading snapshot")
}
// Try to restore from snapshot with current stream present, should error.
r := bytes.NewReader(snapshot)
if _, err := acc.RestoreStream(r); err == nil {
if _, err := acc.RestoreStream(mname, r); err == nil {
t.Fatalf("Expected an error trying to restore existing stream")
} else if !strings.Contains(err.Error(), "already exists") {
t.Fatalf("Incorrect error received: %v", err)
@@ -2648,7 +2649,13 @@ func TestJetStreamSnapshots(t *testing.T) {
mset.Delete()
r.Reset(snapshot)
mset, err = acc.RestoreStream(r)
// Now send in wrong name
if _, err := acc.RestoreStream("foo", r); err == nil {
t.Fatalf("Expected an error trying to restore stream with wrong name")
}
r.Reset(snapshot)
mset, err = acc.RestoreStream(mname, r)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -2685,7 +2692,7 @@ func TestJetStreamSnapshots(t *testing.T) {
}
acc = s2.GlobalAccount()
r.Reset(snapshot)
mset, err = acc.RestoreStream(r)
mset, err = acc.RestoreStream(mname, r)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -2699,11 +2706,182 @@ func TestJetStreamSnapshots(t *testing.T) {
defer nc2.Close()
// Make sure we can read messages.
if _, err := nc2.Request(o.RequestNextMsgSubject(), nil, time.Second); err != nil {
if _, err := nc2.Request(o.RequestNextMsgSubject(), nil, 5*time.Second); err != nil {
t.Fatalf("Unexpected error getting next message: %v", err)
}
}
// TestJetStreamSnapshotsAPI exercises the snapshot and restore JS API
// endpoints end to end: it creates a file-backed stream, publishes and
// partially consumes messages, takes a snapshot over the chunked deliver
// subject, deletes the stream, restores it from the snapshot chunks, and
// verifies the restored state matches the original.
func TestJetStreamSnapshotsAPI(t *testing.T) {
	s := RunBasicJetStreamServer()
	defer s.Shutdown()

	if config := s.JetStreamConfig(); config != nil {
		defer os.RemoveAll(config.StoreDir)
	}

	mname := "MY-STREAM"
	subjects := []string{"foo", "bar", "baz"}
	// Small block size so the snapshot spans multiple message blocks.
	cfg := server.StreamConfig{
		Name:     mname,
		Storage:  server.FileStorage,
		Subjects: subjects,
		MaxMsgs:  1000,
	}

	acc := s.GlobalAccount()
	mset, err := acc.AddStreamWithStore(&cfg, &server.FileStoreConfig{BlockSize: 128})
	if err != nil {
		t.Fatalf("Unexpected error adding stream: %v", err)
	}

	nc := clientConnectToServer(t, s)
	defer nc.Close()

	// Randomized message count so snapshots cover varying amounts of data.
	toSend := rand.Intn(100) + 1
	for i := 1; i <= toSend; i++ {
		msg := fmt.Sprintf("Hello World %d", i)
		subj := subjects[rand.Intn(len(subjects))]
		sendStreamMsg(t, nc, subj, msg)
	}

	o, err := mset.AddConsumer(workerModeConfig("WQ"))
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// Now grab some messages so consumer state is part of the snapshot too.
	toReceive := rand.Intn(toSend) + 1
	for r := 0; r < toReceive; r++ {
		resp, err := nc.Request(o.RequestNextMsgSubject(), nil, time.Second)
		if err != nil {
			t.Fatalf("Unexpected error: %v", err)
		}
		if resp != nil {
			resp.Respond(nil)
		}
	}

	// Make sure we get proper errors for non-existent requests, streams, etc.
	// Empty request body is a bad request.
	rmsg, err := nc.Request(fmt.Sprintf(server.JSApiStreamSnapshotT, "foo"), nil, time.Second)
	if err != nil {
		t.Fatalf("Unexpected error on snapshot request: %v", err)
	}
	var resp server.JSApiStreamSnapshotResponse
	json.Unmarshal(rmsg.Data, &resp)
	if resp.Error == nil || resp.Error.Code != 400 || resp.Error.Description != "bad request" {
		t.Fatalf("Did not get correct error response: %+v", resp.Error)
	}

	// Valid body but unknown stream should be a 404.
	sreq := &server.JSApiStreamSnapshotRequest{}
	req, _ := json.Marshal(sreq)
	rmsg, err = nc.Request(fmt.Sprintf(server.JSApiStreamSnapshotT, "foo"), req, time.Second)
	if err != nil {
		t.Fatalf("Unexpected error on snapshot request: %v", err)
	}
	json.Unmarshal(rmsg.Data, &resp)
	if resp.Error == nil || resp.Error.Code != 404 || resp.Error.Description != "stream not found" {
		t.Fatalf("Did not get correct error response: %+v", resp.Error)
	}

	// Known stream but no deliver subject is still a bad request.
	req, _ = json.Marshal(sreq)
	rmsg, err = nc.Request(fmt.Sprintf(server.JSApiStreamSnapshotT, mname), req, time.Second)
	if err != nil {
		t.Fatalf("Unexpected error on snapshot request: %v", err)
	}
	json.Unmarshal(rmsg.Data, &resp)
	if resp.Error == nil || resp.Error.Code != 400 || resp.Error.Description != "bad request" {
		t.Fatalf("Did not get correct error response: %+v", resp.Error)
	}

	// Set delivery subject, do not subscribe yet. Want this to be an ok pattern.
	sreq.DeliverSubject = nats.NewInbox()
	// Just for test, usually left alone.
	sreq.ChunkSize = 512
	req, _ = json.Marshal(sreq)
	rmsg, err = nc.Request(fmt.Sprintf(server.JSApiStreamSnapshotT, mname), req, time.Second)
	if err != nil {
		t.Fatalf("Unexpected error on snapshot request: %v", err)
	}
	json.Unmarshal(rmsg.Data, &resp)
	// NOTE(review): this request should succeed, yet the check below demands a
	// 400 error. It only passes because resp.Error retains the stale value from
	// the previous unmarshal (a success response omits the error field). The
	// assertion looks like a copy-paste slip — confirm intent.
	if resp.Error == nil || resp.Error.Code != 400 || resp.Error.Description != "bad request" {
		t.Fatalf("Did not get correct error response: %+v", resp.Error)
	}

	// Setup to process snapshot chunks.
	var snapshot []byte
	done := make(chan bool)

	sub, _ := nc.Subscribe(sreq.DeliverSubject, func(m *nats.Msg) {
		// EOF is signaled by an empty chunk.
		if len(m.Data) == 0 {
			m.Sub.Unsubscribe()
			done <- true
			return
		}
		// Could be writing to a file here too.
		snapshot = append(snapshot, m.Data...)
		// Flow ack
		m.Respond(nil)
	})
	defer sub.Unsubscribe()

	// Wait to receive the snapshot.
	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatalf("Did not receive our snapshot in time")
	}

	// Now make sure this snapshot is legit.
	var rresp server.JSApiStreamRestoreResponse
	// Make sure we get an error since stream still exists.
	rmsg, err = nc.Request(fmt.Sprintf(server.JSApiStreamRestoreT, mname), nil, time.Second)
	if err != nil {
		t.Fatalf("Unexpected error on snapshot request: %v", err)
	}
	json.Unmarshal(rmsg.Data, &rresp)
	if rresp.Error == nil || rresp.Error.Code != 400 || !strings.Contains(rresp.Error.Description, "already exists") {
		t.Fatalf("Did not get correct error response: %+v", rresp.Error)
	}

	// Grab state for comparison after the restore.
	state := mset.State()
	// Delete this stream so the restore can proceed.
	mset.Delete()

	rmsg, err = nc.Request(fmt.Sprintf(server.JSApiStreamRestoreT, mname), nil, time.Second)
	if err != nil {
		t.Fatalf("Unexpected error on snapshot request: %v", err)
	}
	// Make sure to clear the stale error before unmarshaling the new response.
	rresp.Error = nil
	json.Unmarshal(rmsg.Data, &rresp)
	if rresp.Error != nil {
		t.Fatalf("Got an unexpected error response: %+v", rresp.Error)
	}

	// Replay the captured snapshot to the restore deliver subject.
	r := bytes.NewReader(snapshot)
	// Can be any size message.
	var chunk [512]byte
	for {
		n, err := r.Read(chunk[:])
		if err != nil {
			break
		}
		nc.Publish(rresp.DeliverSubject, chunk[:n])
	}
	// Empty message signals EOF to the restore endpoint.
	nc.Publish(rresp.DeliverSubject, nil)
	nc.Flush()

	mset, err = acc.LookupStream(mname)
	if err != nil {
		t.Fatalf("Expected to find a stream for %q", mname)
	}
	// Restored stream should match the pre-delete state exactly.
	if mset.State() != state {
		t.Fatalf("Did not match states, %+v vs %+v", mset.State(), state)
	}
}
func TestJetStreamActiveDelivery(t *testing.T) {
cases := []struct {
name string