diff --git a/TODO.md b/TODO.md index 7478805a..05f0a0f5 100644 --- a/TODO.md +++ b/TODO.md @@ -1,24 +1,28 @@ # General -- [X] Remove reliance on `ps` -- [X] Syslog support - [ ] SSL/TLS support -- [ ] nats-top equivalent, utils - [ ] Pedantic state -- [X] Daemon mode? Won't fix -- [ ] Connz report routes -- [ ] Info updates contain other implicit route servers -- [X] Docker - [ ] brew, apt-get, rpm, chocately (windows) - [ ] Dynamic socket buffer sizes -- [ ] Switch to 1.4 and use maps vs hashmaps -- [ ] Buffer pools? +- [ ] Switch to 1.4 and use maps vs hashmaps in sublist +- [ ] Sublist better at high concurrency +- [ ] Buffer pools/sync pools? - [ ] Add ability to reload config on signal - [ ] NewSource on Rand do lower lock contention on QueueSubs -- [ ] Add ENV support to dconf +- [ ] Add ENV and variable support to dconf - [ ] Modify cluster support for single message across routes between pub/sub and d-queue -- [ ] Client support for language and version - [ ] Place version in varz? +- [ ] Remove options in varz by default? +- [ ] Memory limits/warnings? - [ ] Gossip Protocol for discovery for clustering +- [ ] Info updates contain other implicit route servers +- [ ] Dropped message statistics +- [X] nats-top equivalent, utils +- [X] Connz report routes +- [X] Docker +- [X] Remove reliance on `ps` +- [X] Syslog support +- [X] Client support for language and version - [X] Fix benchmarks on linux +- [X] Daemon mode? Won't fix diff --git a/server/client.go b/server/client.go index 48d6c8d2..a29e7371 100644 --- a/server/client.go +++ b/server/client.go @@ -123,7 +123,7 @@ func (c *client) initClient() { // // if ip, ok := c.nc.(*net.TCPConn); ok { // ip.SetReadBuffer(defaultBufSize) - // ip.SetWriteBuffer(2*defaultBufSize) + // ip.SetWriteBuffer(2 * defaultBufSize) // } // Set the Ping timer diff --git a/server/parser.go b/server/parser.go index 40a3d7b8..a667b7cc 100644 --- a/server/parser.go +++ b/server/parser.go @@ -87,7 +87,10 @@ func (c *client) parse(buf []byte) error { // proper CONNECT if needed. authSet := c.isAuthTimerSet() - for i, b = range buf { + // Move to loop instead of range syntax to allow jumping of i + for i = 0; i < len(buf); i++ { + b = buf[i] + switch c.state { case OP_START: if b != 'C' && b != 'c' && authSet { @@ -161,6 +164,12 @@ func (c *client) parse(buf []byte) error { return err } c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD + // If we don't have a saved buffer then jump ahead with + // the index. If this overruns what is left we fall out + // and process split buffer. + if c.msgBuf == nil { + i = c.as + c.pa.size - LEN_CR_LF + } default: if c.argBuf != nil { c.argBuf = append(c.argBuf, b) @@ -168,7 +177,23 @@ func (c *client) parse(buf []byte) error { } case MSG_PAYLOAD: if c.msgBuf != nil { - c.msgBuf = append(c.msgBuf, b) + // copy as much as we can to the buffer and skip ahead. + toCopy := c.pa.size - len(c.msgBuf) + avail := len(buf) - i + if avail < toCopy { + toCopy = avail + } + if toCopy > 0 { + start := len(c.msgBuf) + // This is needed for copy to work. + c.msgBuf = c.msgBuf[:start+toCopy] + copy(c.msgBuf[start:], buf[i:i+toCopy]) + // Update our index + i = (i + toCopy) - 1 + } else { + // Fall back to append if needed. + c.msgBuf = append(c.msgBuf, b) + } if len(c.msgBuf) >= c.pa.size { c.state = MSG_END } @@ -587,7 +612,7 @@ func (c *client) parse(buf []byte) error { c.state == MSG_ARG || c.state == MINUS_ERR_ARG || c.state == CONNECT_ARG) && c.argBuf == nil { c.argBuf = c.scratch[:0] - c.argBuf = append(c.argBuf, buf[c.as:(i+1)-c.drop]...) + c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...) // FIXME, check max len } // Check for split msg @@ -597,8 +622,17 @@ func (c *client) parse(buf []byte) error { if c.argBuf == nil { c.clonePubArg() } - c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)] - c.msgBuf = append(c.msgBuf, (buf[c.as:])...) + + // If we will overflow the scratch buffer, just create a + // new buffer to hold the split message. + if c.pa.size > cap(c.scratch)-len(c.argBuf) { + lrem := len(buf[c.as:]) + c.msgBuf = make([]byte, lrem, c.pa.size+LEN_CR_LF) + copy(c.msgBuf, buf[c.as:]) + } else { + c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)] + c.msgBuf = append(c.msgBuf, (buf[c.as:])...) + } } return nil diff --git a/test/bench_test.go b/test/bench_test.go index 11c6b46e..92e45eae 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -4,10 +4,10 @@ package test import ( "bufio" - "crypto/rand" - "encoding/hex" + // "encoding/hex" "fmt" - "io" + // "io" + "math/rand" "net" "testing" "time" @@ -25,17 +25,18 @@ func runBenchServer() *server.Server { } const defaultRecBufSize = 32768 -const defaultSendBufSize = 16384 +const defaultSendBufSize = 32768 -func flushConnection(b *testing.B, c net.Conn, buf []byte) { +func flushConnection(b *testing.B, c net.Conn) { + buf := make([]byte, 32) c.Write([]byte("PING\r\n")) - c.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + c.SetReadDeadline(time.Now().Add(1 * time.Second)) n, err := c.Read(buf) c.SetReadDeadline(time.Time{}) if err != nil { b.Fatalf("Failed read: %v\n", err) } - if n != 6 && buf[0] != 'P' { + if n != 6 && buf[0] != 'P' && buf[1] != 'O' { b.Fatalf("Failed read of PONG: %s\n", buf) } } @@ -48,22 +49,25 @@ func benchPub(b *testing.B, subject, payload string) { bw := bufio.NewWriterSize(c, defaultSendBufSize) sendOp := []byte(fmt.Sprintf("PUB %s %d\r\n%s\r\n", subject, len(payload), payload)) b.SetBytes(int64(len(sendOp))) - buf := make([]byte, 1024) b.StartTimer() for i := 0; i < b.N; i++ { bw.Write(sendOp) } bw.Flush() - flushConnection(b, c, buf) + flushConnection(b, c) b.StopTimer() c.Close() s.Shutdown() } +var ch = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@$#%^&*()") + func sizedString(sz int) string { - u := make([]byte, sz) - io.ReadFull(rand.Reader, u) - return hex.EncodeToString(u) + b := make([]byte, sz) + for i := range b { + b[i] = ch[rand.Intn(len(ch))] + } + return string(b) } func Benchmark___PubNo_Payload(b *testing.B) { @@ -100,12 +104,18 @@ func Benchmark___Pub4K_Payload(b *testing.B) { benchPub(b, "a", s) } +func Benchmark___Pub8K_Payload(b *testing.B) { + b.StopTimer() + s := sizedString(8 * 1024) + benchPub(b, "a", s) +} + func drainConnection(b *testing.B, c net.Conn, ch chan bool, expected int) { buf := make([]byte, defaultRecBufSize) bytes := 0 for { - c.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + c.SetReadDeadline(time.Now().Add(5 * time.Second)) n, err := c.Read(buf) if err != nil { b.Errorf("Error on read: %v\n", err) @@ -164,6 +174,7 @@ func Benchmark__PubSubTwoConns(b *testing.B) { c2 := createClientConn(b, "localhost", PERF_PORT) doDefaultConnect(b, c2) sendProto(b, c2, "SUB foo 1\r\n") + flushConnection(b, c2) sendOp := []byte(fmt.Sprintf("PUB foo 2\r\nok\r\n")) ch := make(chan bool)