From eca04c6fcecdbe072abd3d1ba007475f235a1a28 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 30 May 2020 10:04:23 -0700 Subject: [PATCH] First pass header support for JetStream Signed-off-by: Derek Collison --- go.mod | 2 +- go.sum | 23 +++ jetstream/README.md | 4 +- server/consumer.go | 31 ++-- server/filestore.go | 109 ++++++++--- server/filestore_test.go | 137 +++++++++----- server/jetstream_api.go | 10 +- server/memstore.go | 30 ++- server/memstore_test.go | 61 ++++-- server/server.go | 1 + server/store.go | 6 +- server/stream.go | 56 ++++-- test/jetstream_test.go | 99 ++++++++++ vendor/github.com/nats-io/nats.go/enc.go | 4 +- vendor/github.com/nats-io/nats.go/go.mod | 5 +- vendor/github.com/nats-io/nats.go/go.sum | 30 +++ vendor/github.com/nats-io/nats.go/nats.go | 196 ++++++++++++++++---- vendor/github.com/nats-io/nats.go/parser.go | 75 +++++++- vendor/modules.txt | 2 +- 19 files changed, 692 insertions(+), 189 deletions(-) diff --git a/go.mod b/go.mod index 7422f8b8..4c487003 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.14 require ( github.com/minio/highwayhash v1.0.0 github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 - github.com/nats-io/nats.go v1.10.0 + github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada github.com/nats-io/nkeys v0.1.4 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 diff --git a/go.sum b/go.sum index f2399d79..dc88d2e4 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,24 @@ +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA= github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= +github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU= github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= +github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada h1:ZV/q9/eCPmjwQXFob3GWT3MlTyKlKPB8rcRdr8/dpPw= +github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada/go.mod h1:C4MWF84vZCs3asgTFW+IL61SqxUZZ+YcZ8VRnqw3pGY= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= @@ -21,3 +35,12 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU= golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= diff --git a/jetstream/README.md b/jetstream/README.md index 26aa56fe..87282d50 100644 --- a/jetstream/README.md +++ b/jetstream/README.md @@ -2,6 +2,4 @@ JetStream is the [NATS.io](https://nats.io) persistence engine that will support streaming as well as traditional message and worker queues for At-Least-Once delivery semantics. -JetStream is composed of two major components, Message Sets and Observables. Message Sets determine the interest set, global ordering, retention policy, replicas and resource limits. Observables define how Message Sets are consumed, and have quite a few options. - -Information about testing the JetStream Technical Preview can be found [here](https://github.com/nats-io/jetstream#readme) +More information about testing the JetStream Technical Preview can be found [here](https://github.com/nats-io/jetstream#readme) diff --git a/server/consumer.go b/server/consumer.go index c0f416d6..5819777a 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -489,7 +489,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { func (o *Consumer) sendAdvisory(subj string, msg []byte) { if o.mset != nil && o.mset.sendq != nil { o.mu.Unlock() - o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, msg, nil, 0} + o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0} o.mu.Lock() } } @@ -1000,8 +1000,8 @@ func (o *Consumer) processNextMsgReq(_ *subscription, _ *client, _, reply string if o.replay { o.waiting = append(o.waiting, reply) shouldSignal = true - } else if subj, msg, seq, dc, ts, err := o.getNextMsg(); err == nil { - o.deliverMsg(reply, subj, msg, seq, dc, ts) + } else if subj, hdr, msg, seq, dc, ts, err := o.getNextMsg(); err == nil { + o.deliverMsg(reply, subj, hdr, msg, seq, dc, ts) } else { o.waiting = append(o.waiting, reply) } @@ -1058,9 +1058,9 @@ func (o *Consumer) isFilteredMatch(subj string) bool { // Get next available message from underlying store. // Is partition aware and redeliver aware. // Lock should be held. -func (o *Consumer) getNextMsg() (subj string, msg []byte, seq uint64, dcount uint64, ts int64, err error) { +func (o *Consumer) getNextMsg() (subj string, hdr, msg []byte, seq uint64, dcount uint64, ts int64, err error) { if o.mset == nil { - return _EMPTY_, nil, 0, 0, 0, fmt.Errorf("consumer not valid") + return _EMPTY_, nil, nil, 0, 0, 0, fmt.Errorf("consumer not valid") } for { seq, dcount := o.sseq, uint64(1) @@ -1078,7 +1078,7 @@ func (o *Consumer) getNextMsg() (subj string, msg []byte, seq uint64, dcount uin continue } } - subj, msg, ts, err := o.mset.store.LoadMsg(seq) + subj, hdr, msg, ts, err := o.mset.store.LoadMsg(seq) if err == nil { if dcount == 1 { // First delivery. o.sseq++ @@ -1087,12 +1087,12 @@ func (o *Consumer) getNextMsg() (subj string, msg []byte, seq uint64, dcount uin } } // We have the msg here. - return subj, msg, seq, dcount, ts, nil + return subj, hdr, msg, seq, dcount, ts, nil } // We got an error here. If this is an EOF we will return, otherwise // we can continue looking. if err == ErrStoreEOF || err == ErrStoreClosed { - return "", nil, 0, 0, 0, err + return _EMPTY_, nil, nil, 0, 0, 0, err } // Skip since its probably deleted or expired. o.sseq++ @@ -1136,6 +1136,7 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { mset *Stream seq, dcnt uint64 subj, dsubj string + hdr []byte msg []byte err error ts int64 @@ -1160,7 +1161,7 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { goto waitForMsgs } - subj, msg, seq, dcnt, ts, err = o.getNextMsg() + subj, hdr, msg, seq, dcnt, ts, err = o.getNextMsg() // On error either wait or return. if err != nil { @@ -1195,7 +1196,7 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) { // Track this regardless. lts = ts - o.deliverMsg(dsubj, subj, msg, seq, dcnt, ts) + o.deliverMsg(dsubj, subj, hdr, msg, seq, dcnt, ts) o.mu.Unlock() continue @@ -1217,7 +1218,7 @@ func (o *Consumer) ackReply(sseq, dseq, dcount uint64, ts int64) string { // deliverCurrentMsg is the hot path to deliver a message that was just received. // Will return if the message was delivered or not. -func (o *Consumer) deliverCurrentMsg(subj string, msg []byte, seq uint64, ts int64) bool { +func (o *Consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, ts int64) bool { o.mu.Lock() if seq != o.sseq { o.mu.Unlock() @@ -1257,7 +1258,7 @@ func (o *Consumer) deliverCurrentMsg(subj string, msg []byte, seq uint64, ts int msg = append(msg[:0:0], msg...) } - o.deliverMsg(dsubj, subj, msg, seq, 1, ts) + o.deliverMsg(dsubj, subj, hdr, msg, seq, 1, ts) o.mu.Unlock() return true @@ -1265,12 +1266,12 @@ func (o *Consumer) deliverCurrentMsg(subj string, msg []byte, seq uint64, ts int // Deliver a msg to the observable. // Lock should be held and o.mset validated to be non-nil. -func (o *Consumer) deliverMsg(dsubj, subj string, msg []byte, seq, dcount uint64, ts int64) { +func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount uint64, ts int64) { if o.mset == nil { return } sendq := o.mset.sendq - pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, o.dseq, dcount, ts), msg, o, seq} + pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, o.dseq, dcount, ts), hdr, msg, o, seq} // This needs to be unlocked since the other side may need this lock on failed delivery. o.mu.Unlock() @@ -1434,7 +1435,7 @@ func (o *Consumer) selectSubjectLast() { } // FIXME(dlc) - this is linear and can be optimized by store layer. for seq := stats.LastSeq; seq >= stats.FirstSeq; seq-- { - subj, _, _, err := o.mset.store.LoadMsg(seq) + subj, _, _, _, err := o.mset.store.LoadMsg(seq) if err == ErrStoreMsgNotFound { continue } diff --git a/server/filestore.go b/server/filestore.go index 64742bfb..1256f878 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -121,6 +121,7 @@ type msgId struct { type fileStoredMsg struct { subj string + hdr []byte msg []byte seq uint64 ts int64 // nanoseconds @@ -578,7 +579,7 @@ func (fs *fileStore) enableLastMsgBlockForWriting() error { } // Store stores a message. -func (fs *fileStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) { +func (fs *fileStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error) { fs.mu.Lock() if fs.closed { fs.mu.Unlock() @@ -599,7 +600,7 @@ func (fs *fileStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) { seq := fs.state.LastSeq + 1 - n, ts, err := fs.writeMsgRecord(seq, subj, msg) + n, ts, err := fs.writeMsgRecord(seq, subj, hdr, msg) if err != nil { fs.mu.Unlock() return 0, 0, err @@ -771,7 +772,7 @@ func (mb *msgBlock) selectNextFirst() { func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStoredMsg, secure bool) { // Update global accounting. - msz := fileStoreMsgSize(sm.subj, sm.msg) + msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) fs.mu.Lock() mb.mu.Lock() @@ -1048,12 +1049,14 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) { } // Lock should be held. -func (fs *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64, int64, error) { +func (fs *fileStore) writeMsgRecord(seq uint64, subj string, mhdr, msg []byte) (uint64, int64, error) { var err error // Get size for this message. - rl := fileStoreMsgSize(subj, msg) - + rl := fileStoreMsgSize(subj, mhdr, msg) + if rl&hbit == 1 { + return 0, 0, ErrMsgTooLarge + } // Grab our current last message block. mb := fs.lmb if mb == nil || mb.numBytes()+rl > fs.fcfg.BlockSize { @@ -1071,11 +1074,23 @@ func (fs *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64 // Update accounting. mb.updateAccounting(seq, ts, rl) + // Formats + // Format with no header + // total_len(4) sequence(8) timestamp(8) subj_len(2) subj msg hash(8) + // With headers, high bit on total length will be set. + // total_len(4) sequence(8) timestamp(8) subj_len(2) subj hdr_len(4) hdr msg hash(8) + // First write header, etc. var le = binary.LittleEndian var hdr [msgHdrSize]byte - le.PutUint32(hdr[0:], uint32(rl)) + l := uint32(rl) + hasHeaders := len(mhdr) > 0 + if hasHeaders { + l |= hbit + } + + le.PutUint32(hdr[0:], l) le.PutUint64(hdr[4:], seq) le.PutUint64(hdr[12:], uint64(ts)) le.PutUint16(hdr[20:], uint16(len(subj))) @@ -1083,6 +1098,12 @@ func (fs *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64 // Now write to underlying buffer. fs.wmb.Write(hdr[:]) fs.wmb.WriteString(subj) + if hasHeaders { + var hlen [4]byte + le.PutUint32(hlen[0:], uint32(len(mhdr))) + fs.wmb.Write(hlen[:]) + fs.wmb.Write(mhdr) + } fs.wmb.Write(msg) // Calculate hash. @@ -1090,6 +1111,9 @@ func (fs *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64 mb.hh.Reset() mb.hh.Write(hdr[4:20]) mb.hh.Write([]byte(subj)) + if hasHeaders { + mb.hh.Write(mhdr) + } mb.hh.Write(msg) checksum := mb.hh.Sum(nil) // Grab last checksum @@ -1109,7 +1133,12 @@ func (fs *fileStore) eraseMsg(mb *msgBlock, sm *fileStoredMsg) error { return fmt.Errorf("bad stored message") } // erase contents and rewrite with new hash. - rand.Read(sm.msg) + if len(sm.hdr) > 0 { + rand.Read(sm.hdr) + } + if len(sm.msg) > 0 { + rand.Read(sm.msg) + } sm.seq, sm.ts = 0, 0 chars := []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZ") var b strings.Builder @@ -1121,7 +1150,7 @@ func (fs *fileStore) eraseMsg(mb *msgBlock, sm *fileStoredMsg) error { var le = binary.LittleEndian var hdr [msgHdrSize]byte - rl := fileStoreMsgSize(sm.subj, sm.msg) + rl := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) le.PutUint32(hdr[0:], uint32(rl)) le.PutUint64(hdr[4:], 0) @@ -1139,6 +1168,9 @@ func (fs *fileStore) eraseMsg(mb *msgBlock, sm *fileStoredMsg) error { mb.hh.Reset() mb.hh.Write(hdr[4:20]) mb.hh.Write([]byte(sm.subj)) + if len(sm.hdr) > 0 { + mb.hh.Write(sm.hdr) + } mb.hh.Write(sm.msg) checksum := mb.hh.Sum(nil) // Write to msg record. @@ -1293,13 +1325,17 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { rl := le.Uint32(hdr[0:]) seq := le.Uint64(hdr[4:]) slen := le.Uint16(hdr[20:]) + + // Clear any headers bit that could be set. + rl &^= hbit + dlen := int(rl) - msgHdrSize // Do some quick sanity checks here. if dlen < 0 || int(slen) > dlen || dlen > int(rl) { // This means something is off. - // Add into bad list? - return fmt.Errorf("malformed or corrupt msg") + // TODO(dlc) - Add into bad list? + return errBadMsg } // Adjust if we guessed wrong. if seq != 0 && seq < fseq { @@ -1377,6 +1413,7 @@ var ( ) // Used for marking messages that have had their checksums checked. +// Also used to signal a message record with headers const hbit = 1 << 31 // Will do a lookup from the cache. @@ -1419,7 +1456,7 @@ func (mb *msgBlock) cacheLookupLocked(seq uint64) (*fileStoredMsg, error) { buf := mb.cache.buf[bi:] // Parse from the raw buffer. - subj, msg, mseq, ts, err := msgFromBuf(buf, hh) + subj, hdr, msg, mseq, ts, err := msgFromBuf(buf, hh) if err != nil { return nil, err } @@ -1428,6 +1465,7 @@ func (mb *msgBlock) cacheLookupLocked(seq uint64) (*fileStoredMsg, error) { } sm := &fileStoredMsg{ subj: subj, + hdr: hdr, msg: msg, seq: seq, ts: ts, @@ -1469,18 +1507,21 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) { } // Internal function to return msg parts from a raw buffer. -func msgFromBuf(buf []byte, hh hash.Hash64) (string, []byte, uint64, int64, error) { +func msgFromBuf(buf []byte, hh hash.Hash64) (string, []byte, []byte, uint64, int64, error) { if len(buf) < msgHdrSize { - return "", nil, 0, 0, errBadMsg + return _EMPTY_, nil, nil, 0, 0, errBadMsg } var le = binary.LittleEndian hdr := buf[:msgHdrSize] - dlen := int(le.Uint32(hdr[0:])) - msgHdrSize + rl := le.Uint32(hdr[0:]) + hasHeaders := rl&hbit != 0 + rl &^= hbit // clear header bit + dlen := int(rl) - msgHdrSize slen := int(le.Uint16(hdr[20:])) // Simple sanity check. if dlen < 0 || slen > dlen { - return "", nil, 0, 0, errBadMsg + return _EMPTY_, nil, nil, 0, 0, errBadMsg } data := buf[msgHdrSize : msgHdrSize+dlen] // Do checksum tests here if requested. @@ -1488,9 +1529,13 @@ func msgFromBuf(buf []byte, hh hash.Hash64) (string, []byte, uint64, int64, erro hh.Reset() hh.Write(hdr[4:20]) hh.Write(data[:slen]) - hh.Write(data[slen : dlen-8]) + if hasHeaders { + hh.Write(data[slen+4 : dlen-8]) + } else { + hh.Write(data[slen : dlen-8]) + } if !bytes.Equal(hh.Sum(nil), data[len(data)-8:]) { - return "", nil, 0, 0, errBadMsg + return _EMPTY_, nil, nil, 0, 0, errBadMsg } } seq := le.Uint64(hdr[4:]) @@ -1499,16 +1544,26 @@ func msgFromBuf(buf []byte, hh hash.Hash64) (string, []byte, uint64, int64, erro // fix the capacity. This will cause a copy though in stream:internalSendLoop when // we append CRLF but this was causing a race. Need to rethink more to avoid this copy. end := dlen - 8 - return string(data[:slen]), data[slen:end:end], seq, ts, nil + var mhdr, msg []byte + if hasHeaders { + hl := le.Uint32(data[slen:]) + bi := slen + 4 + li := bi + int(hl) + mhdr = data[bi:li:li] + msg = data[li:end:end] + } else { + msg = data[slen:end:end] + } + return string(data[:slen]), mhdr, msg, seq, ts, nil } // LoadMsg will lookup the message by sequence number and return it if found. -func (fs *fileStore) LoadMsg(seq uint64) (string, []byte, int64, error) { +func (fs *fileStore) LoadMsg(seq uint64) (string, []byte, []byte, int64, error) { sm, err := fs.msgForSeq(seq) if sm != nil { - return sm.subj, sm.msg, sm.ts, nil + return sm.subj, sm.hdr, sm.msg, sm.ts, nil } - return "", nil, 0, err + return "", nil, nil, 0, err } func (fs *fileStore) State() StreamState { @@ -1519,9 +1574,13 @@ func (fs *fileStore) State() StreamState { return state } -func fileStoreMsgSize(subj string, msg []byte) uint64 { - // length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8) - return uint64(4 + 16 + 2 + len(subj) + len(msg) + 8) +func fileStoreMsgSize(subj string, hdr, msg []byte) uint64 { + if len(hdr) == 0 { + // length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8) + return uint64(22 + len(subj) + len(msg) + 8) + } + // length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + hdr_len(4) + hdr + msg + hash(8) + return uint64(22 + len(subj) + 4 + len(hdr) + len(msg) + 8) } // Lock should not be held. diff --git a/server/filestore_test.go b/server/filestore_test.go index ccd65cd7..41500e10 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -43,7 +43,7 @@ func TestFileStoreBasics(t *testing.T) { subj, msg := "foo", []byte("Hello World") now := time.Now().UnixNano() for i := 1; i <= 5; i++ { - if seq, ts, err := fs.StoreMsg(subj, msg); err != nil { + if seq, ts, err := fs.StoreMsg(subj, nil, msg); err != nil { t.Fatalf("Error storing msg: %v", err) } else if seq != uint64(i) { t.Fatalf("Expected sequence to be %d, got %d", i, seq) @@ -56,11 +56,11 @@ func TestFileStoreBasics(t *testing.T) { if state.Msgs != 5 { t.Fatalf("Expected 5 msgs, got %d", state.Msgs) } - expectedSize := 5 * fileStoreMsgSize(subj, msg) + expectedSize := 5 * fileStoreMsgSize(subj, nil, msg) if state.Bytes != expectedSize { t.Fatalf("Expected %d bytes, got %d", expectedSize, state.Bytes) } - nsubj, nmsg, _, err := fs.LoadMsg(2) + nsubj, _, nmsg, _, err := fs.LoadMsg(2) if err != nil { t.Fatalf("Unexpected error looking up msg: %v", err) } @@ -70,12 +70,47 @@ func TestFileStoreBasics(t *testing.T) { if !bytes.Equal(nmsg, msg) { t.Fatalf("Msgs don't match, original %q vs %q", msg, nmsg) } - _, _, _, err = fs.LoadMsg(3) + _, _, _, _, err = fs.LoadMsg(3) if err != nil { t.Fatalf("Unexpected error looking up msg: %v", err) } } +func TestFileStoreMsgHeaders(t *testing.T) { + storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) + fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer fs.Stop() + + subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World") + elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8 + if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen { + t.Fatalf("Wrong size for stored msg with header") + } + fs.StoreMsg(subj, hdr, msg) + _, shdr, smsg, _, err := fs.LoadMsg(1) + if err != nil { + t.Fatalf("Unexpected error looking up msg: %v", err) + } + if !bytes.Equal(msg, smsg) { + t.Fatalf("Expected same msg, got %q vs %q", smsg, msg) + } + if !bytes.Equal(hdr, shdr) { + t.Fatalf("Expected same hdr, got %q vs %q", shdr, hdr) + } + if removed, _ := fs.EraseMsg(1); !removed { + t.Fatalf("Expected erase msg to return success") + } + if bytes.Equal(hdr, shdr) { + t.Fatalf("Expected hdr to be erased") + } + if bytes.Equal(msg, smsg) { + t.Fatalf("Expected msg to be erased") + } +} + func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) { storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) os.MkdirAll(storeDir, 0755) @@ -102,7 +137,7 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) { toStore := uint64(100) for i := uint64(1); i <= toStore; i++ { msg := []byte(fmt.Sprintf("[%08d] Hello World!", i)) - if seq, _, err := fs.StoreMsg(subj, msg); err != nil { + if seq, _, err := fs.StoreMsg(subj, nil, msg); err != nil { t.Fatalf("Error storing msg: %v", err) } else if seq != uint64(i) { t.Fatalf("Expected sequence to be %d, got %d", i, seq) @@ -113,7 +148,7 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) { t.Fatalf("Expected %d msgs, got %d", toStore, state.Msgs) } msg22 := []byte(fmt.Sprintf("[%08d] Hello World!", 22)) - expectedSize := toStore * fileStoreMsgSize(subj, msg22) + expectedSize := toStore * fileStoreMsgSize(subj, nil, msg22) if state.Bytes != expectedSize { t.Fatalf("Expected %d bytes, got %d", expectedSize, state.Bytes) @@ -122,7 +157,7 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) { fs.Stop() // Make sure Store call after does not work. - if _, _, err := fs.StoreMsg(subj, []byte("no work")); err == nil { + if _, _, err := fs.StoreMsg(subj, nil, []byte("no work")); err == nil { t.Fatalf("Expected an error for StoreMsg call after Stop, got none") } @@ -144,7 +179,7 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) { // Now write 100 more msgs for i := uint64(101); i <= toStore*2; i++ { msg := []byte(fmt.Sprintf("[%08d] Hello World!", i)) - if seq, _, err := fs.StoreMsg(subj, msg); err != nil { + if seq, _, err := fs.StoreMsg(subj, nil, msg); err != nil { t.Fatalf("Error storing msg: %v", err) } else if seq != uint64(i) { t.Fatalf("Expected sequence to be %d, got %d", i, seq) @@ -188,13 +223,13 @@ func TestFileStoreMsgLimit(t *testing.T) { subj, msg := "foo", []byte("Hello World") for i := 0; i < 10; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state := fs.State() if state.Msgs != 10 { t.Fatalf("Expected %d msgs, got %d", 10, state.Msgs) } - if _, _, err := fs.StoreMsg(subj, msg); err != nil { + if _, _, err := fs.StoreMsg(subj, nil, msg); err != nil { t.Fatalf("Error storing msg: %v", err) } state = fs.State() @@ -208,14 +243,14 @@ func TestFileStoreMsgLimit(t *testing.T) { t.Fatalf("Expected the first sequence to be 2 now, but got %d", state.FirstSeq) } // Make sure we can not lookup seq 1. - if _, _, _, err := fs.LoadMsg(1); err == nil { + if _, _, _, _, err := fs.LoadMsg(1); err == nil { t.Fatalf("Expected error looking up seq 1 but got none") } } func TestFileStoreBytesLimit(t *testing.T) { subj, msg := "foo", make([]byte, 512) - storedMsgSize := fileStoreMsgSize(subj, msg) + storedMsgSize := fileStoreMsgSize(subj, nil, msg) toStore := uint64(1024) maxBytes := storedMsgSize * toStore @@ -231,7 +266,7 @@ func TestFileStoreBytesLimit(t *testing.T) { defer fs.Stop() for i := uint64(0); i < toStore; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state := fs.State() if state.Msgs != toStore { @@ -243,7 +278,7 @@ func TestFileStoreBytesLimit(t *testing.T) { // Now send 10 more and check that bytes limit enforced. for i := 0; i < 10; i++ { - if _, _, err := fs.StoreMsg(subj, msg); err != nil { + if _, _, err := fs.StoreMsg(subj, nil, msg); err != nil { t.Fatalf("Error storing msg: %v", err) } } @@ -279,7 +314,7 @@ func TestFileStoreAgeLimit(t *testing.T) { subj, msg := "foo", []byte("Hello World") toStore := 100 for i := 0; i < toStore; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state := fs.State() if state.Msgs != uint64(toStore) { @@ -302,7 +337,7 @@ func TestFileStoreAgeLimit(t *testing.T) { checkExpired(t) // Now add some more and make sure that timer will fire again. for i := 0; i < toStore; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state = fs.State() if state.Msgs != uint64(toStore) { @@ -326,10 +361,10 @@ func TestFileStoreTimeStamps(t *testing.T) { subj, msg := "foo", []byte("Hello World") for i := 0; i < 10; i++ { time.Sleep(5 * time.Millisecond) - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } for seq := uint64(1); seq <= 10; seq++ { - _, _, ts, err := fs.LoadMsg(seq) + _, _, _, ts, err := fs.LoadMsg(seq) if err != nil { t.Fatalf("Unexpected error looking up msg: %v", err) } @@ -354,11 +389,11 @@ func TestFileStorePurge(t *testing.T) { defer fs.Stop() subj, msg := "foo", make([]byte, 8*1024) - storedMsgSize := fileStoreMsgSize(subj, msg) + storedMsgSize := fileStoreMsgSize(subj, nil, msg) toStore := uint64(1024) for i := uint64(0); i < toStore; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state := fs.State() if state.Msgs != toStore { @@ -414,7 +449,7 @@ func TestFileStorePurge(t *testing.T) { // Now make sure we clean up any dangling purged messages. for i := uint64(0); i < toStore; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state = fs.State() if state.Msgs != toStore { @@ -473,7 +508,7 @@ func TestFileStoreRemovePartialRecovery(t *testing.T) { subj, msg := "foo", []byte("Hello World") toStore := 100 for i := 0; i < toStore; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state := fs.State() if state.Msgs != uint64(toStore) { @@ -519,7 +554,7 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) { subj, msg := "foo", []byte("Hello World") toStore := 100 for i := 0; i < toStore; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state := fs.State() if state.Msgs != uint64(toStore) { @@ -538,11 +573,11 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) { t.Fatalf("Expected %d msgs, got %d", toStore/2, state.Msgs) } - if _, _, _, err := fs.LoadMsg(1); err != nil { + if _, _, _, _, err := fs.LoadMsg(1); err != nil { t.Fatalf("Expected to retrieve seq 1") } for i := 2; i <= toStore; i += 2 { - if _, _, _, err := fs.LoadMsg(uint64(i)); err == nil { + if _, _, _, _, err := fs.LoadMsg(uint64(i)); err == nil { t.Fatalf("Expected error looking up seq %d that should be deleted", i) } } @@ -561,11 +596,11 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) { t.Fatalf("Expected receovered states to be the same, got %+v vs %+v\n", state, state2) } - if _, _, _, err := fs.LoadMsg(1); err != nil { + if _, _, _, _, err := fs.LoadMsg(1); err != nil { t.Fatalf("Expected to retrieve seq 1") } for i := 2; i <= toStore; i += 2 { - if _, _, _, err := fs.LoadMsg(uint64(i)); err == nil { + if _, _, _, _, err := fs.LoadMsg(uint64(i)); err == nil { t.Fatalf("Expected error looking up seq %d that should be deleted", i) } } @@ -588,7 +623,7 @@ func TestFileStoreAgeLimitRecovery(t *testing.T) { subj, msg := "foo", []byte("Hello World") toStore := 100 for i := 0; i < toStore; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state := fs.State() if state.Msgs != uint64(toStore) { @@ -630,7 +665,7 @@ func TestFileStoreBitRot(t *testing.T) { subj, msg := "foo", []byte("Hello World") toStore := 100 for i := 0; i < toStore; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state := fs.State() if state.Msgs != uint64(toStore) { @@ -689,8 +724,8 @@ func TestFileStoreEraseMsg(t *testing.T) { defer fs.Stop() subj, msg := "foo", []byte("Hello World") - fs.StoreMsg(subj, msg) - _, smsg, _, err := fs.LoadMsg(1) + fs.StoreMsg(subj, nil, msg) + _, _, smsg, _, err := fs.LoadMsg(1) if err != nil { t.Fatalf("Unexpected error looking up msg: %v", err) } @@ -708,7 +743,7 @@ func TestFileStoreEraseMsg(t *testing.T) { } // Now look on disk as well. - rl := fileStoreMsgSize(subj, msg) + rl := fileStoreMsgSize(subj, nil, msg) buf := make([]byte, rl) fp, err := os.Open(path.Join(storeDir, msgDir, fmt.Sprintf(blkScan, 1))) if err != nil { @@ -716,7 +751,7 @@ func TestFileStoreEraseMsg(t *testing.T) { } defer fp.Close() fp.ReadAt(buf, sm.off) - nsubj, nmsg, seq, ts, err := msgFromBuf(buf, nil) + nsubj, _, nmsg, seq, ts, err := msgFromBuf(buf, nil) if err != nil { t.Fatalf("error reading message from block: %v", err) } @@ -748,7 +783,7 @@ func TestFileStoreEraseAndNoIndexRecovery(t *testing.T) { subj, msg := "foo", []byte("Hello World") toStore := 100 for i := 0; i < toStore; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state := fs.State() if state.Msgs != uint64(toStore) { @@ -783,7 +818,7 @@ func TestFileStoreEraseAndNoIndexRecovery(t *testing.T) { } for i := 2; i <= toStore; i += 2 { - if _, _, _, err := fs.LoadMsg(uint64(i)); err == nil { + if _, _, _, _, err := fs.LoadMsg(uint64(i)); err == nil { t.Fatalf("Expected error looking up seq %d that should be erased", i) } } @@ -902,8 +937,8 @@ func TestFileStoreWriteAndReadSameBlock(t *testing.T) { subj, msg := "foo", []byte("Hello World!") for i := uint64(1); i <= 10; i++ { - fs.StoreMsg(subj, msg) - if _, _, _, err := fs.LoadMsg(i); err != nil { + fs.StoreMsg(subj, nil, msg) + if _, _, _, _, err := fs.LoadMsg(i); err != nil { t.Fatalf("Error loading %d: %v", i, err) } } @@ -915,7 +950,7 @@ func TestFileStoreAndRetrieveMultiBlock(t *testing.T) { defer os.RemoveAll(storeDir) subj, msg := "foo", []byte("Hello World!") - storedMsgSize := fileStoreMsgSize(subj, msg) + storedMsgSize := fileStoreMsgSize(subj, nil, msg) fs, err := newFileStore( FileStoreConfig{StoreDir: storeDir, BlockSize: 4 * storedMsgSize}, @@ -926,7 +961,7 @@ func TestFileStoreAndRetrieveMultiBlock(t *testing.T) { } for i := 0; i < 20; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state := fs.State() if state.Msgs != 20 { @@ -944,7 +979,7 @@ func TestFileStoreAndRetrieveMultiBlock(t *testing.T) { defer fs.Stop() for i := uint64(1); i <= 20; i++ { - if _, _, _, err := fs.LoadMsg(i); err != nil { + if _, _, _, _, err := fs.LoadMsg(i); err != nil { t.Fatalf("Error loading %d: %v", i, err) } } @@ -956,7 +991,7 @@ func TestFileStoreCollapseDmap(t *testing.T) { defer os.RemoveAll(storeDir) subj, msg := "foo", []byte("Hello World!") - storedMsgSize := fileStoreMsgSize(subj, msg) + storedMsgSize := fileStoreMsgSize(subj, nil, msg) fs, err := newFileStore( FileStoreConfig{StoreDir: storeDir, BlockSize: 4 * storedMsgSize}, @@ -968,7 +1003,7 @@ func TestFileStoreCollapseDmap(t *testing.T) { defer fs.Stop() for i := 0; i < 10; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } state := fs.State() if state.Msgs != 10 { @@ -1027,7 +1062,7 @@ func TestFileStoreCollapseDmap(t *testing.T) { func TestFileStoreReadCache(t *testing.T) { subj, msg := "foo.bar", make([]byte, 1024) - storedMsgSize := fileStoreMsgSize(subj, msg) + storedMsgSize := fileStoreMsgSize(subj, nil, msg) storeDir, _ := ioutil.TempDir("", JetStreamStoreDir) os.MkdirAll(storeDir, 0755) @@ -1043,7 +1078,7 @@ func TestFileStoreReadCache(t *testing.T) { totalBytes := uint64(toStore) * storedMsgSize for i := 0; i < toStore; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } fs.LoadMsg(1) @@ -1090,7 +1125,7 @@ func TestFileStoreSnapshot(t *testing.T) { toSend := 2233 for i := 0; i < toSend; i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } // Create a few consumers. @@ -1377,7 +1412,7 @@ func TestFileStorePerf(t *testing.T) { for i := 0; i < len(msg); i++ { msg[i] = 'D' } - storedMsgSize := fileStoreMsgSize(subj, msg) + storedMsgSize := fileStoreMsgSize(subj, nil, msg) // 5GB toStore := 5 * 1024 * 1024 * 1024 / storedMsgSize @@ -1402,7 +1437,7 @@ func TestFileStorePerf(t *testing.T) { start := time.Now() for i := 0; i < int(toStore); i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } fs.Stop() @@ -1433,7 +1468,7 @@ func TestFileStorePerf(t *testing.T) { start = time.Now() for i := uint64(1); i <= toStore; i++ { - if _, _, _, err := fs.LoadMsg(i); err != nil { + if _, _, _, _, err := fs.LoadMsg(i); err != nil { t.Fatalf("Error loading %d: %v", i, err) } } @@ -1452,7 +1487,7 @@ func TestFileStorePerf(t *testing.T) { start = time.Now() for i := uint64(1); i <= toStore; i++ { - if _, _, _, err := fs.LoadMsg(i); err != nil { + if _, _, _, _, err := fs.LoadMsg(i); err != nil { t.Fatalf("Error loading %d: %v", i, err) } } @@ -1516,7 +1551,7 @@ func TestFileStoreReadBackMsgPerf(t *testing.T) { subj := "foo" msg := []byte("ABCDEFGH") // Smaller shows problems more. - storedMsgSize := fileStoreMsgSize(subj, msg) + storedMsgSize := fileStoreMsgSize(subj, nil, msg) // Make sure we store 2 blocks. toStore := defaultStreamBlockSize * 2 / storedMsgSize @@ -1542,7 +1577,7 @@ func TestFileStoreReadBackMsgPerf(t *testing.T) { start := time.Now() for i := 0; i < int(toStore); i++ { - fs.StoreMsg(subj, msg) + fs.StoreMsg(subj, nil, msg) } tt := time.Since(start) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 0404283b..35db0b90 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1044,7 +1044,7 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st return } - subj, msg, ts, err := mset.store.LoadMsg(req.Seq) + subj, hdr, msg, ts, err := mset.store.LoadMsg(req.Seq) if err != nil { resp.Error = jsError(err) s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -1053,6 +1053,7 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st resp.Message = &StoredMsg{ Subject: subj, Sequence: req.Seq, + Header: hdr, Data: msg, Time: time.Unix(0, ts), } @@ -1255,7 +1256,7 @@ func (s *Server) streamSnapshot(c *client, mset *Stream, sr *SnapshotResult, req chunk = chunk[:n] if err != nil { if n > 0 { - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, chunk, nil, 0} + mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0} } break } @@ -1272,12 +1273,13 @@ func (s *Server) streamSnapshot(c *client, mset *Stream, sr *SnapshotResult, req } // TODO(dlc) - Might want these moved off sendq if we have contention. ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index) - mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackReply, chunk, nil, 0} + mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0} atomic.AddInt32(&out, int32(len(chunk))) } done: // Send last EOF - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, 0} + // TODO(dlc) - place hash in header + mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0} } // Request to create a durable consumer. diff --git a/server/memstore.go b/server/memstore.go index 69d70d6a..cec6e67f 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -34,6 +34,7 @@ type memStore struct { type storedMsg struct { subj string + hdr []byte msg []byte seq uint64 ts int64 // nanoseconds @@ -79,7 +80,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { } // Store stores a message. -func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) { +func (ms *memStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error) { ms.mu.Lock() // Check if we are discarding new messages when we reach the limit. @@ -109,11 +110,14 @@ func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) { if len(msg) > 0 { msg = append(msg[:0:0], msg...) } + if len(hdr) > 0 { + hdr = append(hdr[:0:0], hdr...) + } startBytes := int64(ms.state.Bytes) - ms.msgs[seq] = &storedMsg{subj, msg, seq, ts} + ms.msgs[seq] = &storedMsg{subj, hdr, msg, seq, ts} ms.state.Msgs++ - ms.state.Bytes += memStoreMsgSize(subj, msg) + ms.state.Bytes += memStoreMsgSize(subj, hdr, msg) ms.state.LastSeq = seq ms.state.LastTime = now.UTC() @@ -254,7 +258,7 @@ func (ms *memStore) deleteFirstMsg() bool { } // LoadMsg will lookup the message by sequence number and return it if found. -func (ms *memStore) LoadMsg(seq uint64) (string, []byte, int64, error) { +func (ms *memStore) LoadMsg(seq uint64) (string, []byte, []byte, int64, error) { ms.mu.RLock() sm, ok := ms.msgs[seq] last := ms.state.LastSeq @@ -265,9 +269,9 @@ func (ms *memStore) LoadMsg(seq uint64) (string, []byte, int64, error) { if seq <= last { err = ErrStoreMsgNotFound } - return "", nil, 0, err + return "", nil, nil, 0, err } - return sm.subj, sm.msg, sm.ts, nil + return sm.subj, sm.hdr, sm.msg, sm.ts, nil } // RemoveMsg will remove the message from this store. @@ -297,7 +301,7 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { delete(ms.msgs, seq) ms.state.Msgs-- - ss = memStoreMsgSize(sm.subj, sm.msg) + ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg) ms.state.Bytes -= ss if seq == ms.state.FirstSeq { var nsm *storedMsg @@ -316,8 +320,14 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { ms.state.FirstTime = time.Time{} } } + if secure { - rand.Read(sm.msg) + if len(sm.hdr) > 0 { + rand.Read(sm.hdr) + } + if len(sm.msg) > 0 { + rand.Read(sm.msg) + } sm.seq = 0 } @@ -337,8 +347,8 @@ func (ms *memStore) State() StreamState { return state } -func memStoreMsgSize(subj string, msg []byte) uint64 { - return uint64(len(subj) + len(msg) + 16) // 8*2 for seq + age +func memStoreMsgSize(subj string, hdr, msg []byte) uint64 { + return uint64(len(subj) + len(hdr) + len(msg) + 16) // 8*2 for seq + age } // Delete is same as Stop for memory store. diff --git a/server/memstore_test.go b/server/memstore_test.go index a83a1c5b..e8f25807 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -28,7 +28,7 @@ func TestMemStoreBasics(t *testing.T) { subj, msg := "foo", []byte("Hello World") now := time.Now().UnixNano() - if seq, ts, err := ms.StoreMsg(subj, msg); err != nil { + if seq, ts, err := ms.StoreMsg(subj, nil, msg); err != nil { t.Fatalf("Error storing msg: %v", err) } else if seq != 1 { t.Fatalf("Expected sequence to be 1, got %d", seq) @@ -40,11 +40,11 @@ func TestMemStoreBasics(t *testing.T) { if state.Msgs != 1 { t.Fatalf("Expected 1 msg, got %d", state.Msgs) } - expectedSize := memStoreMsgSize(subj, msg) + expectedSize := memStoreMsgSize(subj, nil, msg) if state.Bytes != expectedSize { t.Fatalf("Expected %d bytes, got %d", expectedSize, state.Bytes) } - nsubj, nmsg, _, err := ms.LoadMsg(1) + nsubj, _, nmsg, _, err := ms.LoadMsg(1) if err != nil { t.Fatalf("Unexpected error looking up msg: %v", err) } @@ -63,13 +63,13 @@ func TestMemStoreMsgLimit(t *testing.T) { } subj, msg := "foo", []byte("Hello World") for i := 0; i < 10; i++ { - ms.StoreMsg(subj, msg) + ms.StoreMsg(subj, nil, msg) } state := ms.State() if state.Msgs != 10 { t.Fatalf("Expected %d msgs, got %d", 10, state.Msgs) } - if _, _, err := ms.StoreMsg(subj, msg); err != nil { + if _, _, err := ms.StoreMsg(subj, nil, msg); err != nil { t.Fatalf("Error storing msg: %v", err) } state = ms.State() @@ -83,14 +83,14 @@ func TestMemStoreMsgLimit(t *testing.T) { t.Fatalf("Expected the first sequence to be 2 now, but got %d", state.FirstSeq) } // Make sure we can not lookup seq 1. - if _, _, _, err := ms.LoadMsg(1); err == nil { + if _, _, _, _, err := ms.LoadMsg(1); err == nil { t.Fatalf("Expected error looking up seq 1 but got none") } } func TestMemStoreBytesLimit(t *testing.T) { subj, msg := "foo", make([]byte, 512) - storedMsgSize := memStoreMsgSize(subj, msg) + storedMsgSize := memStoreMsgSize(subj, nil, msg) toStore := uint64(1024) maxBytes := storedMsgSize * toStore @@ -101,7 +101,7 @@ func TestMemStoreBytesLimit(t *testing.T) { } for i := uint64(0); i < toStore; i++ { - ms.StoreMsg(subj, msg) + ms.StoreMsg(subj, nil, msg) } state := ms.State() if state.Msgs != toStore { @@ -113,7 +113,7 @@ func TestMemStoreBytesLimit(t *testing.T) { // Now send 10 more and check that bytes limit enforced. for i := 0; i < 10; i++ { - if _, _, err := ms.StoreMsg(subj, msg); err != nil { + if _, _, err := ms.StoreMsg(subj, nil, msg); err != nil { t.Fatalf("Error storing msg: %v", err) } } @@ -142,7 +142,7 @@ func TestMemStoreAgeLimit(t *testing.T) { subj, msg := "foo", []byte("Hello World") toStore := 100 for i := 0; i < toStore; i++ { - ms.StoreMsg(subj, msg) + ms.StoreMsg(subj, nil, msg) } state := ms.State() if state.Msgs != uint64(toStore) { @@ -165,7 +165,7 @@ func TestMemStoreAgeLimit(t *testing.T) { checkExpired(t) // Now add some more and make sure that timer will fire again. for i := 0; i < toStore; i++ { - ms.StoreMsg(subj, msg) + ms.StoreMsg(subj, nil, msg) } state = ms.State() if state.Msgs != uint64(toStore) { @@ -183,10 +183,10 @@ func TestMemStoreTimeStamps(t *testing.T) { subj, msg := "foo", []byte("Hello World") for i := 0; i < 10; i++ { time.Sleep(5 * time.Microsecond) - ms.StoreMsg(subj, msg) + ms.StoreMsg(subj, nil, msg) } for seq := uint64(1); seq <= 10; seq++ { - _, _, ts, err := ms.LoadMsg(seq) + _, _, _, ts, err := ms.LoadMsg(seq) if err != nil { t.Fatalf("Unexpected error looking up msg: %v", err) } @@ -204,8 +204,8 @@ func TestMemStoreEraseMsg(t *testing.T) { t.Fatalf("Unexpected error creating store: %v", err) } subj, msg := "foo", []byte("Hello World") - ms.StoreMsg(subj, msg) - _, smsg, _, err := ms.LoadMsg(1) + ms.StoreMsg(subj, nil, msg) + _, _, smsg, _, err := ms.LoadMsg(1) if err != nil { t.Fatalf("Unexpected error looking up msg: %v", err) } @@ -219,3 +219,34 @@ func TestMemStoreEraseMsg(t *testing.T) { t.Fatalf("Expected msg to be erased") } } + +func TestMemStoreMsgHeaders(t *testing.T) { + ms, err := newMemStore(&StreamConfig{Storage: MemoryStorage}) + if err != nil { + t.Fatalf("Unexpected error creating store: %v", err) + } + subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World") + if sz := int(memStoreMsgSize(subj, hdr, msg)); sz != (len(subj) + len(hdr) + len(msg) + 16) { + t.Fatalf("Wrong size for stored msg with header") + } + ms.StoreMsg(subj, hdr, msg) + _, shdr, smsg, _, err := ms.LoadMsg(1) + if err != nil { + t.Fatalf("Unexpected error looking up msg: %v", err) + } + if !bytes.Equal(msg, smsg) { + t.Fatalf("Expected same msg, got %q vs %q", smsg, msg) + } + if !bytes.Equal(hdr, shdr) { + t.Fatalf("Expected same hdr, got %q vs %q", shdr, hdr) + } + if removed, _ := ms.EraseMsg(1); !removed { + t.Fatalf("Expected erase msg to return success") + } + if bytes.Equal(hdr, shdr) { + t.Fatalf("Expected hdr to be erased") + } + if bytes.Equal(msg, smsg) { + t.Fatalf("Expected msg to be erased") + } +} diff --git a/server/server.go b/server/server.go index 4f0c7de3..2e5bec36 100644 --- a/server/server.go +++ b/server/server.go @@ -988,6 +988,7 @@ func (s *Server) createInternalClient(kind int) *client { c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now} c.initClient() c.echo = false + c.headers = true c.flags.set(noReconnect) return c } diff --git a/server/store.go b/server/store.go index 3af9bb51..6b50ae8c 100644 --- a/server/store.go +++ b/server/store.go @@ -48,11 +48,13 @@ var ( // ErrStoreSnapshotInProgress is returned when RemoveMsg or EraseMsg is called // while a snapshot is in progress. ErrStoreSnapshotInProgress = errors.New("snapshot in progress") + // ErrMsgTooBig is returned when a message is considered too large. + ErrMsgTooLarge = errors.New("message to large") ) type StreamStore interface { - StoreMsg(subj string, msg []byte) (uint64, int64, error) - LoadMsg(seq uint64) (subj string, msg []byte, ts int64, err error) + StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error) + LoadMsg(seq uint64) (subj string, hdr, msg []byte, ts int64, err error) RemoveMsg(seq uint64) (bool, error) EraseMsg(seq uint64) (bool, error) Purge() uint64 diff --git a/server/stream.go b/server/stream.go index 888f0bd2..f370457a 100644 --- a/server/stream.go +++ b/server/stream.go @@ -204,7 +204,7 @@ func (mset *Stream) sendCreateAdvisory() { j, err := json.MarshalIndent(m, "", " ") if err == nil { subj := JSAdvisoryStreamCreatedPre + "." + name - sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0} + sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} } } @@ -227,7 +227,7 @@ func (mset *Stream) sendDeleteAdvisoryLocked() { j, err := json.MarshalIndent(m, "", " ") if err == nil { subj := JSAdvisoryStreamDeletedPre + "." + mset.config.Name - mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0} + mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} } } @@ -249,7 +249,7 @@ func (mset *Stream) sendUpdateAdvisoryLocked() { j, err := json.MarshalIndent(m, "", " ") if err == nil { subj := JSAdvisoryStreamUpdatedPre + "." + mset.config.Name - mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0} + mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} } } @@ -598,7 +598,7 @@ func (mset *Stream) setupStore(fsCfg *FileStoreConfig) error { } // processInboundJetStreamMsg handles processing messages bound for a stream. -func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subject, reply string, msg []byte) { +func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subject, reply string, msg []byte) { mset.mu.Lock() store := mset.store c := mset.client @@ -619,17 +619,27 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subje return } - // Response to send. - var response []byte - var seq uint64 - var err error - var ts int64 + // Response Ack. + var ( + response []byte + seq uint64 + err error + ts int64 + ) + // Header support. + var hdr []byte + + // Check to see if we are over the account limit. if maxMsgSize >= 0 && len(msg) > maxMsgSize { response = []byte("-ERR 'message size exceeds maximum allowed'") } else { - // Check to see if we are over the account limit. - seq, ts, err = store.StoreMsg(subject, msg) + // Headers. + if pc != nil && pc.pa.hdr > 0 { + hdr = msg[:pc.pa.hdr] + msg = msg[pc.pa.hdr:] + } + seq, ts, err = store.StoreMsg(subject, hdr, msg) if err != nil { if err != ErrStoreClosed { c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err) @@ -648,14 +658,14 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subje // Send response here. if doAck && len(reply) > 0 { - mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, response, nil, 0} + mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0} } if err == nil && numConsumers > 0 && seq > 0 { var needSignal bool mset.mu.Lock() for _, o := range mset.consumers { - if !o.deliverCurrentMsg(subject, msg, seq, ts) { + if !o.deliverCurrentMsg(subject, hdr, msg, seq, ts) { needSignal = true } } @@ -681,6 +691,7 @@ type jsPubMsg struct { subj string dsubj string reply string + hdr []byte msg []byte o *Consumer seq uint64 @@ -690,7 +701,8 @@ type jsPubMsg struct { type StoredMsg struct { Subject string `json:"subject"` Sequence uint64 `json:"seq"` - Data []byte `json:"data"` + Header []byte `json:"hdrs,omitempty"` + Data []byte `json:"data,omitempty"` Time time.Time `json:"time"` } @@ -745,10 +757,19 @@ func (mset *Stream) internalSendLoop() { } c.pa.subject = []byte(pm.subj) c.pa.deliver = []byte(pm.dsubj) - c.pa.size = len(pm.msg) + c.pa.size = len(pm.msg) + len(pm.hdr) c.pa.szb = []byte(strconv.Itoa(c.pa.size)) c.pa.reply = []byte(pm.reply) - msg := append(pm.msg, _CRLF_...) + + var msg []byte + if len(pm.hdr) > 0 { + c.pa.hdr = len(pm.hdr) + c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr)) + msg = append(pm.hdr, pm.msg...) + msg = append(msg, _CRLF_...) + } else { + msg = append(pm.msg, _CRLF_...) + } didDeliver := c.processInboundClientMsg(msg) c.pa.szb = nil c.flushClients(0) @@ -824,13 +845,14 @@ func (mset *Stream) stop(delete bool) error { } func (mset *Stream) GetMsg(seq uint64) (*StoredMsg, error) { - subj, msg, ts, err := mset.store.LoadMsg(seq) + subj, hdr, msg, ts, err := mset.store.LoadMsg(seq) if err != nil { return nil, err } sm := &StoredMsg{ Subject: subj, Sequence: seq, + Header: hdr, Data: msg, Time: time.Unix(0, ts).UTC(), } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index f16ecbe5..b518f6d6 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -6155,6 +6155,105 @@ func TestJetStreamNextMsgNoInterest(t *testing.T) { } } +func TestJetStreamMsgHeaders(t *testing.T) { + cases := []struct { + name string + mconfig *server.StreamConfig + }{ + {name: "MemoryStore", + mconfig: &server.StreamConfig{ + Name: "foo", + Retention: server.LimitsPolicy, + MaxAge: time.Hour, + Storage: server.MemoryStorage, + Replicas: 1, + }}, + {name: "FileStore", + mconfig: &server.StreamConfig{ + Name: "foo", + Retention: server.LimitsPolicy, + MaxAge: time.Hour, + Storage: server.FileStorage, + Replicas: 1, + }}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mset, err := s.GlobalAccount().AddStream(c.mconfig) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + m := nats.NewMsg("foo") + m.Header.Add("Accept-Encoding", "json") + m.Header.Add("Authorization", "s3cr3t") + m.Data = []byte("Hello JetStream Headers - #1!") + + nc.PublishMsg(m) + nc.Flush() + + state := mset.State() + if state.Msgs != 1 { + t.Fatalf("Expected 1 message, got %d", state.Msgs) + } + if state.Bytes == 0 { + t.Fatalf("Expected non-zero bytes") + } + + // Now access raw from stream. + sm, err := mset.GetMsg(1) + if err != nil { + t.Fatalf("Unexpected error getting stored message: %v", err) + } + // Calculate the []byte version of the headers. + var b bytes.Buffer + b.WriteString("NATS/1.0\r\n") + m.Header.Write(&b) + b.WriteString("\r\n") + hdr := b.Bytes() + + if !bytes.Equal(sm.Header, hdr) { + t.Fatalf("Message headers do not match, %q vs %q", hdr, sm.Header) + } + if !bytes.Equal(sm.Data, m.Data) { + t.Fatalf("Message data do not match, %q vs %q", m.Data, sm.Data) + } + + // Now do consumer based. + sub, _ := nc.SubscribeSync(nats.NewInbox()) + defer sub.Unsubscribe() + nc.Flush() + + o, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: sub.Subject}) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + cm, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error getting message: %v", err) + } + // Remove reply subject and sub for comparison. + cm.Sub, cm.Reply = nil, "" + if !reflect.DeepEqual(cm, m) { + t.Fatalf("Messages do not match: %+v vs %+v", cm, m) + } + }) + } +} + func TestJetStreamTemplateBasics(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() diff --git a/vendor/github.com/nats-io/nats.go/enc.go b/vendor/github.com/nats-io/nats.go/enc.go index 0ed71a2c..ee832f69 100644 --- a/vendor/github.com/nats-io/nats.go/enc.go +++ b/vendor/github.com/nats-io/nats.go/enc.go @@ -93,7 +93,7 @@ func (c *EncodedConn) Publish(subject string, v interface{}) error { if err != nil { return err } - return c.Conn.publish(subject, _EMPTY_, b) + return c.Conn.publish(subject, _EMPTY_, nil, b) } // PublishRequest will perform a Publish() expecting a response on the @@ -104,7 +104,7 @@ func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error if err != nil { return err } - return c.Conn.publish(subject, reply, b) + return c.Conn.publish(subject, reply, nil, b) } // Request will create an Inbox and perform a Request() call diff --git a/vendor/github.com/nats-io/nats.go/go.mod b/vendor/github.com/nats-io/nats.go/go.mod index bd7d44a2..f75b7c5e 100644 --- a/vendor/github.com/nats-io/nats.go/go.mod +++ b/vendor/github.com/nats-io/nats.go/go.mod @@ -1,7 +1,10 @@ module github.com/nats-io/nats.go require ( - github.com/nats-io/jwt v0.3.2 + github.com/golang/protobuf v1.4.2 + github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 + github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093 github.com/nats-io/nkeys v0.1.4 github.com/nats-io/nuid v1.0.1 + google.golang.org/protobuf v1.23.0 ) diff --git a/vendor/github.com/nats-io/nats.go/go.sum b/vendor/github.com/nats-io/nats.go/go.sum index 70e81d40..2a7fe904 100644 --- a/vendor/github.com/nats-io/nats.go/go.sum +++ b/vendor/github.com/nats-io/nats.go/go.sum @@ -1,5 +1,23 @@ +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA= +github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= +github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc= +github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= +github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093 h1:ii4KAXLYB3f7A6VnBhRWsYP+x45C+GAXh5T2VQWLfgQ= +github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU= +github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= @@ -10,6 +28,18 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU= +golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index d0d569cc..32640367 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -27,6 +27,8 @@ import ( "io/ioutil" "math/rand" "net" + "net/http" + "net/textproto" "net/url" "os" "path/filepath" @@ -45,7 +47,7 @@ import ( // Default Constants const ( - Version = "1.10.0" + Version = "1.11.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -120,6 +122,8 @@ var ( ErrMsgNoReply = errors.New("nats: message does not have a reply") ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") ErrDisconnected = errors.New("nats: server is disconnected") + ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") + ErrBadHeaderMsg = errors.New("nats: message could not decode headers") ) func init() { @@ -382,6 +386,12 @@ type Options struct { // callbacks after Close() is called. Client won't receive notifications // when Close is invoked by user code. Default is to invoke the callbacks. NoCallbacksAfterClientClose bool + + // LameDuckModeHandler sets the callback to invoke when the server notifies + // the connection that it entered lame duck mode, that is, going to + // gradually disconnect all its connections before shuting down. This is + // often used in deployments when upgrading NATS Servers. + LameDuckModeHandler ConnHandler } const ( @@ -494,6 +504,7 @@ type Subscription struct { type Msg struct { Subject string Reply string + Header http.Header Data []byte Sub *Subscription next *Msg @@ -525,6 +536,7 @@ type srv struct { tlsName string } +// The INFO block received from the server. type serverInfo struct { ID string `json:"server_id"` Host string `json:"host"` @@ -532,12 +544,14 @@ type serverInfo struct { Version string `json:"version"` AuthRequired bool `json:"auth_required"` TLSRequired bool `json:"tls_required"` + Headers bool `json:"headers"` MaxPayload int64 `json:"max_payload"` ConnectURLs []string `json:"connect_urls,omitempty"` Proto int `json:"proto,omitempty"` CID uint64 `json:"client_id,omitempty"` ClientIP string `json:"client_ip,omitempty"` Nonce string `json:"nonce,omitempty"` + LameDuckMode bool `json:"ldm,omitempty"` } const ( @@ -564,6 +578,7 @@ type connectInfo struct { Version string `json:"version"` Protocol int `json:"protocol"` Echo bool `json:"echo"` + Headers bool `json:"headers"` } // MsgHandler is a callback function that processes messages delivered to @@ -947,6 +962,17 @@ func NoCallbacksAfterClientClose() Option { } } +// LameDuckModeHandler sets the callback to invoke when the server notifies +// the connection that it entered lame duck mode, that is, going to +// gradually disconnect all its connections before shuting down. This is +// often used in deployments when upgrading NATS Servers. +func LameDuckModeHandler(cb ConnHandler) Option { + return func(o *Options) error { + o.LameDuckModeHandler = cb + return nil + } +} + // Handler processing // SetDisconnectHandler will set the disconnect event handler. @@ -1077,10 +1103,11 @@ func (o Options) Connect() (*Conn, error) { } const ( - _CRLF_ = "\r\n" - _EMPTY_ = "" - _SPC_ = " " - _PUB_P_ = "PUB " + _CRLF_ = "\r\n" + _EMPTY_ = "" + _SPC_ = " " + _PUB_P_ = "PUB " + _HPUB_P_ = "HPUB " ) const ( @@ -1091,12 +1118,12 @@ const ( ) const ( - conProto = "CONNECT %s" + _CRLF_ - pingProto = "PING" + _CRLF_ - pongProto = "PONG" + _CRLF_ - subProto = "SUB %s %s %d" + _CRLF_ - unsubProto = "UNSUB %d %s" + _CRLF_ - okProto = _OK_OP_ + _CRLF_ + connectProto = "CONNECT %s" + _CRLF_ + pingProto = "PING" + _CRLF_ + pongProto = "PONG" + _CRLF_ + subProto = "SUB %s %s %d" + _CRLF_ + unsubProto = "UNSUB %d %s" + _CRLF_ + okProto = _OK_OP_ + _CRLF_ ) // Return the currently selected server @@ -1444,9 +1471,9 @@ func (nc *Conn) setup() { nc.fch = make(chan struct{}, flushChanSize) nc.rqch = make(chan struct{}) - // Setup scratch outbound buffer for PUB - pub := nc.scratch[:len(_PUB_P_)] - copy(pub, _PUB_P_) + // Setup scratch outbound buffer for PUB/HPUB + pub := nc.scratch[:len(_HPUB_P_)] + copy(pub, _HPUB_P_) } // Process a connected connection and initialize properly. @@ -1660,7 +1687,7 @@ func (nc *Conn) connectProto() (string, error) { } cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token, - o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho} + o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, true} b, err := json.Marshal(cinfo) if err != nil { @@ -1672,7 +1699,7 @@ func (nc *Conn) connectProto() (string, error) { return _EMPTY_, ErrNoEchoNotSupported } - return fmt.Sprintf(conProto, b), nil + return fmt.Sprintf(connectProto, b), nil } // normalizeErr removes the prefix -ERR, trim spaces and remove the quotes. @@ -2264,8 +2291,27 @@ func (nc *Conn) processMsg(data []byte) { msgPayload := make([]byte, len(data)) copy(msgPayload, data) + // Check if we have headers encoded here. + var h http.Header + var err error + + if nc.ps.ma.hdr > 0 { + hbuf := msgPayload[:nc.ps.ma.hdr] + msgPayload = msgPayload[nc.ps.ma.hdr:] + h, err = decodeHeadersMsg(hbuf) + if err != nil { + // We will pass the message through but send async error. + nc.mu.Lock() + nc.err = ErrBadHeaderMsg + if nc.Opts.AsyncErrorCB != nil { + nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrBadHeaderMsg) }) + } + nc.mu.Unlock() + } + } + // FIXME(dlc): Should we recycle these containers? - m := &Msg{Data: msgPayload, Subject: subj, Reply: reply, Sub: sub} + m := &Msg{Header: h, Data: msgPayload, Subject: subj, Reply: reply, Sub: sub} sub.mu.Lock() @@ -2457,6 +2503,9 @@ func (nc *Conn) processInfo(info string) error { // did not include themselves in the async INFO protocol. // If empty, do not remove the implicit servers from the pool. if len(ncInfo.ConnectURLs) == 0 { + if ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil { + nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) }) + } return nil } // Note about pool randomization: when the pool was first created, @@ -2517,7 +2566,9 @@ func (nc *Conn) processInfo(info string) error { nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) }) } } - + if ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil { + nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) }) + } return nil } @@ -2601,7 +2652,34 @@ func (nc *Conn) kickFlusher() { // argument is left untouched and needs to be correctly interpreted on // the receiver. func (nc *Conn) Publish(subj string, data []byte) error { - return nc.publish(subj, _EMPTY_, data) + return nc.publish(subj, _EMPTY_, nil, data) +} + +// Used to create a new message for publishing that will use headers. +func NewMsg(subject string) *Msg { + return &Msg{ + Subject: subject, + Header: make(http.Header), + } +} + +const ( + hdrLine = "NATS/1.0\r\n" + crlf = "\r\n" + hdrPreEnd = len(hdrLine) - len(crlf) +) + +// decodeHeadersMsg will decode and headers. +func decodeHeadersMsg(data []byte) (http.Header, error) { + tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data))) + if l, err := tp.ReadLine(); err != nil || l != hdrLine[:hdrPreEnd] { + return nil, ErrBadHeaderMsg + } + mh, err := tp.ReadMIMEHeader() + if err != nil { + return nil, ErrBadHeaderMsg + } + return http.Header(mh), nil } // PublishMsg publishes the Msg structure, which includes the @@ -2610,14 +2688,26 @@ func (nc *Conn) PublishMsg(m *Msg) error { if m == nil { return ErrInvalidMsg } - return nc.publish(m.Subject, m.Reply, m.Data) + var hdr []byte + if len(m.Header) > 0 { + if !nc.info.Headers { + return ErrHeadersNotSupported + } + // FIXME(dlc) - Optimize + var b bytes.Buffer + b.WriteString(hdrLine) + m.Header.Write(&b) + b.WriteString(crlf) + hdr = b.Bytes() + } + return nc.publish(m.Subject, m.Reply, hdr, m.Data) } -// PublishRequest will perform a Publish() excpecting a response on the +// PublishRequest will perform a Publish() expecting a response on the // reply subject. Use Request() for automatically waiting for a response // inline. func (nc *Conn) PublishRequest(subj, reply string, data []byte) error { - return nc.publish(subj, reply, data) + return nc.publish(subj, reply, nil, data) } // Used for handrolled itoa @@ -2626,7 +2716,7 @@ const digits = "0123456789" // publish is the internal function to publish messages to a nats-server. // Sends a protocol data message by queuing into the bufio writer // and kicking the flush go routine. These writes should be protected. -func (nc *Conn) publish(subj, reply string, data []byte) error { +func (nc *Conn) publish(subj, reply string, hdr, data []byte) error { if nc == nil { return ErrInvalidConnection } @@ -2646,7 +2736,7 @@ func (nc *Conn) publish(subj, reply string, data []byte) error { } // Proactively reject payloads over the threshold set by server. - msgSize := int64(len(data)) + msgSize := int64(len(data) + len(hdr)) if msgSize > nc.info.MaxPayload { nc.mu.Unlock() return ErrMaxPayload @@ -2664,37 +2754,65 @@ func (nc *Conn) publish(subj, reply string, data []byte) error { } } - msgh := nc.scratch[:len(_PUB_P_)] - msgh = append(msgh, subj...) - msgh = append(msgh, ' ') + var mh []byte + if hdr != nil { + mh = nc.scratch[:len(_HPUB_P_)] + } else { + mh = nc.scratch[1:len(_HPUB_P_)] + } + mh = append(mh, subj...) + mh = append(mh, ' ') if reply != "" { - msgh = append(msgh, reply...) - msgh = append(msgh, ' ') + mh = append(mh, reply...) + mh = append(mh, ' ') } // We could be smarter here, but simple loop is ok, - // just avoid strconv in fast path + // just avoid strconv in fast path. // FIXME(dlc) - Find a better way here. // msgh = strconv.AppendInt(msgh, int64(len(data)), 10) + // go 1.14 some values strconv faster, may be able to switch over. var b [12]byte var i = len(b) - if len(data) > 0 { - for l := len(data); l > 0; l /= 10 { - i -= 1 + + if hdr != nil { + if len(hdr) > 0 { + for l := len(hdr); l > 0; l /= 10 { + i-- + b[i] = digits[l%10] + } + } else { + i-- + b[i] = digits[0] + } + mh = append(mh, b[i:]...) + mh = append(mh, ' ') + // reset for below. + i = len(b) + } + + if msgSize > 0 { + for l := msgSize; l > 0; l /= 10 { + i-- b[i] = digits[l%10] } } else { - i -= 1 + i-- b[i] = digits[0] } - msgh = append(msgh, b[i:]...) - msgh = append(msgh, _CRLF_...) + mh = append(mh, b[i:]...) + mh = append(mh, _CRLF_...) - _, err := nc.bw.Write(msgh) + _, err := nc.bw.Write(mh) if err == nil { - _, err = nc.bw.Write(data) + if hdr != nil { + _, err = nc.bw.Write(hdr) + } + if err == nil { + _, err = nc.bw.Write(data) + } } if err == nil { _, err = nc.bw.WriteString(_CRLF_) @@ -2705,7 +2823,7 @@ func (nc *Conn) publish(subj, reply string, data []byte) error { } nc.OutMsgs++ - nc.OutBytes += uint64(len(data)) + nc.OutBytes += uint64(len(data) + len(hdr)) if len(nc.fch) == 0 { nc.kickFlusher() diff --git a/vendor/github.com/nats-io/nats.go/parser.go b/vendor/github.com/nats-io/nats.go/parser.go index 630142a3..77bfe05e 100644 --- a/vendor/github.com/nats-io/nats.go/parser.go +++ b/vendor/github.com/nats-io/nats.go/parser.go @@ -1,4 +1,4 @@ -// Copyright 2012-2018 The NATS Authors +// Copyright 2012-2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -21,6 +21,7 @@ type msgArg struct { subject []byte reply []byte sid int64 + hdr int size int } @@ -30,6 +31,7 @@ type parseState struct { state int as int drop int + hdr int ma msgArg argBuf []byte msgBuf []byte @@ -54,6 +56,7 @@ const ( MSG_ARG MSG_PAYLOAD MSG_END + OP_H OP_P OP_PI OP_PIN @@ -83,6 +86,10 @@ func (nc *Conn) parse(buf []byte) error { switch b { case 'M', 'm': nc.ps.state = OP_M + nc.ps.hdr = -1 + case 'H', 'h': + nc.ps.state = OP_H + nc.ps.hdr = 0 case 'P', 'p': nc.ps.state = OP_P case '+': @@ -94,6 +101,13 @@ func (nc *Conn) parse(buf []byte) error { default: goto parseErr } + case OP_H: + switch b { + case 'M', 'm': + nc.ps.state = OP_M + default: + goto parseErr + } case OP_M: switch b { case 'S', 's': @@ -140,8 +154,7 @@ func (nc *Conn) parse(buf []byte) error { nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, MSG_PAYLOAD // jump ahead with the index. If this overruns - // what is left we fall out and process split - // buffer. + // what is left we fall out and process a split buffer. i = nc.ps.as + nc.ps.ma.size - 1 default: if nc.ps.argBuf != nil { @@ -415,6 +428,11 @@ func (nc *Conn) cloneMsgArg() { const argsLenMax = 4 func (nc *Conn) processMsgArgs(arg []byte) error { + // Use separate function for header based messages. + if nc.ps.hdr >= 0 { + return nc.processHeaderMsgArgs(arg) + } + // Unroll splitArgs to avoid runtime/heap issues a := [argsLenMax][]byte{} args := a[:0] @@ -459,6 +477,57 @@ func (nc *Conn) processMsgArgs(arg []byte) error { return nil } +// processHeaderMsgArgs is for a header based message. +func (nc *Conn) processHeaderMsgArgs(arg []byte) error { + // Unroll splitArgs to avoid runtime/heap issues + a := [argsLenMax][]byte{} + args := a[:0] + start := -1 + for i, b := range arg { + switch b { + case ' ', '\t', '\r', '\n': + if start >= 0 { + args = append(args, arg[start:i]) + start = -1 + } + default: + if start < 0 { + start = i + } + } + } + if start >= 0 { + args = append(args, arg[start:]) + } + + switch len(args) { + case 4: + nc.ps.ma.subject = args[0] + nc.ps.ma.sid = parseInt64(args[1]) + nc.ps.ma.reply = nil + nc.ps.ma.hdr = int(parseInt64(args[2])) + nc.ps.ma.size = int(parseInt64(args[3])) + case 5: + nc.ps.ma.subject = args[0] + nc.ps.ma.sid = parseInt64(args[1]) + nc.ps.ma.reply = args[2] + nc.ps.ma.hdr = int(parseInt64(args[3])) + nc.ps.ma.size = int(parseInt64(args[4])) + default: + return fmt.Errorf("nats: processHeaderMsgArgs Parse Error: '%s'", arg) + } + if nc.ps.ma.sid < 0 { + return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Sid: '%s'", arg) + } + if nc.ps.ma.hdr < 0 || nc.ps.ma.hdr > nc.ps.ma.size { + return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Header Size: '%s'", arg) + } + if nc.ps.ma.size < 0 { + return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Size: '%s'", arg) + } + return nil +} + // Ascii numbers 0-9 const ( ascii_0 = 48 diff --git a/vendor/modules.txt b/vendor/modules.txt index bd74d26c..ef738707 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -4,7 +4,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 ## explicit github.com/nats-io/jwt -# github.com/nats-io/nats.go v1.10.0 +# github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin