Much improved large message performance

This commit is contained in:
Derek Collison
2015-06-25 13:51:53 -07:00
parent bef1b7f77b
commit 90d1d6d572
4 changed files with 79 additions and 30 deletions

26
TODO.md
View File

@@ -1,24 +1,28 @@
# General
- [X] Remove reliance on `ps`
- [X] Syslog support
- [ ] SSL/TLS support
- [ ] nats-top equivalent, utils
- [ ] Pedantic state
- [X] Daemon mode? Won't fix
- [ ] Connz report routes
- [ ] Info updates contain other implicit route servers
- [X] Docker
- [ ] brew, apt-get, rpm, chocately (windows)
- [ ] Dynamic socket buffer sizes
- [ ] Switch to 1.4 and use maps vs hashmaps
- [ ] Buffer pools?
- [ ] Switch to 1.4 and use maps vs hashmaps in sublist
- [ ] Sublist better at high concurrency
- [ ] Buffer pools/sync pools?
- [ ] Add ability to reload config on signal
- [ ] NewSource on Rand do lower lock contention on QueueSubs
- [ ] Add ENV support to dconf
- [ ] Add ENV and variable support to dconf
- [ ] Modify cluster support for single message across routes between pub/sub and d-queue
- [ ] Client support for language and version
- [ ] Place version in varz?
- [ ] Remove options in varz by default?
- [ ] Memory limits/warnings?
- [ ] Gossip Protocol for discovery for clustering
- [ ] Info updates contain other implicit route servers
- [ ] Dropped message statistics
- [X] nats-top equivalent, utils
- [X] Connz report routes
- [X] Docker
- [X] Remove reliance on `ps`
- [X] Syslog support
- [X] Client support for language and version
- [X] Fix benchmarks on linux
- [X] Daemon mode? Won't fix

View File

@@ -123,7 +123,7 @@ func (c *client) initClient() {
//
// if ip, ok := c.nc.(*net.TCPConn); ok {
// ip.SetReadBuffer(defaultBufSize)
// ip.SetWriteBuffer(2*defaultBufSize)
// ip.SetWriteBuffer(2 * defaultBufSize)
// }
// Set the Ping timer

View File

@@ -87,7 +87,10 @@ func (c *client) parse(buf []byte) error {
// proper CONNECT if needed.
authSet := c.isAuthTimerSet()
for i, b = range buf {
// Move to loop instead of range syntax to allow jumping of i
for i = 0; i < len(buf); i++ {
b = buf[i]
switch c.state {
case OP_START:
if b != 'C' && b != 'c' && authSet {
@@ -161,6 +164,12 @@ func (c *client) parse(buf []byte) error {
return err
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
// If we don't have a saved buffer then jump ahead with
// the index. If this overruns what is left we fall out
// and process split buffer.
if c.msgBuf == nil {
i = c.as + c.pa.size - LEN_CR_LF
}
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
@@ -168,7 +177,23 @@ func (c *client) parse(buf []byte) error {
}
case MSG_PAYLOAD:
if c.msgBuf != nil {
c.msgBuf = append(c.msgBuf, b)
// copy as much as we can to the buffer and skip ahead.
toCopy := c.pa.size - len(c.msgBuf)
avail := len(buf) - i
if avail < toCopy {
toCopy = avail
}
if toCopy > 0 {
start := len(c.msgBuf)
// This is needed for copy to work.
c.msgBuf = c.msgBuf[:start+toCopy]
copy(c.msgBuf[start:], buf[i:i+toCopy])
// Update our index
i = (i + toCopy) - 1
} else {
// Fall back to append if needed.
c.msgBuf = append(c.msgBuf, b)
}
if len(c.msgBuf) >= c.pa.size {
c.state = MSG_END
}
@@ -587,7 +612,7 @@ func (c *client) parse(buf []byte) error {
c.state == MSG_ARG || c.state == MINUS_ERR_ARG ||
c.state == CONNECT_ARG) && c.argBuf == nil {
c.argBuf = c.scratch[:0]
c.argBuf = append(c.argBuf, buf[c.as:(i+1)-c.drop]...)
c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...)
// FIXME, check max len
}
// Check for split msg
@@ -597,8 +622,17 @@ func (c *client) parse(buf []byte) error {
if c.argBuf == nil {
c.clonePubArg()
}
c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
// If we will overflow the scratch buffer, just create a
// new buffer to hold the split message.
if c.pa.size > cap(c.scratch)-len(c.argBuf) {
lrem := len(buf[c.as:])
c.msgBuf = make([]byte, lrem, c.pa.size+LEN_CR_LF)
copy(c.msgBuf, buf[c.as:])
} else {
c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
}
}
return nil

View File

@@ -4,10 +4,10 @@ package test
import (
"bufio"
"crypto/rand"
"encoding/hex"
// "encoding/hex"
"fmt"
"io"
// "io"
"math/rand"
"net"
"testing"
"time"
@@ -25,17 +25,18 @@ func runBenchServer() *server.Server {
}
const defaultRecBufSize = 32768
const defaultSendBufSize = 16384
const defaultSendBufSize = 32768
func flushConnection(b *testing.B, c net.Conn, buf []byte) {
func flushConnection(b *testing.B, c net.Conn) {
buf := make([]byte, 32)
c.Write([]byte("PING\r\n"))
c.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
c.SetReadDeadline(time.Now().Add(1 * time.Second))
n, err := c.Read(buf)
c.SetReadDeadline(time.Time{})
if err != nil {
b.Fatalf("Failed read: %v\n", err)
}
if n != 6 && buf[0] != 'P' {
if n != 6 && buf[0] != 'P' && buf[1] != 'O' {
b.Fatalf("Failed read of PONG: %s\n", buf)
}
}
@@ -48,22 +49,25 @@ func benchPub(b *testing.B, subject, payload string) {
bw := bufio.NewWriterSize(c, defaultSendBufSize)
sendOp := []byte(fmt.Sprintf("PUB %s %d\r\n%s\r\n", subject, len(payload), payload))
b.SetBytes(int64(len(sendOp)))
buf := make([]byte, 1024)
b.StartTimer()
for i := 0; i < b.N; i++ {
bw.Write(sendOp)
}
bw.Flush()
flushConnection(b, c, buf)
flushConnection(b, c)
b.StopTimer()
c.Close()
s.Shutdown()
}
var ch = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@$#%^&*()")
func sizedString(sz int) string {
u := make([]byte, sz)
io.ReadFull(rand.Reader, u)
return hex.EncodeToString(u)
b := make([]byte, sz)
for i := range b {
b[i] = ch[rand.Intn(len(ch))]
}
return string(b)
}
func Benchmark___PubNo_Payload(b *testing.B) {
@@ -100,12 +104,18 @@ func Benchmark___Pub4K_Payload(b *testing.B) {
benchPub(b, "a", s)
}
func Benchmark___Pub8K_Payload(b *testing.B) {
b.StopTimer()
s := sizedString(8 * 1024)
benchPub(b, "a", s)
}
func drainConnection(b *testing.B, c net.Conn, ch chan bool, expected int) {
buf := make([]byte, defaultRecBufSize)
bytes := 0
for {
c.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
c.SetReadDeadline(time.Now().Add(5 * time.Second))
n, err := c.Read(buf)
if err != nil {
b.Errorf("Error on read: %v\n", err)
@@ -164,6 +174,7 @@ func Benchmark__PubSubTwoConns(b *testing.B) {
c2 := createClientConn(b, "localhost", PERF_PORT)
doDefaultConnect(b, c2)
sendProto(b, c2, "SUB foo 1\r\n")
flushConnection(b, c2)
sendOp := []byte(fmt.Sprintf("PUB foo 2\r\nok\r\n"))
ch := make(chan bool)