[ADDED] Make Write deadline configurable

We use a hardcoded value of 2 seconds for Write deadline when
writing data to client's socket.
This PR makes that value configurable.

Question is should we push the setting down to the client's object
to avoid indirection such as client.srv.opts.WriteDeadline?
This commit is contained in:
Ivan Kozlovic
2017-01-18 19:27:42 -07:00
parent bfc521a244
commit 95d0152449
5 changed files with 70 additions and 5 deletions

View File

@@ -306,7 +306,7 @@ func (c *client) readLoop() {
wfc := cp.wfc
cp.wfc = 0
cp.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE))
cp.nc.SetWriteDeadline(time.Now().Add(s.opts.WriteDeadline))
err := cp.bw.Flush()
cp.nc.SetWriteDeadline(time.Time{})
if err != nil {
@@ -520,7 +520,7 @@ func (c *client) sendProto(info []byte, doFlush bool) error {
if c.bw != nil && c.nc != nil {
deadlineSet := false
if doFlush || c.bw.Available() < len(info) {
c.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE))
c.nc.SetWriteDeadline(time.Now().Add(c.srv.opts.WriteDeadline))
deadlineSet = true
}
_, err = c.bw.Write(info)
@@ -941,8 +941,8 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
deadlineSet := false
if client.bw.Available() < (len(mh) + len(msg)) {
client.wfc += 1
client.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE))
client.wfc++
client.nc.SetWriteDeadline(time.Now().Add(client.srv.opts.WriteDeadline))
deadlineSet = true
}
@@ -1260,7 +1260,7 @@ func (c *client) clearConnection() {
// With TLS, Close() is sending an alert (that is doing a write).
// Need to set a deadline otherwise the server could block there
// if the peer is not reading from socket.
c.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE))
c.nc.SetWriteDeadline(time.Now().Add(c.srv.opts.WriteDeadline))
if c.bw != nil {
c.bw.Flush()
}

View File

@@ -37,3 +37,6 @@ max_payload: 65536
# ping interval and no pong threshold
ping_interval: 60
ping_max: 3
# how long server can block on a socket write to a client
write_deadline: 3

View File

@@ -82,6 +82,7 @@ type Options struct {
TLSKey string `json:"-"`
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
WriteDeadline time.Duration `json:"-"`
}
// Configuration file authorization section.
@@ -234,6 +235,8 @@ func ProcessConfigFile(configFile string) (*Options, error) {
return nil, err
}
opts.TLSTimeout = tc.Timeout
case "write_deadline":
opts.WriteDeadline = time.Duration(v.(int64)) * time.Second
}
}
return opts, nil
@@ -841,4 +844,7 @@ func processOptions(opts *Options) {
if opts.MaxPayload == 0 {
opts.MaxPayload = MAX_PAYLOAD_SIZE
}
if opts.WriteDeadline == time.Duration(0) {
opts.WriteDeadline = DEFAULT_FLUSH_DEADLINE
}
}

View File

@@ -27,6 +27,7 @@ func TestDefaultOptions(t *testing.T) {
AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second),
TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second),
},
WriteDeadline: DEFAULT_FLUSH_DEADLINE,
}
opts := &Options{}
@@ -69,6 +70,7 @@ func TestConfigFile(t *testing.T) {
MaxConn: 100,
PingInterval: 60 * time.Second,
MaxPingsOut: 3,
WriteDeadline: 3 * time.Second,
}
opts, err := ProcessConfigFile("./configs/test.conf")
@@ -230,6 +232,7 @@ func TestMergeOverrides(t *testing.T) {
NoAdvertise: true,
ConnectRetries: 2,
},
WriteDeadline: 3 * time.Second,
}
fopts, err := ProcessConfigFile("./configs/test.conf")
if err != nil {

View File

@@ -275,3 +275,56 @@ func TestProcessCommandLineArgs(t *testing.T) {
t.Errorf("Expected an error handling the command arguments")
}
}
func TestWriteDeadline(t *testing.T) {
opts := DefaultOptions
opts.WriteDeadline = 20 * time.Millisecond
s := RunServer(&opts)
defer s.Shutdown()
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer c.Close()
if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil {
t.Fatalf("Error sending protocols to server: %v", err)
}
// Reduce socket buffer to increase reliability of getting
// write deadline errors.
c.(*net.TCPConn).SetReadBuffer(10)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
sender, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sender.Close()
payload := make([]byte, 1000000)
start := time.Now()
for i := 0; i < 10; i++ {
if err := sender.Publish("foo", payload); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
dur := time.Now().Sub(start)
// user more than the write deadline to account for calls
// overhead, running with -race, etc...
if dur > 100*time.Millisecond {
t.Fatalf("Flush should have returned sooner, took: %v", dur)
}
// Flush sender connection to ensure that all data has been sent.
sender.Flush()
// At this point server should have closed connection c.
// On certain platforms, it may take more than one call before
// getting the error.
for i := 0; i < 100; i++ {
if _, err := c.Write([]byte("PUB bar 5\r\nhello\r\n")); err != nil {
// ok
return
}
}
t.Fatal("Connection should have been closed")
}