diff --git a/server/client.go b/server/client.go index 33fb2416..14ed8ec2 100644 --- a/server/client.go +++ b/server/client.go @@ -1859,17 +1859,13 @@ func (c *client) maxPayloadViolation(sz int, max int32) { } // queueOutbound queues data for a clientconnection. -// Returns if the data is referenced or not. If referenced, the caller -// should not reuse the `data` array. // Lock should be held. -func (c *client) queueOutbound(data []byte) bool { +func (c *client) queueOutbound(data []byte) { // Do not keep going if closed if c.isClosed() { - return false + return } - // Assume data will not be referenced - referenced := false // Add to pending bytes total. c.out.pb += int64(len(data)) @@ -1882,7 +1878,7 @@ func (c *client) queueOutbound(data []byte) bool { atomic.AddInt64(&c.srv.slowConsumers, 1) c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp) c.markConnAsClosed(SlowConsumerPendingBytes) - return false + return } if c.out.p == nil && len(data) < maxBufSize { @@ -1912,34 +1908,30 @@ func (c *client) queueOutbound(data []byte) bool { c.out.nb = append(c.out.nb, c.out.p) c.out.p = nil } - // Check for a big message, and if found place directly on nb - // FIXME(dlc) - do we need signaling of ownership here if we want len(data) < maxBufSize - if len(data) > maxBufSize { - c.out.nb = append(c.out.nb, data) - referenced = true - } else { - // We will copy to primary. - if c.out.p == nil { - // Grow here - if (c.out.sz << 1) <= maxBufSize { - c.out.sz <<= 1 - } - if len(data) > int(c.out.sz) { - c.out.p = make([]byte, 0, len(data)) + // TODO: It was found with LeafNode and Websocket that referencing + // the data buffer when > maxBufSize would cause corruption + // (reproduced with small maxBufSize=10 and TestLeafNodeWSNoBufferCorruption). + // So always make a copy for now. + + // We will copy to primary. 
+ if c.out.p == nil { + // Grow here + if (c.out.sz << 1) <= maxBufSize { + c.out.sz <<= 1 + } + if len(data) > int(c.out.sz) { + c.out.p = make([]byte, 0, len(data)) + } else { + if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) { // TODO(dlc) - Size mismatch? + c.out.p = c.out.s + c.out.s = nil } else { - if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) { // TODO(dlc) - Size mismatch? - c.out.p = c.out.s - c.out.s = nil - } else { - c.out.p = make([]byte, 0, c.out.sz) - } + c.out.p = make([]byte, 0, c.out.sz) } } - c.out.p = append(c.out.p, data...) } - } else { - c.out.p = append(c.out.p, data...) } + c.out.p = append(c.out.p, data...) // Check here if we should create a stall channel if we are falling behind. // We do this here since if we wait for consumer's writeLoop it could be @@ -1947,8 +1939,6 @@ func (c *client) queueOutbound(data []byte) bool { if c.out.pb > c.out.mp/2 && c.out.stc == nil { c.out.stc = make(chan struct{}) } - - return referenced } // Assume the lock is held upon entry. diff --git a/server/leafnode.go b/server/leafnode.go index 86c648af..6adc0186 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2007,9 +2007,6 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot } } - var req *http.Request - var wsKey string - // For http request, we need the passed URL to contain either http or https scheme. 
scheme := "http" if tlsRequired { @@ -2028,7 +2025,7 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot } ustr := fmt.Sprintf("%s://%s%s", scheme, rURL.Host, path) u, _ := url.Parse(ustr) - req = &http.Request{ + req := &http.Request{ Method: "GET", URL: u, Proto: "HTTP/1.1", diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 0891d3fb..ca86cdc2 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -2826,6 +2826,62 @@ func TestLeafNodeWSGossip(t *testing.T) { } } +// This test was showing an issue if one set maxBufSize to a very small value, +// such as maxBufSize = 10. With such a small value, we would get corruption: +// an LMSG would arrive with missing bytes. We are now always making +// a copy when dealing with messages that are bigger than maxBufSize. +func TestLeafNodeWSNoBufferCorruption(t *testing.T) { + o := testDefaultLeafNodeWSOptions() + s := RunServer(o) + defer s.Shutdown() + + lo1 := testDefaultRemoteLeafNodeWSOptions(t, o, false) + lo1.LeafNode.ReconnectInterval = 15 * time.Millisecond + ln1 := RunServer(lo1) + defer ln1.Shutdown() + + lo2 := DefaultOptions() + lo2.Cluster.Name = "LN" + lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port)) + ln2 := RunServer(lo2) + defer ln2.Shutdown() + + checkClusterFormed(t, ln1, ln2) + + checkLeafNodeConnected(t, s) + checkLeafNodeConnected(t, ln1) + + nc := natsConnect(t, s.ClientURL()) + defer nc.Close() + sub := natsSubSync(t, nc, "foo") + + nc1 := natsConnect(t, ln1.ClientURL()) + defer nc1.Close() + + nc2 := natsConnect(t, ln2.ClientURL()) + defer nc2.Close() + sub2 := natsSubSync(t, nc2, "foo") + + checkSubInterest(t, s, globalAccountName, "foo", time.Second) + checkSubInterest(t, ln2, globalAccountName, "foo", time.Second) + checkSubInterest(t, ln1, globalAccountName, "foo", time.Second) + + payload := make([]byte, 100*1024) + for i := 0; i < len(payload); i++ { + payload[i] = 'A' + } + natsPub(t, nc1, "foo", 
payload) + + checkMsgRcv := func(sub *nats.Subscription) { + msg := natsNexMsg(t, sub, time.Second) + if !bytes.Equal(msg.Data, payload) { + t.Fatalf("Invalid message content: %q", msg.Data) + } + } + checkMsgRcv(sub2) + checkMsgRcv(sub) +} + func TestLeafNodeStreamImport(t *testing.T) { o1 := DefaultOptions() o1.LeafNode.Port = -1 diff --git a/server/websocket.go b/server/websocket.go index c01b2404..0c6d329d 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1235,12 +1235,17 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { continue } for len(b) > 0 { - endFrame(fhIdx, total) + endStart := total != 0 + if endStart { + endFrame(fhIdx, total) + } total = len(b) if total >= mfs { total = mfs } - fhIdx = startFrame() + if endStart { + fhIdx = startFrame() + } bufs = append(bufs, b[:total]) b = b[total:] } diff --git a/server/websocket_test.go b/server/websocket_test.go index 7d0a2476..a564b1de 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -2547,8 +2547,12 @@ func TestWSFrameOutbound(t *testing.T) { if len(res[4]) != wsFrameSizeForBrowsers { t.Fatalf("Big frame should have been limited to %v, got %v", wsFrameSizeForBrowsers, len(res[4])) } - if string(res[6]) != string(bufs[3]) { - t.Fatalf("Frame 6 should be %q, got %q", bufs[3], res[6]) + if test.maskingWrite { + key := getKey(res[5]) + wsMaskBuf(key, res[6]) + } + if string(res[6]) != "then some more" { + t.Fatalf("Frame 6 incorrect: %q", res[6]) } bufs = nil @@ -2606,8 +2610,26 @@ func TestWSFrameOutbound(t *testing.T) { if len(res[3]) != wsFrameSizeForBrowsers-5 { t.Fatalf("Big frame should have been limited to %v, got %v", wsFrameSizeForBrowsers, len(res[4])) } - if string(res[5]) != string(bufs[2]) { - t.Fatalf("Frame 6 should be %q, got %q", bufs[2], res[5]) + if test.maskingWrite { + key := getKey(res[4]) + wsMaskBuf(key, res[5]) + } + if string(res[5]) != "then some more" { + t.Fatalf("Frame 6 incorrect %q", res[5]) + } + + bufs = nil + c.out.pb = 0 + c.ws.fs 
= 0 + c.ws.frames = nil + c.ws.browser = true + bufs = append(bufs, make([]byte, wsFrameSizeForBrowsers+100)) + c.mu.Lock() + c.out.nb = bufs + res, _ = c.collapsePtoNB() + c.mu.Unlock() + if len(res) != 4 { + t.Fatalf("Unexpected number of frames: %v", len(res)) } }) }