Merge pull request #1431 from nats-io/snap-perf

Snapshot performance tweaks
This commit is contained in:
Derek Collison
2020-05-29 08:29:47 -07:00
committed by GitHub
4 changed files with 116 additions and 37 deletions

View File

@@ -1948,7 +1948,10 @@ const errFile = "errors.txt"
func (fs *fileStore) streamSnapshot(w io.WriteCloser, blks []*msgBlock, includeConsumers bool) {
defer w.Close()
gzw := gzip.NewWriter(w)
bw := bufio.NewWriter(w)
defer bw.Flush()
gzw, _ := gzip.NewWriterLevel(bw, gzip.BestSpeed)
defer gzw.Close()
tw := tar.NewWriter(gzw)
@@ -2117,6 +2120,7 @@ func (fs *fileStore) Snapshot(deadline time.Duration, includeConsumers bool) (*S
fs.mu.Unlock()
pr, pw := net.Pipe()
// Set a write deadline here to protect ourselves.
if deadline > 0 {
pw.SetWriteDeadline(time.Now().Add(deadline))

View File

@@ -1233,18 +1233,15 @@ func TestFileStoreSnapshot(t *testing.T) {
})
// Make sure that if we do not read properly, the writer is closed and an error is reported.
sr, err = fs.Snapshot(10*time.Millisecond, false)
sr, err = fs.Snapshot(25*time.Millisecond, false)
if err != nil {
t.Fatalf("Error creating snapshot")
}
var buf [32]byte
if n, err := sr.Reader.Read(buf[:]); err != nil || n == 0 {
t.Fatalf("Expected to read beginning, got %v and %d", err, n)
}
// Cause snapshot to timeout.
time.Sleep(20 * time.Millisecond)
// Read again should fail
time.Sleep(30 * time.Millisecond)
// Read should fail
var buf [32]byte
if _, err := sr.Reader.Read(buf[:]); err != io.EOF {
t.Fatalf("Expected read to produce an error, got none")
}

View File

@@ -23,6 +23,7 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/nats-io/nuid"
@@ -1198,6 +1199,7 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, subject,
// Default chunk size for now.
const defaultSnapshotChunkSize = 128 * 1024
const defaultSnapshotWindowSize = 16 * 1024 * 1024 // 16MB
// streamSnapshot will stream out our snapshot to the reply subject.
func (s *Server) streamSnapshot(c *client, mset *Stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
@@ -1229,50 +1231,52 @@ func (s *Server) streamSnapshot(c *client, mset *Stream, sr *SnapshotResult, req
acks := make(chan struct{}, 1)
acks <- struct{}{}
// Track bytes outstanding.
var out int32
// We will place sequence number and size of chunk sent in the reply.
ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.Name(), nuid.Next())
ackSub, _ := mset.subscribeInternalUnlocked(ackSubj, func(_ *subscription, _ *client, _, _ string, _ []byte) {
acks <- struct{}{}
ackSub, _ := mset.subscribeInternalUnlocked(ackSubj+".>", func(_ *subscription, _ *client, subject, _ string, _ []byte) {
// This is very crude and simple, but ok for now.
// This only matters when sending multiple chunks.
if atomic.LoadInt32(&out) > defaultSnapshotWindowSize {
acks <- struct{}{}
}
cs, _ := strconv.Atoi(tokenAt(subject, 6))
atomic.AddInt32(&out, int32(-cs))
})
defer mset.unsubscribeUnlocked(ackSub)
// TODO(dlc) - Add in NATS-Chunked-Sequence header
// Since this is a pipe we will gather up reads and buffer internally before sending.
// bufio will not help here.
var frag [512]byte
fsize := len(frag)
if chunkSize < fsize {
fsize = chunkSize
}
chunk := make([]byte, 0, chunkSize)
for {
n, err := r.Read(frag[:fsize])
// Treat all errors the same for now, just break out.
// TODO(dlc) - when we use headers do error if not io.EOF
for index := 1; ; index++ {
chunk := make([]byte, chunkSize)
n, err := r.Read(chunk)
chunk = chunk[:n]
if err != nil {
if n > 0 {
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, chunk, nil, 0}
}
break
}
// Check if we should send.
if len(chunk)+n > chunkSize {
// Wait on acks for flow control.
// Wait up to 10ms for now if none received.
// Wait on acks for flow control if past our window size.
// Wait up to 1ms for now if no acks received.
if atomic.LoadInt32(&out) > defaultSnapshotWindowSize {
select {
case <-acks:
case <-inch: // Lost interest
goto done
case <-time.After(10 * time.Millisecond):
case <-time.After(time.Millisecond):
}
// TODO(dlc) - Might want these moved off sendq if we have contention.
mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackSubj, chunk, nil, 0}
// Can't reuse
chunk = make([]byte, 0, chunkSize)
}
chunk = append(chunk, frag[:n]...)
// TODO(dlc) - Might want these moved off sendq if we have contention.
ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackReply, chunk, nil, 0}
atomic.AddInt32(&out, int32(len(chunk)))
}
done:
// Send last chunk and nil as EOF
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, chunk, nil, 0}
// Send last EOF
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, 0}
}

View File

@@ -2898,7 +2898,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
// Set delivery subject, do not subscribe yet. Want this to be an ok pattern.
sreq.DeliverSubject = nats.NewInbox()
// Just for test, usually left alone.
sreq.ChunkSize = 512
sreq.ChunkSize = 1024
req, _ = json.Marshal(sreq)
rmsg, err = nc.Request(fmt.Sprintf(server.JSApiStreamSnapshotT, mname), req, time.Second)
if err != nil {
@@ -2936,7 +2936,6 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
}
// Now make sure this snapshot is legit.
var rresp server.JSApiStreamRestoreResponse
// Make sure we get an error since stream still exists.
@@ -2986,6 +2985,81 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
}
}
// TestJetStreamSnapshotsAPIPerf measures raw snapshot transfer throughput by
// streaming a large file-backed stream over the snapshot delivery subject and
// printing bytes received and MB/s. It is a manual benchmark placeholder and
// is skipped by default.
func TestJetStreamSnapshotsAPIPerf(t *testing.T) {
	// Comment out to run, holding place for now.
	t.SkipNow()

	s := RunBasicJetStreamServer()
	defer s.Shutdown()

	if config := s.JetStreamConfig(); config != nil {
		defer os.RemoveAll(config.StoreDir)
	}

	cfg := server.StreamConfig{
		Name:    "snap-perf",
		Storage: server.FileStorage,
	}

	acc := s.GlobalAccount()
	if _, err := acc.AddStream(&cfg); err != nil {
		t.Fatalf("Unexpected error adding stream: %v", err)
	}

	nc := clientConnectToServer(t, s)
	defer nc.Close()

	// Fill the payload with random bytes: if you don't give gzip random data
	// it will spend too much time compressing everything down to nothing.
	msg := make([]byte, 128*1024)
	rand.Read(msg)
	for i := 0; i < 10000; i++ {
		if err := nc.Publish("snap-perf", msg); err != nil {
			t.Fatalf("Unexpected error publishing: %v", err)
		}
	}
	nc.Flush()

	sreq := &server.JSApiStreamSnapshotRequest{DeliverSubject: nats.NewInbox()}
	req, _ := json.Marshal(sreq)
	rmsg, err := nc.Request(fmt.Sprintf(server.JSApiStreamSnapshotT, "snap-perf"), req, time.Second)
	if err != nil {
		t.Fatalf("Unexpected error on snapshot request: %v", err)
	}

	var resp server.JSApiStreamSnapshotResponse
	if err := json.Unmarshal(rmsg.Data, &resp); err != nil {
		t.Fatalf("Unexpected error unmarshalling response: %v", err)
	}
	if resp.Error != nil {
		t.Fatalf("Did not get correct error response: %+v", resp.Error)
	}

	done := make(chan bool)
	total := 0
	sub, err := nc.Subscribe(sreq.DeliverSubject, func(m *nats.Msg) {
		// A zero-length chunk signals EOF.
		if len(m.Data) == 0 {
			m.Sub.Unsubscribe()
			done <- true
			return
		}
		// We don't do anything with the snapshot, just take
		// note of the size.
		total += len(m.Data)
		// Flow ack
		m.Respond(nil)
	})
	if err != nil {
		t.Fatalf("Unexpected error subscribing to deliver subject: %v", err)
	}
	defer sub.Unsubscribe()

	start := time.Now()
	// Wait to receive the snapshot.
	select {
	case <-done:
	case <-time.After(30 * time.Second):
		t.Fatalf("Did not receive our snapshot in time")
	}
	td := time.Since(start)
	fmt.Printf("Received %d bytes in %v\n", total, td)
	fmt.Printf("Rate %.0f MB/s\n", float64(total)/td.Seconds()/(1024*1024))
}
func TestJetStreamActiveDelivery(t *testing.T) {
cases := []struct {
name string