From 95d0152449f1510537708a3e71e15efea12cfdf7 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 18 Jan 2017 19:27:42 -0700 Subject: [PATCH] [ADDED] Make Write deadline configurable We use a hardcoded value of 2 seconds for Write deadline when writing data to client's socket. This PR makes that value configurable. Question is should we push the setting down to the client's object to avoid indirection such as client.srv.opts.WriteDeadline? --- server/client.go | 10 ++++---- server/configs/test.conf | 3 +++ server/opts.go | 6 +++++ server/opts_test.go | 3 +++ server/server_test.go | 53 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 70 insertions(+), 5 deletions(-) diff --git a/server/client.go b/server/client.go index 4d79d9ce..b3f5f1cf 100644 --- a/server/client.go +++ b/server/client.go @@ -306,7 +306,7 @@ func (c *client) readLoop() { wfc := cp.wfc cp.wfc = 0 - cp.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE)) + cp.nc.SetWriteDeadline(time.Now().Add(s.opts.WriteDeadline)) err := cp.bw.Flush() cp.nc.SetWriteDeadline(time.Time{}) if err != nil { @@ -520,7 +520,7 @@ func (c *client) sendProto(info []byte, doFlush bool) error { if c.bw != nil && c.nc != nil { deadlineSet := false if doFlush || c.bw.Available() < len(info) { - c.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE)) + c.nc.SetWriteDeadline(time.Now().Add(c.srv.opts.WriteDeadline)) deadlineSet = true } _, err = c.bw.Write(info) @@ -941,8 +941,8 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { deadlineSet := false if client.bw.Available() < (len(mh) + len(msg)) { - client.wfc += 1 - client.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE)) + client.wfc++ + client.nc.SetWriteDeadline(time.Now().Add(client.srv.opts.WriteDeadline)) deadlineSet = true } @@ -1260,7 +1260,7 @@ func (c *client) clearConnection() { // With TLS, Close() is sending an alert (that is doing a write). // Need to set a deadline otherwise the server could block there // if the peer is not reading from socket. - c.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE)) + c.nc.SetWriteDeadline(time.Now().Add(c.srv.opts.WriteDeadline)) if c.bw != nil { c.bw.Flush() } diff --git a/server/configs/test.conf b/server/configs/test.conf index f5c163ba..e877aeb5 100644 --- a/server/configs/test.conf +++ b/server/configs/test.conf @@ -37,3 +37,6 @@ max_payload: 65536 # ping interval and no pong threshold ping_interval: 60 ping_max: 3 + +# how long server can block on a socket write to a client +write_deadline: 3 diff --git a/server/opts.go b/server/opts.go index 70ca94f2..8b97fa45 100644 --- a/server/opts.go +++ b/server/opts.go @@ -82,6 +82,7 @@ type Options struct { TLSKey string `json:"-"` TLSCaCert string `json:"-"` TLSConfig *tls.Config `json:"-"` + WriteDeadline time.Duration `json:"-"` } // Configuration file authorization section. @@ -234,6 +235,8 @@ func ProcessConfigFile(configFile string) (*Options, error) { return nil, err } opts.TLSTimeout = tc.Timeout + case "write_deadline": + opts.WriteDeadline = time.Duration(v.(int64)) * time.Second } } return opts, nil @@ -841,4 +844,7 @@ func processOptions(opts *Options) { if opts.MaxPayload == 0 { opts.MaxPayload = MAX_PAYLOAD_SIZE } + if opts.WriteDeadline == time.Duration(0) { + opts.WriteDeadline = DEFAULT_FLUSH_DEADLINE + } } diff --git a/server/opts_test.go b/server/opts_test.go index 2554335f..283463ca 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -27,6 +27,7 @@ func TestDefaultOptions(t *testing.T) { AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second), TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second), }, + WriteDeadline: DEFAULT_FLUSH_DEADLINE, } opts := &Options{} @@ -69,6 +70,7 @@ func TestConfigFile(t *testing.T) { MaxConn: 100, PingInterval: 60 * time.Second, MaxPingsOut: 3, + WriteDeadline: 3 * time.Second, } opts, err := ProcessConfigFile("./configs/test.conf") @@ -230,6 +232,7 @@ func TestMergeOverrides(t *testing.T) { NoAdvertise: true, ConnectRetries: 2, }, + WriteDeadline: 3 * time.Second, } fopts, err := ProcessConfigFile("./configs/test.conf") if err != nil { diff --git a/server/server_test.go b/server/server_test.go index 879759cd..66053b57 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -275,3 +275,56 @@ func TestProcessCommandLineArgs(t *testing.T) { t.Errorf("Expected an error handling the command arguments") } } + +func TestWriteDeadline(t *testing.T) { + opts := DefaultOptions + opts.WriteDeadline = 20 * time.Millisecond + s := RunServer(&opts) + defer s.Shutdown() + + c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer c.Close() + if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil { + t.Fatalf("Error sending protocols to server: %v", err) + } + // Reduce socket buffer to increase reliability of getting + // write deadline errors. + c.(*net.TCPConn).SetReadBuffer(10) + + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + sender, err := nats.Connect(url) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer sender.Close() + + payload := make([]byte, 1000000) + start := time.Now() + for i := 0; i < 10; i++ { + if err := sender.Publish("foo", payload); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + dur := time.Now().Sub(start) + // user more than the write deadline to account for calls + // overhead, running with -race, etc... + if dur > 100*time.Millisecond { + t.Fatalf("Flush should have returned sooner, took: %v", dur) + } + // Flush sender connection to ensure that all data has been sent. + sender.Flush() + // At this point server should have closed connection c. + + // On certain platforms, it may take more than one call before + // getting the error. + for i := 0; i < 100; i++ { + if _, err := c.Write([]byte("PUB bar 5\r\nhello\r\n")); err != nil { + // ok + return + } + } + t.Fatal("Connection should have been closed") +}