From 1b39a9fe94fdc44b6bb91bc2975242d27bcd7577 Mon Sep 17 00:00:00 2001 From: Oleg Shaldybin Date: Tue, 7 Apr 2015 22:23:47 -0700 Subject: [PATCH] Fix sid for split messages When message is split we need to copy message arguments to avoid rewriting them with new message. Subject, reply and size were correctly copied by sid wasn't. That led to dropping some messages in clustered mode if they were split, and the second part was long enough to overwrite sid in the original buffer. --- server/parser.go | 10 +++++++++- server/route.go | 1 + server/split_test.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/server/parser.go b/server/parser.go index 6c4c0808..40a3d7b8 100644 --- a/server/parser.go +++ b/server/parser.go @@ -629,10 +629,18 @@ func (c *client) clonePubArg() { c.argBuf = c.scratch[:0] c.argBuf = append(c.argBuf, c.pa.subject...) c.argBuf = append(c.argBuf, c.pa.reply...) + c.argBuf = append(c.argBuf, c.pa.sid...) c.argBuf = append(c.argBuf, c.pa.szb...) + c.pa.subject = c.argBuf[:len(c.pa.subject)] + if c.pa.reply != nil { c.pa.reply = c.argBuf[len(c.pa.subject) : len(c.pa.subject)+len(c.pa.reply)] } - c.pa.szb = c.argBuf[len(c.pa.subject)+len(c.pa.reply):] + + if c.pa.sid != nil { + c.pa.sid = c.argBuf[len(c.pa.subject)+len(c.pa.reply) : len(c.pa.subject)+len(c.pa.reply)+len(c.pa.sid)] + } + + c.pa.szb = c.argBuf[len(c.pa.subject)+len(c.pa.reply)+len(c.pa.sid):] } diff --git a/server/route.go b/server/route.go index 5733101d..1cadf400 100644 --- a/server/route.go +++ b/server/route.go @@ -166,6 +166,7 @@ func (s *Server) routeSidQueueSubscriber(rsid []byte) (*subscription, bool) { return nil, true } sid := matches[RSID_SID_INDEX] + if sub, ok := (client.subs.Get(sid)).(*subscription); ok { return sub, true } diff --git a/server/split_test.go b/server/split_test.go index a7d58e2c..59653cb8 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -341,3 +341,31 @@ func TestSplitDanglingArgBuf(t *testing.T) { t.Fatalf("Expected c.argBuf to be nil: %q\n", c.argBuf) } } + +func TestSplitMsgArg(t *testing.T) { + _, c, _ := setupClient() + + b := make([]byte, 1024) + + copy(b, []byte("MSG hello.world RSID:14:8 6040\r\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")) + c.parse(b) + + copy(b, []byte("BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\r\n")) + c.parse(b) + + wantSubject := "hello.world" + wantSid := "RSID:14:8" + wantSzb := "6040" + + if string(c.pa.subject) != wantSubject { + t.Fatalf("Incorrect subject: want %q, got %q", wantSubject, c.pa.subject) + } + + if string(c.pa.sid) != wantSid { + t.Fatalf("Incorrect sid: want %q, got %q", wantSid, c.pa.sid) + } + + if string(c.pa.szb) != wantSzb { + t.Fatalf("Incorrect szb: want %q, got %q", wantSzb, c.pa.szb) + } +}