From 0a206b4c6403b3f31d078b87598d265c8bdba18e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 29 May 2020 07:40:10 -0700 Subject: [PATCH] Snapshot performance tweaks Signed-off-by: Derek Collison --- server/filestore.go | 6 +++- server/filestore_test.go | 11 +++--- server/jetstream_api.go | 58 ++++++++++++++++-------------- test/jetstream_test.go | 78 ++++++++++++++++++++++++++++++++++++++-- 4 files changed, 116 insertions(+), 37 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 830b482a..64742bfb 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1948,7 +1948,10 @@ const errFile = "errors.txt" func (fs *fileStore) streamSnapshot(w io.WriteCloser, blks []*msgBlock, includeConsumers bool) { defer w.Close() - gzw := gzip.NewWriter(w) + bw := bufio.NewWriter(w) + defer bw.Flush() + + gzw, _ := gzip.NewWriterLevel(bw, gzip.BestSpeed) defer gzw.Close() tw := tar.NewWriter(gzw) @@ -2117,6 +2120,7 @@ func (fs *fileStore) Snapshot(deadline time.Duration, includeConsumers bool) (*S fs.mu.Unlock() pr, pw := net.Pipe() + // Set a write deadline here to protect ourselves. if deadline > 0 { pw.SetWriteDeadline(time.Now().Add(deadline)) diff --git a/server/filestore_test.go b/server/filestore_test.go index 6f61a720..03002f2d 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1233,18 +1233,15 @@ func TestFileStoreSnapshot(t *testing.T) { }) // Make sure if we do not read properly then it will close the writer and report an error. - sr, err = fs.Snapshot(10*time.Millisecond, false) + sr, err = fs.Snapshot(25*time.Millisecond, false) if err != nil { t.Fatalf("Error creating snapshot") } - var buf [32]byte - if n, err := sr.Reader.Read(buf[:]); err != nil || n == 0 { - t.Fatalf("Expected to read beginning, got %v and %d", err, n) - } // Cause snapshot to timeout. - time.Sleep(20 * time.Millisecond) - // Read again should fail + time.Sleep(30 * time.Millisecond) + // Read should fail + var buf [32]byte if _, err := sr.Reader.Read(buf[:]); err != io.EOF { t.Fatalf("Expected read to produce an error, got none") } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 618fe70d..0404283b 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -23,6 +23,7 @@ import ( "sort" "strconv" "strings" + "sync/atomic" "time" "github.com/nats-io/nuid" @@ -1198,6 +1199,7 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject, // Default chunk size for now. const defaultSnapshotChunkSize = 128 * 1024 +const defaultSnapshotWindowSize = 16 * 1024 * 1024 // 16MB // streamSnapshot will stream out our snapshot to the reply subject. func (s *Server) streamSnapshot(c *client, mset *Stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) { @@ -1229,50 +1231,52 @@ func (s *Server) streamSnapshot(c *client, mset *Stream, sr *SnapshotResult, req acks := make(chan struct{}, 1) acks <- struct{}{} + // Track bytes outstanding. + var out int32 + + // We will place sequence number and size of chunk sent in the reply. ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.Name(), nuid.Next()) - ackSub, _ := mset.subscribeInternalUnlocked(ackSubj, func(_ *subscription, _ *client, _, _ string, _ []byte) { - acks <- struct{}{} + ackSub, _ := mset.subscribeInternalUnlocked(ackSubj+".>", func(_ *subscription, _ *client, subject, _ string, _ []byte) { + // This is very crude and simple, but ok for now. + // This only matters when sending multiple chunks. + if atomic.LoadInt32(&out) > defaultSnapshotWindowSize { + acks <- struct{}{} + } + cs, _ := strconv.Atoi(tokenAt(subject, 6)) + atomic.AddInt32(&out, int32(-cs)) }) defer mset.unsubscribeUnlocked(ackSub) // TODO(dlc) - Add in NATS-Chunked-Sequence header - // Since this is a pipe we will gather up reads and buffer internally before sending. - // bufio will not help here. - var frag [512]byte - fsize := len(frag) - if chunkSize < fsize { - fsize = chunkSize - } - chunk := make([]byte, 0, chunkSize) - - for { - n, err := r.Read(frag[:fsize]) - // Treat all errors the same for now, just break out. - // TODO(dlc) - when we use headers do error if not io.EOF + for index := 1; ; index++ { + chunk := make([]byte, chunkSize) + n, err := r.Read(chunk) + chunk = chunk[:n] if err != nil { + if n > 0 { + mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, chunk, nil, 0} + } break } - // Check if we should send. - if len(chunk)+n > chunkSize { - // Wait on acks for flow control. - // Wait up to 10ms for now if none received. + + // Wait on acks for flow control if past our window size. + // Wait up to 1ms for now if no acks received. + if atomic.LoadInt32(&out) > defaultSnapshotWindowSize { select { case <-acks: case <-inch: // Lost interest goto done - case <-time.After(10 * time.Millisecond): + case <-time.After(time.Millisecond): } - // TODO(dlc) - Might want these moved off sendq if we have contention. - mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackSubj, chunk, nil, 0} - // Can't reuse - chunk = make([]byte, 0, chunkSize) } - chunk = append(chunk, frag[:n]...) + // TODO(dlc) - Might want these moved off sendq if we have contention. + ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index) + mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackReply, chunk, nil, 0} + atomic.AddInt32(&out, int32(len(chunk))) } done: - // Send last chunk and nil as EOF - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, chunk, nil, 0} + // Send last EOF mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, 0} } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 2170d474..f16ecbe5 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -2898,7 +2898,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { // Set delivery subject, do not subscribe yet. Want this to be an ok pattern. sreq.DeliverSubject = nats.NewInbox() // Just for test, usually left alone. - sreq.ChunkSize = 512 + sreq.ChunkSize = 1024 req, _ = json.Marshal(sreq) rmsg, err = nc.Request(fmt.Sprintf(server.JSApiStreamSnapshotT, mname), req, time.Second) if err != nil { @@ -2936,7 +2936,6 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { } // Now make sure this snapshot is legit. - var rresp server.JSApiStreamRestoreResponse // Make sure we get an error since stream still exists. @@ -2986,6 +2985,81 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { } } +func TestJetStreamSnapshotsAPIPerf(t *testing.T) { + // Comment out to run, holding place for now. + t.SkipNow() + + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + cfg := server.StreamConfig{ + Name: "snap-perf", + Storage: server.FileStorage, + } + + acc := s.GlobalAccount() + if _, err := acc.AddStream(&cfg); err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + + nc := clientConnectToServer(t, s) + defer nc.Close() + + msg := make([]byte, 128*1024) + // If you don't give gzip some data will spend too much time compressing everything to zero. + rand.Read(msg) + + for i := 0; i < 10000; i++ { + nc.Publish("snap-perf", msg) + } + nc.Flush() + + sreq := &server.JSApiStreamSnapshotRequest{DeliverSubject: nats.NewInbox()} + req, _ := json.Marshal(sreq) + rmsg, err := nc.Request(fmt.Sprintf(server.JSApiStreamSnapshotT, "snap-perf"), req, time.Second) + if err != nil { + t.Fatalf("Unexpected error on snapshot request: %v", err) + } + + var resp server.JSApiStreamSnapshotResponse + json.Unmarshal(rmsg.Data, &resp) + if resp.Error != nil { + t.Fatalf("Did not get correct error response: %+v", resp.Error) + } + + done := make(chan bool) + total := 0 + sub, _ := nc.Subscribe(sreq.DeliverSubject, func(m *nats.Msg) { + // EOF + if len(m.Data) == 0 { + m.Sub.Unsubscribe() + done <- true + return + } + // We don't do anything with the snapshot, just take + // note of the size. + total += len(m.Data) + // Flow ack + m.Respond(nil) + }) + defer sub.Unsubscribe() + + start := time.Now() + // Wait to receive the snapshot. + select { + case <-done: + case <-time.After(30 * time.Second): + t.Fatalf("Did not receive our snapshot in time") + } + td := time.Since(start) + fmt.Printf("Received %d bytes in %v\n", total, td) + fmt.Printf("Rate %.0f MB/s\n", float64(total)/td.Seconds()/(1024*1024)) +} + func TestJetStreamActiveDelivery(t *testing.T) { cases := []struct { name string