diff --git a/go.mod b/go.mod index 4c487003..634660a3 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,9 @@ module github.com/nats-io/nats-server/v2 -go 1.14 - require ( github.com/minio/highwayhash v1.0.0 github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 - github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada + github.com/nats-io/nats.go v1.10.1-0.20200530210759-eb0f6c78d70b github.com/nats-io/nkeys v0.1.4 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 diff --git a/go.sum b/go.sum index dc88d2e4..9f1498cb 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1Tv github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada h1:ZV/q9/eCPmjwQXFob3GWT3MlTyKlKPB8rcRdr8/dpPw= github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada/go.mod h1:C4MWF84vZCs3asgTFW+IL61SqxUZZ+YcZ8VRnqw3pGY= +github.com/nats-io/nats.go v1.10.1-0.20200530210759-eb0f6c78d70b h1:UtcALRnrmKQNl8hSq3b6EL1iXsFztWdO/HduVkEf/HI= +github.com/nats-io/nats.go v1.10.1-0.20200530210759-eb0f6c78d70b/go.mod h1:C4MWF84vZCs3asgTFW+IL61SqxUZZ+YcZ8VRnqw3pGY= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= diff --git a/server/consumer.go b/server/consumer.go index 5819777a..41727cb3 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1264,14 +1264,41 @@ func (o *Consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, t return true } +// Some constants for headers. +// TODO(dlc) - Move these to a more generic place. +const ( + hdrLine = "NATS/1.0\r\n" + crlf = "\r\n" + cHdrsT = "Jetstream-Stream-Sequence: %d\r\nJetstream-Sequence: %d\r\nJetstream-Deliver-Count: %d\r\n\r\n" +) + +// createMsgHeader will add on custom headers. +// TODO(dlc) - if we know client can not receive could avoid. +func createMsgHeader(ohdr []byte, sseq, dseq, dcount uint64) []byte { + var hdr []byte + if len(ohdr) > 0 { + // Strip ending CRLF, will put it back on at end. + hdr = ohdr[:len(ohdr)-len(crlf)] + } else { + hdr = []byte(hdrLine) + } + // Now add in the consumer fields. + // TODO(dlc) - Make more efficient. + hdr = append(hdr, []byte(fmt.Sprintf(cHdrsT, sseq, dseq, dcount))...) + return hdr +} + // Deliver a msg to the observable. // Lock should be held and o.mset validated to be non-nil. func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount uint64, ts int64) { if o.mset == nil { return } + // Create the headers. + ahdr := createMsgHeader(hdr, seq, o.dseq, dcount) + pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, o.dseq, dcount, ts), ahdr, msg, o, seq} + sendq := o.mset.sendq - pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, o.dseq, dcount, ts), hdr, msg, o, seq} // This needs to be unlocked since the other side may need this lock on failed delivery. o.mu.Unlock() diff --git a/server/parser.go b/server/parser.go index b17b5aaf..255007a9 100644 --- a/server/parser.go +++ b/server/parser.go @@ -1054,6 +1054,7 @@ func (c *client) parse(buf []byte) error { c.state == ASUB_ARG || c.state == AUSUB_ARG || c.state == MSG_ARG || c.state == HMSG_ARG || c.state == MINUS_ERR_ARG || c.state == CONNECT_ARG || c.state == INFO_ARG { + // Setup a holder buffer to deal with split buffer scenario. if c.argBuf == nil { c.argBuf = c.scratch[:0] @@ -1075,6 +1076,7 @@ func (c *client) parse(buf []byte) error { if (c.state == MSG_PAYLOAD || c.state == MSG_END_R || c.state == MSG_END_N) && c.msgBuf == nil { // We need to clone the pubArg if it is still referencing the // read buffer and we are not able to process the msg. + if c.argBuf == nil { // Works also for MSG_ARG, when message comes from ROUTE. if err := c.clonePubArg(); err != nil { @@ -1141,7 +1143,11 @@ func (c *client) clonePubArg() error { return c.processRoutedHeaderMsgArgs(c.argBuf) } case LEAF: - return c.processLeafMsgArgs(c.argBuf) + if c.pa.hdr < 0 { + return c.processLeafMsgArgs(c.argBuf) + } else { + return c.processLeafHeaderMsgArgs(c.argBuf) + } default: if c.pa.hdr < 0 { return c.processPub(c.argBuf) diff --git a/server/stream.go b/server/stream.go index f370457a..76040ee3 100644 --- a/server/stream.go +++ b/server/stream.go @@ -768,6 +768,8 @@ func (mset *Stream) internalSendLoop() { msg = append(pm.hdr, pm.msg...) msg = append(msg, _CRLF_...) } else { + c.pa.hdr = -1 + c.pa.hdb = nil msg = append(pm.msg, _CRLF_...) } didDeliver := c.processInboundClientMsg(msg) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index b518f6d6..fa2edaf6 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -94,7 +94,10 @@ func RunJetStreamServerOnPort(port int, sd string) *server.Server { } func clientConnectToServer(t *testing.T, s *server.Server) *nats.Conn { - nc, err := nats.Connect(s.ClientURL(), nats.Name("JS-TEST"), nats.ReconnectWait(5*time.Millisecond), nats.MaxReconnects(-1)) + nc, err := nats.Connect(s.ClientURL(), + nats.Name("JS-TEST"), + nats.ReconnectWait(5*time.Millisecond), + nats.MaxReconnects(-1)) if err != nil { t.Fatalf("Failed to create client: %v", err) } @@ -793,7 +796,7 @@ func sendStreamMsg(t *testing.T, nc *nats.Conn, subject, msg string) { t.Helper() resp, _ := nc.Request(subject, []byte(msg), 100*time.Millisecond) if resp == nil { - t.Fatalf("No response, possible timeout?") + t.Fatalf("No response for %q, possible timeout?", msg) } if !bytes.HasPrefix(resp.Data, []byte("+OK {")) { t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data) @@ -6245,10 +6248,20 @@ func TestJetStreamMsgHeaders(t *testing.T) { if err != nil { t.Fatalf("Error getting message: %v", err) } - // Remove reply subject and sub for comparison. - cm.Sub, cm.Reply = nil, "" - if !reflect.DeepEqual(cm, m) { - t.Fatalf("Messages do not match: %+v vs %+v", cm, m) + // Check the message. + // Check out original headers. + if cm.Header.Get("Accept-Encoding") != "json" || + cm.Header.Get("Authorization") != "s3cr3t" { + t.Fatalf("Original headers not present") + } + // Now check for jetstream headers. + if cm.Header.Get("Jetstream-Stream-Sequence") != "1" || + cm.Header.Get("Jetstream-Sequence") != "1" || + cm.Header.Get("Jetstream-Deliver-Count") != "1" { + t.Fatalf("Did not get proper Jetstream headers: %+v", cm.Header) + } + if !bytes.Equal(m.Data, cm.Data) { + t.Fatalf("Message payloads are not the same: %q vs %q", cm.Data, m.Data) } }) } diff --git a/vendor/github.com/nats-io/nats.go/parser.go b/vendor/github.com/nats-io/nats.go/parser.go index 77bfe05e..c9cbfeb6 100644 --- a/vendor/github.com/nats-io/nats.go/parser.go +++ b/vendor/github.com/nats-io/nats.go/parser.go @@ -87,9 +87,11 @@ func (nc *Conn) parse(buf []byte) error { case 'M', 'm': nc.ps.state = OP_M nc.ps.hdr = -1 + nc.ps.ma.hdr = -1 case 'H', 'h': nc.ps.state = OP_H nc.ps.hdr = 0 + nc.ps.ma.hdr = 0 case 'P', 'p': nc.ps.state = OP_P case '+': diff --git a/vendor/modules.txt b/vendor/modules.txt index ef738707..95b5c6b2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -4,7 +4,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 ## explicit github.com/nats-io/jwt -# github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada +# github.com/nats-io/nats.go v1.10.1-0.20200530210759-eb0f6c78d70b ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin