diff --git a/server/client.go b/server/client.go index 4d79d9ce..b3f5f1cf 100644 --- a/server/client.go +++ b/server/client.go @@ -306,7 +306,7 @@ func (c *client) readLoop() { wfc := cp.wfc cp.wfc = 0 - cp.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE)) + cp.nc.SetWriteDeadline(time.Now().Add(s.opts.WriteDeadline)) err := cp.bw.Flush() cp.nc.SetWriteDeadline(time.Time{}) if err != nil { @@ -520,7 +520,7 @@ func (c *client) sendProto(info []byte, doFlush bool) error { if c.bw != nil && c.nc != nil { deadlineSet := false if doFlush || c.bw.Available() < len(info) { - c.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE)) + c.nc.SetWriteDeadline(time.Now().Add(c.srv.opts.WriteDeadline)) deadlineSet = true } _, err = c.bw.Write(info) @@ -941,8 +941,8 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { deadlineSet := false if client.bw.Available() < (len(mh) + len(msg)) { - client.wfc += 1 - client.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE)) + client.wfc++ + client.nc.SetWriteDeadline(time.Now().Add(client.srv.opts.WriteDeadline)) deadlineSet = true } @@ -1260,7 +1260,7 @@ func (c *client) clearConnection() { // With TLS, Close() is sending an alert (that is doing a write). // Need to set a deadline otherwise the server could block there // if the peer is not reading from socket. - c.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE)) + c.nc.SetWriteDeadline(time.Now().Add(c.srv.opts.WriteDeadline)) if c.bw != nil { c.bw.Flush() } diff --git a/server/configs/test.conf b/server/configs/test.conf index f5c163ba..e877aeb5 100644 --- a/server/configs/test.conf +++ b/server/configs/test.conf @@ -37,3 +37,6 @@ max_payload: 65536 # ping interval and no pong threshold ping_interval: 60 ping_max: 3 + +# how long server can block on a socket write to a client +write_deadline: 3 diff --git a/server/opts.go b/server/opts.go index 70ca94f2..8b97fa45 100644 --- a/server/opts.go +++ b/server/opts.go @@ -82,6 +82,7 @@ type Options struct { TLSKey string `json:"-"` TLSCaCert string `json:"-"` TLSConfig *tls.Config `json:"-"` + WriteDeadline time.Duration `json:"-"` } // Configuration file authorization section. @@ -234,6 +235,8 @@ func ProcessConfigFile(configFile string) (*Options, error) { return nil, err } opts.TLSTimeout = tc.Timeout + case "write_deadline": + opts.WriteDeadline = time.Duration(v.(int64)) * time.Second } } return opts, nil @@ -841,4 +844,7 @@ func processOptions(opts *Options) { if opts.MaxPayload == 0 { opts.MaxPayload = MAX_PAYLOAD_SIZE } + if opts.WriteDeadline == time.Duration(0) { + opts.WriteDeadline = DEFAULT_FLUSH_DEADLINE + } } diff --git a/server/opts_test.go b/server/opts_test.go index 2554335f..283463ca 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -27,6 +27,7 @@ func TestDefaultOptions(t *testing.T) { AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second), TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second), }, + WriteDeadline: DEFAULT_FLUSH_DEADLINE, } opts := &Options{} @@ -69,6 +70,7 @@ func TestConfigFile(t *testing.T) { MaxConn: 100, PingInterval: 60 * time.Second, MaxPingsOut: 3, + WriteDeadline: 3 * time.Second, } opts, err := ProcessConfigFile("./configs/test.conf") @@ -230,6 +232,7 @@ func TestMergeOverrides(t *testing.T) { NoAdvertise: true, ConnectRetries: 2, }, + WriteDeadline: 3 * time.Second, } fopts, err := ProcessConfigFile("./configs/test.conf") if err != nil { diff --git a/server/server_test.go b/server/server_test.go index 879759cd..66053b57 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -275,3 +275,56 @@ func TestProcessCommandLineArgs(t *testing.T) { t.Errorf("Expected an error handling the command arguments") } } + +func TestWriteDeadline(t *testing.T) { + opts := DefaultOptions + opts.WriteDeadline = 20 * time.Millisecond + s := RunServer(&opts) + defer s.Shutdown() + + c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer c.Close() + if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil { + t.Fatalf("Error sending protocols to server: %v", err) + } + // Reduce socket buffer to increase reliability of getting + // write deadline errors. + c.(*net.TCPConn).SetReadBuffer(10) + + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + sender, err := nats.Connect(url) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer sender.Close() + + payload := make([]byte, 1000000) + start := time.Now() + for i := 0; i < 10; i++ { + if err := sender.Publish("foo", payload); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + dur := time.Now().Sub(start) + // user more than the write deadline to account for calls + // overhead, running with -race, etc... + if dur > 100*time.Millisecond { + t.Fatalf("Flush should have returned sooner, took: %v", dur) + } + // Flush sender connection to ensure that all data has been sent. + sender.Flush() + // At this point server should have closed connection c. + + // On certain platforms, it may take more than one call before + // getting the error. + for i := 0; i < 100; i++ { + if _, err := c.Write([]byte("PUB bar 5\r\nhello\r\n")); err != nil { + // ok + return + } + } + t.Fatal("Connection should have been closed") +}