mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Fixed slice panic on MSG_ARG split buffer
This commit is contained in:
@@ -600,7 +600,6 @@ writeErr:
|
||||
client.mu.Unlock()
|
||||
|
||||
if ne, ok := err.(net.Error); ok && ne.Timeout() {
|
||||
// FIXME: SlowConsumer logic
|
||||
Log("Slow Consumer Detected", clientConnStr(client.nc), client.cid)
|
||||
client.closeConnection()
|
||||
} else {
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "0.4.3"
|
||||
VERSION = "0.4.4"
|
||||
|
||||
// DEFAULT_PORT is the deault port for client connections.
|
||||
DEFAULT_PORT = 4222
|
||||
@@ -49,7 +49,7 @@ const (
|
||||
LEN_CR_LF = len(CR_LF)
|
||||
|
||||
// DEFAULT_FLUSH_DEADLINE is the write/flush deadlines.
|
||||
DEFAULT_FLUSH_DEADLINE = 500 * time.Millisecond
|
||||
DEFAULT_FLUSH_DEADLINE = 2 * time.Second
|
||||
|
||||
// DEFAULT_HTTP_PORT is the default monitoring port.
|
||||
DEFAULT_HTTP_PORT = 8333
|
||||
|
||||
@@ -571,8 +571,9 @@ func (c *client) parse(buf []byte) error {
|
||||
goto parseErr
|
||||
}
|
||||
}
|
||||
// Check for split buffer scenarios for SUB and UNSUB and PUB
|
||||
if (c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG || c.state == MINUS_ERR_ARG) && c.argBuf == nil {
|
||||
// Check for split buffer scenarios for any ARG state.
|
||||
if (c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG ||
|
||||
c.state == MSG_ARG || c.state == MINUS_ERR_ARG) && c.argBuf == nil {
|
||||
c.argBuf = c.scratch[:0]
|
||||
c.argBuf = append(c.argBuf, buf[c.as:(i+1)-c.drop]...)
|
||||
// FIXME, check max len
|
||||
|
||||
@@ -31,6 +31,7 @@ func flushConnection(b *testing.B, c net.Conn, buf []byte) {
|
||||
c.Write([]byte("PING\r\n"))
|
||||
c.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
|
||||
n, err := c.Read(buf)
|
||||
c.SetReadDeadline(time.Time{})
|
||||
if err != nil {
|
||||
b.Fatalf("Failed read: %v\n", err)
|
||||
}
|
||||
|
||||
80
test/client_cluster_test.go
Normal file
80
test/client_cluster_test.go
Normal file
@@ -0,0 +1,80 @@
|
||||
// Copyright 2013 Apcera Inc. All rights reserved.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/apcera/nats"
|
||||
)
|
||||
|
||||
func TestServerRestartReSliceIssue(t *testing.T) {
|
||||
srvA, srvB, optsA, optsB := runServers(t)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port)
|
||||
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port)
|
||||
|
||||
// msg to send..
|
||||
msg := []byte("Hello World")
|
||||
|
||||
servers := []string{urlA, urlB}
|
||||
|
||||
opts := nats.DefaultOptions
|
||||
opts.Timeout = (5 * time.Second)
|
||||
opts.ReconnectWait = (50 * time.Millisecond)
|
||||
opts.MaxReconnect = 1000
|
||||
|
||||
reconnects := 0
|
||||
reconnectsDone := make(chan bool)
|
||||
opts.ReconnectedCB = func(nc *nats.Conn) {
|
||||
reconnects++
|
||||
reconnectsDone <- true
|
||||
}
|
||||
|
||||
// Create 20 random clients.
|
||||
// Half connected to A and half to B..
|
||||
numClients := 50
|
||||
for i := 0; i < numClients; i++ {
|
||||
opts.Url = servers[i%2]
|
||||
nc, err := opts.Connect()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create connection: %v\n", err)
|
||||
}
|
||||
// Create 10 subscriptions each..
|
||||
for x := 0; x < 10; x++ {
|
||||
subject := fmt.Sprintf("foo.%d", (rand.Int()%50)+1)
|
||||
nc.Subscribe(subject, func(m *nats.Msg) {
|
||||
// Just eat it..
|
||||
})
|
||||
}
|
||||
// Pick one subject to send to..
|
||||
subject := fmt.Sprintf("foo.%d", (rand.Int()%50)+1)
|
||||
go func() {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
for i := 1; 1 <= 100; i++ {
|
||||
nc.Publish(subject, msg)
|
||||
if i % 20 == 0 {
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait for a short bit..
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
// Restart SrvB
|
||||
srvB.Shutdown()
|
||||
srvB = RunServer(optsB)
|
||||
|
||||
select {
|
||||
case <-reconnectsDone:
|
||||
break
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("Expected %d reconnects, got %d\n", numClients/2, reconnects)
|
||||
}
|
||||
}
|
||||
@@ -1,12 +1,12 @@
|
||||
# Copyright 2012-2013 Apcera Inc. All rights reserved.
|
||||
|
||||
# Cluster Server B
|
||||
# Cluster Server A
|
||||
|
||||
port: 4224
|
||||
port: 4222
|
||||
|
||||
cluster {
|
||||
host: '127.0.0.1'
|
||||
port: 4246
|
||||
port: 4244
|
||||
|
||||
authorization {
|
||||
user: ruser
|
||||
@@ -19,7 +19,7 @@ cluster {
|
||||
# in their routes definitions from above.
|
||||
|
||||
routes = [
|
||||
nats-route://ruser:top_secret@127.0.0.1:4244
|
||||
nats-route://ruser:top_secret@127.0.0.1:4246
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
@@ -2,11 +2,11 @@
|
||||
|
||||
# Cluster Server B
|
||||
|
||||
port: 4222
|
||||
port: 4224
|
||||
|
||||
cluster {
|
||||
host: '127.0.0.1'
|
||||
port: 4244
|
||||
port: 4246
|
||||
|
||||
authorization {
|
||||
user: ruser
|
||||
@@ -19,7 +19,7 @@ cluster {
|
||||
# in their routes definitions from above.
|
||||
|
||||
routes = [
|
||||
nats-route://ruser:top_secret@127.0.0.1:4246
|
||||
nats-route://ruser:top_secret@127.0.0.1:4244
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user