BufSize option, bump for go1.5.1, bump version

This commit is contained in:
Derek Collison
2015-09-18 07:41:33 -07:00
parent f26e2e4410
commit fe3b8f2aa1
9 changed files with 23 additions and 11 deletions

View File

@@ -1,4 +1,4 @@
FROM golang:1.5
FROM golang:1.5.1
MAINTAINER Derek Collison <derek@apcera.com>
@@ -10,4 +10,3 @@ RUN CGO_ENABLED=0 go install -v -a -tags netgo -installsuffix netgo -ldflags "-s
EXPOSE 4222 8222
ENTRYPOINT ["gnatsd"]
CMD ["--help"]

View File

@@ -53,6 +53,10 @@ func main() {
flag.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port")
flag.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.")
// Not public per se, will be replaced with dynamic system, but can be used to lower memory footprint when
// lots of connections present.
flag.IntVar(&opts.BufSize, "bs", 0, "Read/Write buffer size per client connection.")
flag.Usage = server.Usage
flag.Parse()

View File

@@ -17,8 +17,6 @@ import (
)
const (
// The size of the bufio reader/writer on top of the socket.
defaultBufSize = 32768
// Scratch buffer size for the processMsg() calls.
msgScratchSize = 512
msgHeadProto = "MSG "
@@ -94,7 +92,7 @@ func init() {
func (c *client) initClient() {
s := c.srv
c.cid = atomic.AddUint64(&s.gcid, 1)
c.bw = bufio.NewWriterSize(c.nc, defaultBufSize)
c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize)
c.subs = hashmap.New()
// This is a scratch buffer used for processMsg()
@@ -123,8 +121,8 @@ func (c *client) initClient() {
// No clue why, but this stalls and kills performance on Mac (Mavericks).
//
// if ip, ok := c.nc.(*net.TCPConn); ok {
// ip.SetReadBuffer(defaultBufSize)
// ip.SetWriteBuffer(2 * defaultBufSize)
// ip.SetReadBuffer(s.opts.BufSize)
// ip.SetWriteBuffer(2 * s.opts.BufSize)
// }
// Set the Ping timer
@@ -139,13 +137,14 @@ func (c *client) readLoop() {
// We check for that after the loop, but want to avoid a nil dereference
c.mu.Lock()
nc := c.nc
s := c.srv
c.mu.Unlock()
if nc == nil {
return
}
b := make([]byte, defaultBufSize)
b := make([]byte, s.opts.BufSize)
for {
n, err := nc.Read(b)

View File

@@ -46,7 +46,7 @@ var defaultServerOptions = Options{
func rawSetup(serverOptions Options) (*Server, *client, *bufio.Reader, string) {
cli, srv := net.Pipe()
cr := bufio.NewReaderSize(cli, defaultBufSize)
cr := bufio.NewReaderSize(cli, DEFAULT_BUF_SIZE)
s := New(&serverOptions)
if serverOptions.Authorization != "" {
s.SetAuthMethod(&mockAuth{})

View File

@@ -8,7 +8,7 @@ import (
const (
// VERSION is the current version for the server.
VERSION = "0.6.6"
VERSION = "0.6.8"
// DEFAULT_PORT is the deault port for client connections.
DEFAULT_PORT = 4222
@@ -82,4 +82,8 @@ const (
// MAX_PUB_ARGS Maximum possible number of arguments from PUB proto.
MAX_PUB_ARGS = 3
// Default Buffer size for reads and writes per connection. Will be replaced by dynamic
// system in the long run.
DEFAULT_BUF_SIZE = 32768
)

View File

@@ -47,6 +47,7 @@ type Options struct {
RemoteSyslog string `json:"-"`
Routes []*url.URL `json:"-"`
RoutesStr string `json:"-"`
BufSize int `json:"-"`
}
type authorization struct {
@@ -366,4 +367,7 @@ func processOptions(opts *Options) {
if opts.MaxPending == 0 {
opts.MaxPending = MAX_PENDING_SIZE
}
if opts.BufSize == 0 {
opts.BufSize = DEFAULT_BUF_SIZE
}
}

View File

@@ -22,6 +22,7 @@ func TestDefaultOptions(t *testing.T) {
MaxPayload: MAX_PAYLOAD_SIZE,
MaxPending: MAX_PENDING_SIZE,
ClusterAuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second),
BufSize: DEFAULT_BUF_SIZE,
}
opts := &Options{}

View File

@@ -278,6 +278,7 @@ func (s *Server) AcceptLoop() {
Noticef("Listening for client connections on %s", hp)
l, e := net.Listen("tcp", hp)
if e != nil {
fmt.Printf("could not listen on port for %s, %v\n", hp, e)
Fatalf("Error listening on port: %s, %q", hp, e)
return
}

View File

@@ -78,7 +78,7 @@ func RunServerWithAuth(opts *server.Options, auth server.Auth) *server.Server {
for time.Now().Before(end) {
addr := s.Addr()
if addr == nil {
time.Sleep(10 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
// Retry. We might take a little while to open a connection.
continue
}