First pass header support for JetStream

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-05-30 10:04:23 -07:00
parent 2bbe901187
commit eca04c6fce
19 changed files with 692 additions and 189 deletions

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.14
require (
github.com/minio/highwayhash v1.0.0
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7
github.com/nats-io/nats.go v1.10.0
github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada
github.com/nats-io/nkeys v0.1.4
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59

23
go.sum
View File

@@ -1,10 +1,24 @@
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada h1:ZV/q9/eCPmjwQXFob3GWT3MlTyKlKPB8rcRdr8/dpPw=
github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada/go.mod h1:C4MWF84vZCs3asgTFW+IL61SqxUZZ+YcZ8VRnqw3pGY=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
@@ -21,3 +35,12 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=

View File

@@ -2,6 +2,4 @@
JetStream is the [NATS.io](https://nats.io) persistence engine that will support streaming as well as traditional message and worker queues for At-Least-Once delivery semantics.
JetStream is composed of two major components, Message Sets and Observables. Message Sets determine the interest set, global ordering, retention policy, replicas and resource limits. Observables define how Message Sets are consumed, and have quite a few options.
Information about testing the JetStream Technical Preview can be found [here](https://github.com/nats-io/jetstream#readme)
More information about testing the JetStream Technical Preview can be found [here](https://github.com/nats-io/jetstream#readme)

View File

@@ -489,7 +489,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
func (o *Consumer) sendAdvisory(subj string, msg []byte) {
if o.mset != nil && o.mset.sendq != nil {
o.mu.Unlock()
o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, msg, nil, 0}
o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, msg, nil, 0}
o.mu.Lock()
}
}
@@ -1000,8 +1000,8 @@ func (o *Consumer) processNextMsgReq(_ *subscription, _ *client, _, reply string
if o.replay {
o.waiting = append(o.waiting, reply)
shouldSignal = true
} else if subj, msg, seq, dc, ts, err := o.getNextMsg(); err == nil {
o.deliverMsg(reply, subj, msg, seq, dc, ts)
} else if subj, hdr, msg, seq, dc, ts, err := o.getNextMsg(); err == nil {
o.deliverMsg(reply, subj, hdr, msg, seq, dc, ts)
} else {
o.waiting = append(o.waiting, reply)
}
@@ -1058,9 +1058,9 @@ func (o *Consumer) isFilteredMatch(subj string) bool {
// Get next available message from underlying store.
// Is partition aware and redeliver aware.
// Lock should be held.
func (o *Consumer) getNextMsg() (subj string, msg []byte, seq uint64, dcount uint64, ts int64, err error) {
func (o *Consumer) getNextMsg() (subj string, hdr, msg []byte, seq uint64, dcount uint64, ts int64, err error) {
if o.mset == nil {
return _EMPTY_, nil, 0, 0, 0, fmt.Errorf("consumer not valid")
return _EMPTY_, nil, nil, 0, 0, 0, fmt.Errorf("consumer not valid")
}
for {
seq, dcount := o.sseq, uint64(1)
@@ -1078,7 +1078,7 @@ func (o *Consumer) getNextMsg() (subj string, msg []byte, seq uint64, dcount uin
continue
}
}
subj, msg, ts, err := o.mset.store.LoadMsg(seq)
subj, hdr, msg, ts, err := o.mset.store.LoadMsg(seq)
if err == nil {
if dcount == 1 { // First delivery.
o.sseq++
@@ -1087,12 +1087,12 @@ func (o *Consumer) getNextMsg() (subj string, msg []byte, seq uint64, dcount uin
}
}
// We have the msg here.
return subj, msg, seq, dcount, ts, nil
return subj, hdr, msg, seq, dcount, ts, nil
}
// We got an error here. If this is an EOF we will return, otherwise
// we can continue looking.
if err == ErrStoreEOF || err == ErrStoreClosed {
return "", nil, 0, 0, 0, err
return _EMPTY_, nil, nil, 0, 0, 0, err
}
// Skip since its probably deleted or expired.
o.sseq++
@@ -1136,6 +1136,7 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) {
mset *Stream
seq, dcnt uint64
subj, dsubj string
hdr []byte
msg []byte
err error
ts int64
@@ -1160,7 +1161,7 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) {
goto waitForMsgs
}
subj, msg, seq, dcnt, ts, err = o.getNextMsg()
subj, hdr, msg, seq, dcnt, ts, err = o.getNextMsg()
// On error either wait or return.
if err != nil {
@@ -1195,7 +1196,7 @@ func (o *Consumer) loopAndDeliverMsgs(s *Server, a *Account) {
// Track this regardless.
lts = ts
o.deliverMsg(dsubj, subj, msg, seq, dcnt, ts)
o.deliverMsg(dsubj, subj, hdr, msg, seq, dcnt, ts)
o.mu.Unlock()
continue
@@ -1217,7 +1218,7 @@ func (o *Consumer) ackReply(sseq, dseq, dcount uint64, ts int64) string {
// deliverCurrentMsg is the hot path to deliver a message that was just received.
// Will return if the message was delivered or not.
func (o *Consumer) deliverCurrentMsg(subj string, msg []byte, seq uint64, ts int64) bool {
func (o *Consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, ts int64) bool {
o.mu.Lock()
if seq != o.sseq {
o.mu.Unlock()
@@ -1257,7 +1258,7 @@ func (o *Consumer) deliverCurrentMsg(subj string, msg []byte, seq uint64, ts int
msg = append(msg[:0:0], msg...)
}
o.deliverMsg(dsubj, subj, msg, seq, 1, ts)
o.deliverMsg(dsubj, subj, hdr, msg, seq, 1, ts)
o.mu.Unlock()
return true
@@ -1265,12 +1266,12 @@ func (o *Consumer) deliverCurrentMsg(subj string, msg []byte, seq uint64, ts int
// Deliver a msg to the observable.
// Lock should be held and o.mset validated to be non-nil.
func (o *Consumer) deliverMsg(dsubj, subj string, msg []byte, seq, dcount uint64, ts int64) {
func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount uint64, ts int64) {
if o.mset == nil {
return
}
sendq := o.mset.sendq
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, o.dseq, dcount, ts), msg, o, seq}
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, o.dseq, dcount, ts), hdr, msg, o, seq}
// This needs to be unlocked since the other side may need this lock on failed delivery.
o.mu.Unlock()
@@ -1434,7 +1435,7 @@ func (o *Consumer) selectSubjectLast() {
}
// FIXME(dlc) - this is linear and can be optimized by store layer.
for seq := stats.LastSeq; seq >= stats.FirstSeq; seq-- {
subj, _, _, err := o.mset.store.LoadMsg(seq)
subj, _, _, _, err := o.mset.store.LoadMsg(seq)
if err == ErrStoreMsgNotFound {
continue
}

View File

@@ -121,6 +121,7 @@ type msgId struct {
type fileStoredMsg struct {
subj string
hdr []byte
msg []byte
seq uint64
ts int64 // nanoseconds
@@ -578,7 +579,7 @@ func (fs *fileStore) enableLastMsgBlockForWriting() error {
}
// Store stores a message.
func (fs *fileStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) {
func (fs *fileStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error) {
fs.mu.Lock()
if fs.closed {
fs.mu.Unlock()
@@ -599,7 +600,7 @@ func (fs *fileStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) {
seq := fs.state.LastSeq + 1
n, ts, err := fs.writeMsgRecord(seq, subj, msg)
n, ts, err := fs.writeMsgRecord(seq, subj, hdr, msg)
if err != nil {
fs.mu.Unlock()
return 0, 0, err
@@ -771,7 +772,7 @@ func (mb *msgBlock) selectNextFirst() {
func (fs *fileStore) deleteMsgFromBlock(mb *msgBlock, seq uint64, sm *fileStoredMsg, secure bool) {
// Update global accounting.
msz := fileStoreMsgSize(sm.subj, sm.msg)
msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
fs.mu.Lock()
mb.mu.Lock()
@@ -1048,12 +1049,14 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) {
}
// Lock should be held.
func (fs *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64, int64, error) {
func (fs *fileStore) writeMsgRecord(seq uint64, subj string, mhdr, msg []byte) (uint64, int64, error) {
var err error
// Get size for this message.
rl := fileStoreMsgSize(subj, msg)
rl := fileStoreMsgSize(subj, mhdr, msg)
if rl&hbit == 1 {
return 0, 0, ErrMsgTooLarge
}
// Grab our current last message block.
mb := fs.lmb
if mb == nil || mb.numBytes()+rl > fs.fcfg.BlockSize {
@@ -1071,11 +1074,23 @@ func (fs *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64
// Update accounting.
mb.updateAccounting(seq, ts, rl)
// Formats
// Format with no header
// total_len(4) sequence(8) timestamp(8) subj_len(2) subj msg hash(8)
// With headers, high bit on total length will be set.
// total_len(4) sequence(8) timestamp(8) subj_len(2) subj hdr_len(4) hdr msg hash(8)
// First write header, etc.
var le = binary.LittleEndian
var hdr [msgHdrSize]byte
le.PutUint32(hdr[0:], uint32(rl))
l := uint32(rl)
hasHeaders := len(mhdr) > 0
if hasHeaders {
l |= hbit
}
le.PutUint32(hdr[0:], l)
le.PutUint64(hdr[4:], seq)
le.PutUint64(hdr[12:], uint64(ts))
le.PutUint16(hdr[20:], uint16(len(subj)))
@@ -1083,6 +1098,12 @@ func (fs *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64
// Now write to underlying buffer.
fs.wmb.Write(hdr[:])
fs.wmb.WriteString(subj)
if hasHeaders {
var hlen [4]byte
le.PutUint32(hlen[0:], uint32(len(mhdr)))
fs.wmb.Write(hlen[:])
fs.wmb.Write(mhdr)
}
fs.wmb.Write(msg)
// Calculate hash.
@@ -1090,6 +1111,9 @@ func (fs *fileStore) writeMsgRecord(seq uint64, subj string, msg []byte) (uint64
mb.hh.Reset()
mb.hh.Write(hdr[4:20])
mb.hh.Write([]byte(subj))
if hasHeaders {
mb.hh.Write(mhdr)
}
mb.hh.Write(msg)
checksum := mb.hh.Sum(nil)
// Grab last checksum
@@ -1109,7 +1133,12 @@ func (fs *fileStore) eraseMsg(mb *msgBlock, sm *fileStoredMsg) error {
return fmt.Errorf("bad stored message")
}
// erase contents and rewrite with new hash.
rand.Read(sm.msg)
if len(sm.hdr) > 0 {
rand.Read(sm.hdr)
}
if len(sm.msg) > 0 {
rand.Read(sm.msg)
}
sm.seq, sm.ts = 0, 0
chars := []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
var b strings.Builder
@@ -1121,7 +1150,7 @@ func (fs *fileStore) eraseMsg(mb *msgBlock, sm *fileStoredMsg) error {
var le = binary.LittleEndian
var hdr [msgHdrSize]byte
rl := fileStoreMsgSize(sm.subj, sm.msg)
rl := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
le.PutUint32(hdr[0:], uint32(rl))
le.PutUint64(hdr[4:], 0)
@@ -1139,6 +1168,9 @@ func (fs *fileStore) eraseMsg(mb *msgBlock, sm *fileStoredMsg) error {
mb.hh.Reset()
mb.hh.Write(hdr[4:20])
mb.hh.Write([]byte(sm.subj))
if len(sm.hdr) > 0 {
mb.hh.Write(sm.hdr)
}
mb.hh.Write(sm.msg)
checksum := mb.hh.Sum(nil)
// Write to msg record.
@@ -1293,13 +1325,17 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
rl := le.Uint32(hdr[0:])
seq := le.Uint64(hdr[4:])
slen := le.Uint16(hdr[20:])
// Clear any headers bit that could be set.
rl &^= hbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) {
// This means something is off.
// Add into bad list?
return fmt.Errorf("malformed or corrupt msg")
// TODO(dlc) - Add into bad list?
return errBadMsg
}
// Adjust if we guessed wrong.
if seq != 0 && seq < fseq {
@@ -1377,6 +1413,7 @@ var (
)
// Used for marking messages that have had their checksums checked.
// Also used to signal a message record with headers
const hbit = 1 << 31
// Will do a lookup from the cache.
@@ -1419,7 +1456,7 @@ func (mb *msgBlock) cacheLookupLocked(seq uint64) (*fileStoredMsg, error) {
buf := mb.cache.buf[bi:]
// Parse from the raw buffer.
subj, msg, mseq, ts, err := msgFromBuf(buf, hh)
subj, hdr, msg, mseq, ts, err := msgFromBuf(buf, hh)
if err != nil {
return nil, err
}
@@ -1428,6 +1465,7 @@ func (mb *msgBlock) cacheLookupLocked(seq uint64) (*fileStoredMsg, error) {
}
sm := &fileStoredMsg{
subj: subj,
hdr: hdr,
msg: msg,
seq: seq,
ts: ts,
@@ -1469,18 +1507,21 @@ func (fs *fileStore) msgForSeq(seq uint64) (*fileStoredMsg, error) {
}
// Internal function to return msg parts from a raw buffer.
func msgFromBuf(buf []byte, hh hash.Hash64) (string, []byte, uint64, int64, error) {
func msgFromBuf(buf []byte, hh hash.Hash64) (string, []byte, []byte, uint64, int64, error) {
if len(buf) < msgHdrSize {
return "", nil, 0, 0, errBadMsg
return _EMPTY_, nil, nil, 0, 0, errBadMsg
}
var le = binary.LittleEndian
hdr := buf[:msgHdrSize]
dlen := int(le.Uint32(hdr[0:])) - msgHdrSize
rl := le.Uint32(hdr[0:])
hasHeaders := rl&hbit != 0
rl &^= hbit // clear header bit
dlen := int(rl) - msgHdrSize
slen := int(le.Uint16(hdr[20:]))
// Simple sanity check.
if dlen < 0 || slen > dlen {
return "", nil, 0, 0, errBadMsg
return _EMPTY_, nil, nil, 0, 0, errBadMsg
}
data := buf[msgHdrSize : msgHdrSize+dlen]
// Do checksum tests here if requested.
@@ -1488,9 +1529,13 @@ func msgFromBuf(buf []byte, hh hash.Hash64) (string, []byte, uint64, int64, erro
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
hh.Write(data[slen : dlen-8])
if hasHeaders {
hh.Write(data[slen+4 : dlen-8])
} else {
hh.Write(data[slen : dlen-8])
}
if !bytes.Equal(hh.Sum(nil), data[len(data)-8:]) {
return "", nil, 0, 0, errBadMsg
return _EMPTY_, nil, nil, 0, 0, errBadMsg
}
}
seq := le.Uint64(hdr[4:])
@@ -1499,16 +1544,26 @@ func msgFromBuf(buf []byte, hh hash.Hash64) (string, []byte, uint64, int64, erro
// fix the capacity. This will cause a copy though in stream:internalSendLoop when
// we append CRLF but this was causing a race. Need to rethink more to avoid this copy.
end := dlen - 8
return string(data[:slen]), data[slen:end:end], seq, ts, nil
var mhdr, msg []byte
if hasHeaders {
hl := le.Uint32(data[slen:])
bi := slen + 4
li := bi + int(hl)
mhdr = data[bi:li:li]
msg = data[li:end:end]
} else {
msg = data[slen:end:end]
}
return string(data[:slen]), mhdr, msg, seq, ts, nil
}
// LoadMsg will lookup the message by sequence number and return it if found.
func (fs *fileStore) LoadMsg(seq uint64) (string, []byte, int64, error) {
func (fs *fileStore) LoadMsg(seq uint64) (string, []byte, []byte, int64, error) {
sm, err := fs.msgForSeq(seq)
if sm != nil {
return sm.subj, sm.msg, sm.ts, nil
return sm.subj, sm.hdr, sm.msg, sm.ts, nil
}
return "", nil, 0, err
return "", nil, nil, 0, err
}
func (fs *fileStore) State() StreamState {
@@ -1519,9 +1574,13 @@ func (fs *fileStore) State() StreamState {
return state
}
func fileStoreMsgSize(subj string, msg []byte) uint64 {
// length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8)
return uint64(4 + 16 + 2 + len(subj) + len(msg) + 8)
func fileStoreMsgSize(subj string, hdr, msg []byte) uint64 {
if len(hdr) == 0 {
// length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8)
return uint64(22 + len(subj) + len(msg) + 8)
}
// length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + hdr_len(4) + hdr + msg + hash(8)
return uint64(22 + len(subj) + 4 + len(hdr) + len(msg) + 8)
}
// Lock should not be held.

View File

@@ -43,7 +43,7 @@ func TestFileStoreBasics(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
now := time.Now().UnixNano()
for i := 1; i <= 5; i++ {
if seq, ts, err := fs.StoreMsg(subj, msg); err != nil {
if seq, ts, err := fs.StoreMsg(subj, nil, msg); err != nil {
t.Fatalf("Error storing msg: %v", err)
} else if seq != uint64(i) {
t.Fatalf("Expected sequence to be %d, got %d", i, seq)
@@ -56,11 +56,11 @@ func TestFileStoreBasics(t *testing.T) {
if state.Msgs != 5 {
t.Fatalf("Expected 5 msgs, got %d", state.Msgs)
}
expectedSize := 5 * fileStoreMsgSize(subj, msg)
expectedSize := 5 * fileStoreMsgSize(subj, nil, msg)
if state.Bytes != expectedSize {
t.Fatalf("Expected %d bytes, got %d", expectedSize, state.Bytes)
}
nsubj, nmsg, _, err := fs.LoadMsg(2)
nsubj, _, nmsg, _, err := fs.LoadMsg(2)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
@@ -70,12 +70,47 @@ func TestFileStoreBasics(t *testing.T) {
if !bytes.Equal(nmsg, msg) {
t.Fatalf("Msgs don't match, original %q vs %q", msg, nmsg)
}
_, _, _, err = fs.LoadMsg(3)
_, _, _, _, err = fs.LoadMsg(3)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
}
// TestFileStoreMsgHeaders verifies file-store support for per-message headers:
// the computed on-disk record size, header/message round-trip via LoadMsg, and
// secure erasure of both header and message bytes via EraseMsg.
func TestFileStoreMsgHeaders(t *testing.T) {
	storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
	fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, StreamConfig{Name: "zzz", Storage: FileStorage})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer fs.Stop()
	subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World")
	// Expected record layout with headers:
	// total_len(4) + seq(8) + ts(8) + subj_len(2) [= 22] + subj + hdr_len(4) + hdr + msg + hash(8).
	elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8
	if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen {
		t.Fatalf("Wrong size for stored msg with header")
	}
	fs.StoreMsg(subj, hdr, msg)
	// LoadMsg returns (subj, hdr, msg, ts, err); both payload parts must round-trip.
	_, shdr, smsg, _, err := fs.LoadMsg(1)
	if err != nil {
		t.Fatalf("Unexpected error looking up msg: %v", err)
	}
	if !bytes.Equal(msg, smsg) {
		t.Fatalf("Expected same msg, got %q vs %q", smsg, msg)
	}
	if !bytes.Equal(hdr, shdr) {
		t.Fatalf("Expected same hdr, got %q vs %q", shdr, hdr)
	}
	if removed, _ := fs.EraseMsg(1); !removed {
		t.Fatalf("Expected erase msg to return success")
	}
	// NOTE(review): these checks assume shdr/smsg alias the store's cache
	// buffer, so EraseMsg's in-place random overwrite is observable through
	// the slices returned earlier — confirm against the cache semantics.
	if bytes.Equal(hdr, shdr) {
		t.Fatalf("Expected hdr to be erased")
	}
	if bytes.Equal(msg, smsg) {
		t.Fatalf("Expected msg to be erased")
	}
}
func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)
@@ -102,7 +137,7 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
toStore := uint64(100)
for i := uint64(1); i <= toStore; i++ {
msg := []byte(fmt.Sprintf("[%08d] Hello World!", i))
if seq, _, err := fs.StoreMsg(subj, msg); err != nil {
if seq, _, err := fs.StoreMsg(subj, nil, msg); err != nil {
t.Fatalf("Error storing msg: %v", err)
} else if seq != uint64(i) {
t.Fatalf("Expected sequence to be %d, got %d", i, seq)
@@ -113,7 +148,7 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
t.Fatalf("Expected %d msgs, got %d", toStore, state.Msgs)
}
msg22 := []byte(fmt.Sprintf("[%08d] Hello World!", 22))
expectedSize := toStore * fileStoreMsgSize(subj, msg22)
expectedSize := toStore * fileStoreMsgSize(subj, nil, msg22)
if state.Bytes != expectedSize {
t.Fatalf("Expected %d bytes, got %d", expectedSize, state.Bytes)
@@ -122,7 +157,7 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
fs.Stop()
// Make sure Store call after does not work.
if _, _, err := fs.StoreMsg(subj, []byte("no work")); err == nil {
if _, _, err := fs.StoreMsg(subj, nil, []byte("no work")); err == nil {
t.Fatalf("Expected an error for StoreMsg call after Stop, got none")
}
@@ -144,7 +179,7 @@ func TestFileStoreBasicWriteMsgsAndRestore(t *testing.T) {
// Now write 100 more msgs
for i := uint64(101); i <= toStore*2; i++ {
msg := []byte(fmt.Sprintf("[%08d] Hello World!", i))
if seq, _, err := fs.StoreMsg(subj, msg); err != nil {
if seq, _, err := fs.StoreMsg(subj, nil, msg); err != nil {
t.Fatalf("Error storing msg: %v", err)
} else if seq != uint64(i) {
t.Fatalf("Expected sequence to be %d, got %d", i, seq)
@@ -188,13 +223,13 @@ func TestFileStoreMsgLimit(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
for i := 0; i < 10; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state := fs.State()
if state.Msgs != 10 {
t.Fatalf("Expected %d msgs, got %d", 10, state.Msgs)
}
if _, _, err := fs.StoreMsg(subj, msg); err != nil {
if _, _, err := fs.StoreMsg(subj, nil, msg); err != nil {
t.Fatalf("Error storing msg: %v", err)
}
state = fs.State()
@@ -208,14 +243,14 @@ func TestFileStoreMsgLimit(t *testing.T) {
t.Fatalf("Expected the first sequence to be 2 now, but got %d", state.FirstSeq)
}
// Make sure we can not lookup seq 1.
if _, _, _, err := fs.LoadMsg(1); err == nil {
if _, _, _, _, err := fs.LoadMsg(1); err == nil {
t.Fatalf("Expected error looking up seq 1 but got none")
}
}
func TestFileStoreBytesLimit(t *testing.T) {
subj, msg := "foo", make([]byte, 512)
storedMsgSize := fileStoreMsgSize(subj, msg)
storedMsgSize := fileStoreMsgSize(subj, nil, msg)
toStore := uint64(1024)
maxBytes := storedMsgSize * toStore
@@ -231,7 +266,7 @@ func TestFileStoreBytesLimit(t *testing.T) {
defer fs.Stop()
for i := uint64(0); i < toStore; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state := fs.State()
if state.Msgs != toStore {
@@ -243,7 +278,7 @@ func TestFileStoreBytesLimit(t *testing.T) {
// Now send 10 more and check that bytes limit enforced.
for i := 0; i < 10; i++ {
if _, _, err := fs.StoreMsg(subj, msg); err != nil {
if _, _, err := fs.StoreMsg(subj, nil, msg); err != nil {
t.Fatalf("Error storing msg: %v", err)
}
}
@@ -279,7 +314,7 @@ func TestFileStoreAgeLimit(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
toStore := 100
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state := fs.State()
if state.Msgs != uint64(toStore) {
@@ -302,7 +337,7 @@ func TestFileStoreAgeLimit(t *testing.T) {
checkExpired(t)
// Now add some more and make sure that timer will fire again.
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state = fs.State()
if state.Msgs != uint64(toStore) {
@@ -326,10 +361,10 @@ func TestFileStoreTimeStamps(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
for i := 0; i < 10; i++ {
time.Sleep(5 * time.Millisecond)
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
for seq := uint64(1); seq <= 10; seq++ {
_, _, ts, err := fs.LoadMsg(seq)
_, _, _, ts, err := fs.LoadMsg(seq)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
@@ -354,11 +389,11 @@ func TestFileStorePurge(t *testing.T) {
defer fs.Stop()
subj, msg := "foo", make([]byte, 8*1024)
storedMsgSize := fileStoreMsgSize(subj, msg)
storedMsgSize := fileStoreMsgSize(subj, nil, msg)
toStore := uint64(1024)
for i := uint64(0); i < toStore; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state := fs.State()
if state.Msgs != toStore {
@@ -414,7 +449,7 @@ func TestFileStorePurge(t *testing.T) {
// Now make sure we clean up any dangling purged messages.
for i := uint64(0); i < toStore; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state = fs.State()
if state.Msgs != toStore {
@@ -473,7 +508,7 @@ func TestFileStoreRemovePartialRecovery(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
toStore := 100
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state := fs.State()
if state.Msgs != uint64(toStore) {
@@ -519,7 +554,7 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
toStore := 100
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state := fs.State()
if state.Msgs != uint64(toStore) {
@@ -538,11 +573,11 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) {
t.Fatalf("Expected %d msgs, got %d", toStore/2, state.Msgs)
}
if _, _, _, err := fs.LoadMsg(1); err != nil {
if _, _, _, _, err := fs.LoadMsg(1); err != nil {
t.Fatalf("Expected to retrieve seq 1")
}
for i := 2; i <= toStore; i += 2 {
if _, _, _, err := fs.LoadMsg(uint64(i)); err == nil {
if _, _, _, _, err := fs.LoadMsg(uint64(i)); err == nil {
t.Fatalf("Expected error looking up seq %d that should be deleted", i)
}
}
@@ -561,11 +596,11 @@ func TestFileStoreRemoveOutOfOrderRecovery(t *testing.T) {
t.Fatalf("Expected receovered states to be the same, got %+v vs %+v\n", state, state2)
}
if _, _, _, err := fs.LoadMsg(1); err != nil {
if _, _, _, _, err := fs.LoadMsg(1); err != nil {
t.Fatalf("Expected to retrieve seq 1")
}
for i := 2; i <= toStore; i += 2 {
if _, _, _, err := fs.LoadMsg(uint64(i)); err == nil {
if _, _, _, _, err := fs.LoadMsg(uint64(i)); err == nil {
t.Fatalf("Expected error looking up seq %d that should be deleted", i)
}
}
@@ -588,7 +623,7 @@ func TestFileStoreAgeLimitRecovery(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
toStore := 100
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state := fs.State()
if state.Msgs != uint64(toStore) {
@@ -630,7 +665,7 @@ func TestFileStoreBitRot(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
toStore := 100
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state := fs.State()
if state.Msgs != uint64(toStore) {
@@ -689,8 +724,8 @@ func TestFileStoreEraseMsg(t *testing.T) {
defer fs.Stop()
subj, msg := "foo", []byte("Hello World")
fs.StoreMsg(subj, msg)
_, smsg, _, err := fs.LoadMsg(1)
fs.StoreMsg(subj, nil, msg)
_, _, smsg, _, err := fs.LoadMsg(1)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
@@ -708,7 +743,7 @@ func TestFileStoreEraseMsg(t *testing.T) {
}
// Now look on disk as well.
rl := fileStoreMsgSize(subj, msg)
rl := fileStoreMsgSize(subj, nil, msg)
buf := make([]byte, rl)
fp, err := os.Open(path.Join(storeDir, msgDir, fmt.Sprintf(blkScan, 1)))
if err != nil {
@@ -716,7 +751,7 @@ func TestFileStoreEraseMsg(t *testing.T) {
}
defer fp.Close()
fp.ReadAt(buf, sm.off)
nsubj, nmsg, seq, ts, err := msgFromBuf(buf, nil)
nsubj, _, nmsg, seq, ts, err := msgFromBuf(buf, nil)
if err != nil {
t.Fatalf("error reading message from block: %v", err)
}
@@ -748,7 +783,7 @@ func TestFileStoreEraseAndNoIndexRecovery(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
toStore := 100
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state := fs.State()
if state.Msgs != uint64(toStore) {
@@ -783,7 +818,7 @@ func TestFileStoreEraseAndNoIndexRecovery(t *testing.T) {
}
for i := 2; i <= toStore; i += 2 {
if _, _, _, err := fs.LoadMsg(uint64(i)); err == nil {
if _, _, _, _, err := fs.LoadMsg(uint64(i)); err == nil {
t.Fatalf("Expected error looking up seq %d that should be erased", i)
}
}
@@ -902,8 +937,8 @@ func TestFileStoreWriteAndReadSameBlock(t *testing.T) {
subj, msg := "foo", []byte("Hello World!")
for i := uint64(1); i <= 10; i++ {
fs.StoreMsg(subj, msg)
if _, _, _, err := fs.LoadMsg(i); err != nil {
fs.StoreMsg(subj, nil, msg)
if _, _, _, _, err := fs.LoadMsg(i); err != nil {
t.Fatalf("Error loading %d: %v", i, err)
}
}
@@ -915,7 +950,7 @@ func TestFileStoreAndRetrieveMultiBlock(t *testing.T) {
defer os.RemoveAll(storeDir)
subj, msg := "foo", []byte("Hello World!")
storedMsgSize := fileStoreMsgSize(subj, msg)
storedMsgSize := fileStoreMsgSize(subj, nil, msg)
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 4 * storedMsgSize},
@@ -926,7 +961,7 @@ func TestFileStoreAndRetrieveMultiBlock(t *testing.T) {
}
for i := 0; i < 20; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state := fs.State()
if state.Msgs != 20 {
@@ -944,7 +979,7 @@ func TestFileStoreAndRetrieveMultiBlock(t *testing.T) {
defer fs.Stop()
for i := uint64(1); i <= 20; i++ {
if _, _, _, err := fs.LoadMsg(i); err != nil {
if _, _, _, _, err := fs.LoadMsg(i); err != nil {
t.Fatalf("Error loading %d: %v", i, err)
}
}
@@ -956,7 +991,7 @@ func TestFileStoreCollapseDmap(t *testing.T) {
defer os.RemoveAll(storeDir)
subj, msg := "foo", []byte("Hello World!")
storedMsgSize := fileStoreMsgSize(subj, msg)
storedMsgSize := fileStoreMsgSize(subj, nil, msg)
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 4 * storedMsgSize},
@@ -968,7 +1003,7 @@ func TestFileStoreCollapseDmap(t *testing.T) {
defer fs.Stop()
for i := 0; i < 10; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
state := fs.State()
if state.Msgs != 10 {
@@ -1027,7 +1062,7 @@ func TestFileStoreCollapseDmap(t *testing.T) {
func TestFileStoreReadCache(t *testing.T) {
subj, msg := "foo.bar", make([]byte, 1024)
storedMsgSize := fileStoreMsgSize(subj, msg)
storedMsgSize := fileStoreMsgSize(subj, nil, msg)
storeDir, _ := ioutil.TempDir("", JetStreamStoreDir)
os.MkdirAll(storeDir, 0755)
@@ -1043,7 +1078,7 @@ func TestFileStoreReadCache(t *testing.T) {
totalBytes := uint64(toStore) * storedMsgSize
for i := 0; i < toStore; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
fs.LoadMsg(1)
@@ -1090,7 +1125,7 @@ func TestFileStoreSnapshot(t *testing.T) {
toSend := 2233
for i := 0; i < toSend; i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
// Create a few consumers.
@@ -1377,7 +1412,7 @@ func TestFileStorePerf(t *testing.T) {
for i := 0; i < len(msg); i++ {
msg[i] = 'D'
}
storedMsgSize := fileStoreMsgSize(subj, msg)
storedMsgSize := fileStoreMsgSize(subj, nil, msg)
// 5GB
toStore := 5 * 1024 * 1024 * 1024 / storedMsgSize
@@ -1402,7 +1437,7 @@ func TestFileStorePerf(t *testing.T) {
start := time.Now()
for i := 0; i < int(toStore); i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
fs.Stop()
@@ -1433,7 +1468,7 @@ func TestFileStorePerf(t *testing.T) {
start = time.Now()
for i := uint64(1); i <= toStore; i++ {
if _, _, _, err := fs.LoadMsg(i); err != nil {
if _, _, _, _, err := fs.LoadMsg(i); err != nil {
t.Fatalf("Error loading %d: %v", i, err)
}
}
@@ -1452,7 +1487,7 @@ func TestFileStorePerf(t *testing.T) {
start = time.Now()
for i := uint64(1); i <= toStore; i++ {
if _, _, _, err := fs.LoadMsg(i); err != nil {
if _, _, _, _, err := fs.LoadMsg(i); err != nil {
t.Fatalf("Error loading %d: %v", i, err)
}
}
@@ -1516,7 +1551,7 @@ func TestFileStoreReadBackMsgPerf(t *testing.T) {
subj := "foo"
msg := []byte("ABCDEFGH") // Smaller shows problems more.
storedMsgSize := fileStoreMsgSize(subj, msg)
storedMsgSize := fileStoreMsgSize(subj, nil, msg)
// Make sure we store 2 blocks.
toStore := defaultStreamBlockSize * 2 / storedMsgSize
@@ -1542,7 +1577,7 @@ func TestFileStoreReadBackMsgPerf(t *testing.T) {
start := time.Now()
for i := 0; i < int(toStore); i++ {
fs.StoreMsg(subj, msg)
fs.StoreMsg(subj, nil, msg)
}
tt := time.Since(start)

View File

@@ -1044,7 +1044,7 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st
return
}
subj, msg, ts, err := mset.store.LoadMsg(req.Seq)
subj, hdr, msg, ts, err := mset.store.LoadMsg(req.Seq)
if err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
@@ -1053,6 +1053,7 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st
resp.Message = &StoredMsg{
Subject: subj,
Sequence: req.Seq,
Header: hdr,
Data: msg,
Time: time.Unix(0, ts),
}
@@ -1255,7 +1256,7 @@ func (s *Server) streamSnapshot(c *client, mset *Stream, sr *SnapshotResult, req
chunk = chunk[:n]
if err != nil {
if n > 0 {
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, chunk, nil, 0}
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0}
}
break
}
@@ -1272,12 +1273,13 @@ func (s *Server) streamSnapshot(c *client, mset *Stream, sr *SnapshotResult, req
}
// TODO(dlc) - Might want these moved off sendq if we have contention.
ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackReply, chunk, nil, 0}
mset.sendq <- &jsPubMsg{reply, _EMPTY_, ackReply, nil, chunk, nil, 0}
atomic.AddInt32(&out, int32(len(chunk)))
}
done:
// Send last EOF
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, 0}
// TODO(dlc) - place hash in header
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0}
}
// Request to create a durable consumer.

View File

@@ -34,6 +34,7 @@ type memStore struct {
type storedMsg struct {
subj string
hdr []byte
msg []byte
seq uint64
ts int64 // nanoseconds
@@ -79,7 +80,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
}
// Store stores a message.
func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) {
func (ms *memStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error) {
ms.mu.Lock()
// Check if we are discarding new messages when we reach the limit.
@@ -109,11 +110,14 @@ func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, int64, error) {
if len(msg) > 0 {
msg = append(msg[:0:0], msg...)
}
if len(hdr) > 0 {
hdr = append(hdr[:0:0], hdr...)
}
startBytes := int64(ms.state.Bytes)
ms.msgs[seq] = &storedMsg{subj, msg, seq, ts}
ms.msgs[seq] = &storedMsg{subj, hdr, msg, seq, ts}
ms.state.Msgs++
ms.state.Bytes += memStoreMsgSize(subj, msg)
ms.state.Bytes += memStoreMsgSize(subj, hdr, msg)
ms.state.LastSeq = seq
ms.state.LastTime = now.UTC()
@@ -254,7 +258,7 @@ func (ms *memStore) deleteFirstMsg() bool {
}
// LoadMsg will lookup the message by sequence number and return it if found.
func (ms *memStore) LoadMsg(seq uint64) (string, []byte, int64, error) {
func (ms *memStore) LoadMsg(seq uint64) (string, []byte, []byte, int64, error) {
ms.mu.RLock()
sm, ok := ms.msgs[seq]
last := ms.state.LastSeq
@@ -265,9 +269,9 @@ func (ms *memStore) LoadMsg(seq uint64) (string, []byte, int64, error) {
if seq <= last {
err = ErrStoreMsgNotFound
}
return "", nil, 0, err
return "", nil, nil, 0, err
}
return sm.subj, sm.msg, sm.ts, nil
return sm.subj, sm.hdr, sm.msg, sm.ts, nil
}
// RemoveMsg will remove the message from this store.
@@ -297,7 +301,7 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
delete(ms.msgs, seq)
ms.state.Msgs--
ss = memStoreMsgSize(sm.subj, sm.msg)
ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
ms.state.Bytes -= ss
if seq == ms.state.FirstSeq {
var nsm *storedMsg
@@ -316,8 +320,14 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
ms.state.FirstTime = time.Time{}
}
}
if secure {
rand.Read(sm.msg)
if len(sm.hdr) > 0 {
rand.Read(sm.hdr)
}
if len(sm.msg) > 0 {
rand.Read(sm.msg)
}
sm.seq = 0
}
@@ -337,8 +347,8 @@ func (ms *memStore) State() StreamState {
return state
}
func memStoreMsgSize(subj string, msg []byte) uint64 {
return uint64(len(subj) + len(msg) + 16) // 8*2 for seq + age
func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {
return uint64(len(subj) + len(hdr) + len(msg) + 16) // 8*2 for seq + age
}
// Delete is same as Stop for memory store.

View File

@@ -28,7 +28,7 @@ func TestMemStoreBasics(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
now := time.Now().UnixNano()
if seq, ts, err := ms.StoreMsg(subj, msg); err != nil {
if seq, ts, err := ms.StoreMsg(subj, nil, msg); err != nil {
t.Fatalf("Error storing msg: %v", err)
} else if seq != 1 {
t.Fatalf("Expected sequence to be 1, got %d", seq)
@@ -40,11 +40,11 @@ func TestMemStoreBasics(t *testing.T) {
if state.Msgs != 1 {
t.Fatalf("Expected 1 msg, got %d", state.Msgs)
}
expectedSize := memStoreMsgSize(subj, msg)
expectedSize := memStoreMsgSize(subj, nil, msg)
if state.Bytes != expectedSize {
t.Fatalf("Expected %d bytes, got %d", expectedSize, state.Bytes)
}
nsubj, nmsg, _, err := ms.LoadMsg(1)
nsubj, _, nmsg, _, err := ms.LoadMsg(1)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
@@ -63,13 +63,13 @@ func TestMemStoreMsgLimit(t *testing.T) {
}
subj, msg := "foo", []byte("Hello World")
for i := 0; i < 10; i++ {
ms.StoreMsg(subj, msg)
ms.StoreMsg(subj, nil, msg)
}
state := ms.State()
if state.Msgs != 10 {
t.Fatalf("Expected %d msgs, got %d", 10, state.Msgs)
}
if _, _, err := ms.StoreMsg(subj, msg); err != nil {
if _, _, err := ms.StoreMsg(subj, nil, msg); err != nil {
t.Fatalf("Error storing msg: %v", err)
}
state = ms.State()
@@ -83,14 +83,14 @@ func TestMemStoreMsgLimit(t *testing.T) {
t.Fatalf("Expected the first sequence to be 2 now, but got %d", state.FirstSeq)
}
// Make sure we can not lookup seq 1.
if _, _, _, err := ms.LoadMsg(1); err == nil {
if _, _, _, _, err := ms.LoadMsg(1); err == nil {
t.Fatalf("Expected error looking up seq 1 but got none")
}
}
func TestMemStoreBytesLimit(t *testing.T) {
subj, msg := "foo", make([]byte, 512)
storedMsgSize := memStoreMsgSize(subj, msg)
storedMsgSize := memStoreMsgSize(subj, nil, msg)
toStore := uint64(1024)
maxBytes := storedMsgSize * toStore
@@ -101,7 +101,7 @@ func TestMemStoreBytesLimit(t *testing.T) {
}
for i := uint64(0); i < toStore; i++ {
ms.StoreMsg(subj, msg)
ms.StoreMsg(subj, nil, msg)
}
state := ms.State()
if state.Msgs != toStore {
@@ -113,7 +113,7 @@ func TestMemStoreBytesLimit(t *testing.T) {
// Now send 10 more and check that bytes limit enforced.
for i := 0; i < 10; i++ {
if _, _, err := ms.StoreMsg(subj, msg); err != nil {
if _, _, err := ms.StoreMsg(subj, nil, msg); err != nil {
t.Fatalf("Error storing msg: %v", err)
}
}
@@ -142,7 +142,7 @@ func TestMemStoreAgeLimit(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
toStore := 100
for i := 0; i < toStore; i++ {
ms.StoreMsg(subj, msg)
ms.StoreMsg(subj, nil, msg)
}
state := ms.State()
if state.Msgs != uint64(toStore) {
@@ -165,7 +165,7 @@ func TestMemStoreAgeLimit(t *testing.T) {
checkExpired(t)
// Now add some more and make sure that timer will fire again.
for i := 0; i < toStore; i++ {
ms.StoreMsg(subj, msg)
ms.StoreMsg(subj, nil, msg)
}
state = ms.State()
if state.Msgs != uint64(toStore) {
@@ -183,10 +183,10 @@ func TestMemStoreTimeStamps(t *testing.T) {
subj, msg := "foo", []byte("Hello World")
for i := 0; i < 10; i++ {
time.Sleep(5 * time.Microsecond)
ms.StoreMsg(subj, msg)
ms.StoreMsg(subj, nil, msg)
}
for seq := uint64(1); seq <= 10; seq++ {
_, _, ts, err := ms.LoadMsg(seq)
_, _, _, ts, err := ms.LoadMsg(seq)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
@@ -204,8 +204,8 @@ func TestMemStoreEraseMsg(t *testing.T) {
t.Fatalf("Unexpected error creating store: %v", err)
}
subj, msg := "foo", []byte("Hello World")
ms.StoreMsg(subj, msg)
_, smsg, _, err := ms.LoadMsg(1)
ms.StoreMsg(subj, nil, msg)
_, _, smsg, _, err := ms.LoadMsg(1)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
@@ -219,3 +219,34 @@ func TestMemStoreEraseMsg(t *testing.T) {
t.Fatalf("Expected msg to be erased")
}
}
// TestMemStoreMsgHeaders verifies that the memory store persists, returns,
// and securely erases message headers alongside the message payload.
func TestMemStoreMsgHeaders(t *testing.T) {
ms, err := newMemStore(&StreamConfig{Storage: MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error creating store: %v", err)
}
subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World")
// Accounting size must include the header bytes; the extra 16 bytes
// cover the stored sequence and timestamp (8 bytes each).
if sz := int(memStoreMsgSize(subj, hdr, msg)); sz != (len(subj) + len(hdr) + len(msg) + 16) {
t.Fatalf("Wrong size for stored msg with header")
}
ms.StoreMsg(subj, hdr, msg)
_, shdr, smsg, _, err := ms.LoadMsg(1)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
if !bytes.Equal(msg, smsg) {
t.Fatalf("Expected same msg, got %q vs %q", smsg, msg)
}
if !bytes.Equal(hdr, shdr) {
t.Fatalf("Expected same hdr, got %q vs %q", shdr, hdr)
}
// EraseMsg overwrites the stored buffers with random data. Since shdr
// and smsg alias those stored buffers, they should no longer match the
// original header and payload after a successful erase.
if removed, _ := ms.EraseMsg(1); !removed {
t.Fatalf("Expected erase msg to return success")
}
if bytes.Equal(hdr, shdr) {
t.Fatalf("Expected hdr to be erased")
}
if bytes.Equal(msg, smsg) {
t.Fatalf("Expected msg to be erased")
}
}

View File

@@ -988,6 +988,7 @@ func (s *Server) createInternalClient(kind int) *client {
c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now}
c.initClient()
c.echo = false
c.headers = true
c.flags.set(noReconnect)
return c
}

View File

@@ -48,11 +48,13 @@ var (
// ErrStoreSnapshotInProgress is returned when RemoveMsg or EraseMsg is called
// while a snapshot is in progress.
ErrStoreSnapshotInProgress = errors.New("snapshot in progress")
// ErrMsgTooLarge is returned when a message is considered too large.
ErrMsgTooLarge = errors.New("message to large")
)
type StreamStore interface {
StoreMsg(subj string, msg []byte) (uint64, int64, error)
LoadMsg(seq uint64) (subj string, msg []byte, ts int64, err error)
StoreMsg(subj string, hdr, msg []byte) (uint64, int64, error)
LoadMsg(seq uint64) (subj string, hdr, msg []byte, ts int64, err error)
RemoveMsg(seq uint64) (bool, error)
EraseMsg(seq uint64) (bool, error)
Purge() uint64

View File

@@ -204,7 +204,7 @@ func (mset *Stream) sendCreateAdvisory() {
j, err := json.MarshalIndent(m, "", " ")
if err == nil {
subj := JSAdvisoryStreamCreatedPre + "." + name
sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0}
sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
}
}
@@ -227,7 +227,7 @@ func (mset *Stream) sendDeleteAdvisoryLocked() {
j, err := json.MarshalIndent(m, "", " ")
if err == nil {
subj := JSAdvisoryStreamDeletedPre + "." + mset.config.Name
mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0}
mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
}
}
@@ -249,7 +249,7 @@ func (mset *Stream) sendUpdateAdvisoryLocked() {
j, err := json.MarshalIndent(m, "", " ")
if err == nil {
subj := JSAdvisoryStreamUpdatedPre + "." + mset.config.Name
mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0}
mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0}
}
}
@@ -598,7 +598,7 @@ func (mset *Stream) setupStore(fsCfg *FileStoreConfig) error {
}
// processInboundJetStreamMsg handles processing messages bound for a stream.
func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subject, reply string, msg []byte) {
func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subject, reply string, msg []byte) {
mset.mu.Lock()
store := mset.store
c := mset.client
@@ -619,17 +619,27 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subje
return
}
// Response to send.
var response []byte
var seq uint64
var err error
var ts int64
// Response Ack.
var (
response []byte
seq uint64
err error
ts int64
)
// Header support.
var hdr []byte
// Check to see if we are over the account limit.
if maxMsgSize >= 0 && len(msg) > maxMsgSize {
response = []byte("-ERR 'message size exceeds maximum allowed'")
} else {
// Check to see if we are over the account limit.
seq, ts, err = store.StoreMsg(subject, msg)
// Headers.
if pc != nil && pc.pa.hdr > 0 {
hdr = msg[:pc.pa.hdr]
msg = msg[pc.pa.hdr:]
}
seq, ts, err = store.StoreMsg(subject, hdr, msg)
if err != nil {
if err != ErrStoreClosed {
c.Errorf("JetStream failed to store a msg on account: %q stream: %q - %v", accName, name, err)
@@ -648,14 +658,14 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, _ *client, subje
// Send response here.
if doAck && len(reply) > 0 {
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, response, nil, 0}
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
if err == nil && numConsumers > 0 && seq > 0 {
var needSignal bool
mset.mu.Lock()
for _, o := range mset.consumers {
if !o.deliverCurrentMsg(subject, msg, seq, ts) {
if !o.deliverCurrentMsg(subject, hdr, msg, seq, ts) {
needSignal = true
}
}
@@ -681,6 +691,7 @@ type jsPubMsg struct {
subj string
dsubj string
reply string
hdr []byte
msg []byte
o *Consumer
seq uint64
@@ -690,7 +701,8 @@ type jsPubMsg struct {
type StoredMsg struct {
Subject string `json:"subject"`
Sequence uint64 `json:"seq"`
Data []byte `json:"data"`
Header []byte `json:"hdrs,omitempty"`
Data []byte `json:"data,omitempty"`
Time time.Time `json:"time"`
}
@@ -745,10 +757,19 @@ func (mset *Stream) internalSendLoop() {
}
c.pa.subject = []byte(pm.subj)
c.pa.deliver = []byte(pm.dsubj)
c.pa.size = len(pm.msg)
c.pa.size = len(pm.msg) + len(pm.hdr)
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
c.pa.reply = []byte(pm.reply)
msg := append(pm.msg, _CRLF_...)
var msg []byte
if len(pm.hdr) > 0 {
c.pa.hdr = len(pm.hdr)
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
msg = append(pm.hdr, pm.msg...)
msg = append(msg, _CRLF_...)
} else {
msg = append(pm.msg, _CRLF_...)
}
didDeliver := c.processInboundClientMsg(msg)
c.pa.szb = nil
c.flushClients(0)
@@ -824,13 +845,14 @@ func (mset *Stream) stop(delete bool) error {
}
func (mset *Stream) GetMsg(seq uint64) (*StoredMsg, error) {
subj, msg, ts, err := mset.store.LoadMsg(seq)
subj, hdr, msg, ts, err := mset.store.LoadMsg(seq)
if err != nil {
return nil, err
}
sm := &StoredMsg{
Subject: subj,
Sequence: seq,
Header: hdr,
Data: msg,
Time: time.Unix(0, ts).UTC(),
}

View File

@@ -6155,6 +6155,105 @@ func TestJetStreamNextMsgNoInterest(t *testing.T) {
}
}
// TestJetStreamMsgHeaders verifies header support end to end for both
// memory and file backed streams: headers published by a client are
// retrievable raw via GetMsg and are delivered intact to a push consumer.
func TestJetStreamMsgHeaders(t *testing.T) {
cases := []struct {
name string
mconfig *server.StreamConfig
}{
{name: "MemoryStore",
mconfig: &server.StreamConfig{
Name: "foo",
Retention: server.LimitsPolicy,
MaxAge: time.Hour,
Storage: server.MemoryStorage,
Replicas: 1,
}},
{name: "FileStore",
mconfig: &server.StreamConfig{
Name: "foo",
Retention: server.LimitsPolicy,
MaxAge: time.Hour,
Storage: server.FileStorage,
Replicas: 1,
}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mset, err := s.GlobalAccount().AddStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
// Publish a message carrying two headers plus a payload.
m := nats.NewMsg("foo")
m.Header.Add("Accept-Encoding", "json")
m.Header.Add("Authorization", "s3cr3t")
m.Data = []byte("Hello JetStream Headers - #1!")
nc.PublishMsg(m)
nc.Flush()
state := mset.State()
if state.Msgs != 1 {
t.Fatalf("Expected 1 message, got %d", state.Msgs)
}
if state.Bytes == 0 {
t.Fatalf("Expected non-zero bytes")
}
// Now access raw from stream.
sm, err := mset.GetMsg(1)
if err != nil {
t.Fatalf("Unexpected error getting stored message: %v", err)
}
// Calculate the []byte version of the headers.
var b bytes.Buffer
b.WriteString("NATS/1.0\r\n")
m.Header.Write(&b)
b.WriteString("\r\n")
hdr := b.Bytes()
if !bytes.Equal(sm.Header, hdr) {
t.Fatalf("Message headers do not match, %q vs %q", hdr, sm.Header)
}
if !bytes.Equal(sm.Data, m.Data) {
t.Fatalf("Message data do not match, %q vs %q", m.Data, sm.Data)
}
// Now do consumer based.
sub, _ := nc.SubscribeSync(nats.NewInbox())
defer sub.Unsubscribe()
nc.Flush()
o, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: sub.Subject})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
cm, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
// Remove reply subject and sub for comparison.
cm.Sub, cm.Reply = nil, ""
if !reflect.DeepEqual(cm, m) {
t.Fatalf("Messages do not match: %+v vs %+v", cm, m)
}
})
}
}
func TestJetStreamTemplateBasics(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

View File

@@ -93,7 +93,7 @@ func (c *EncodedConn) Publish(subject string, v interface{}) error {
if err != nil {
return err
}
return c.Conn.publish(subject, _EMPTY_, b)
return c.Conn.publish(subject, _EMPTY_, nil, b)
}
// PublishRequest will perform a Publish() expecting a response on the
@@ -104,7 +104,7 @@ func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error
if err != nil {
return err
}
return c.Conn.publish(subject, reply, b)
return c.Conn.publish(subject, reply, nil, b)
}
// Request will create an Inbox and perform a Request() call

View File

@@ -1,7 +1,10 @@
module github.com/nats-io/nats.go
require (
github.com/nats-io/jwt v0.3.2
github.com/golang/protobuf v1.4.2
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093
github.com/nats-io/nkeys v0.1.4
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)

View File

@@ -1,5 +1,23 @@
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 h1:RnGotxlghqR5D2KDAu4TyuLqyjuylOsJiAFhXvMvQIc=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093 h1:ii4KAXLYB3f7A6VnBhRWsYP+x45C+GAXh5T2VQWLfgQ=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
@@ -10,6 +28,18 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 h1:HmbHVPwrPEKPGLAcHSrMe6+hqSUlvZU0rab6x5EXfGU=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=

View File

@@ -27,6 +27,8 @@ import (
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/textproto"
"net/url"
"os"
"path/filepath"
@@ -45,7 +47,7 @@ import (
// Default Constants
const (
Version = "1.10.0"
Version = "1.11.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@@ -120,6 +122,8 @@ var (
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
)
func init() {
@@ -382,6 +386,12 @@ type Options struct {
// callbacks after Close() is called. Client won't receive notifications
// when Close is invoked by user code. Default is to invoke the callbacks.
NoCallbacksAfterClientClose bool
// LameDuckModeHandler sets the callback to invoke when the server notifies
// the connection that it entered lame duck mode, that is, going to
// gradually disconnect all its connections before shutting down. This is
// often used in deployments when upgrading NATS Servers.
LameDuckModeHandler ConnHandler
}
const (
@@ -494,6 +504,7 @@ type Subscription struct {
type Msg struct {
Subject string
Reply string
Header http.Header
Data []byte
Sub *Subscription
next *Msg
@@ -525,6 +536,7 @@ type srv struct {
tlsName string
}
// The INFO block received from the server.
type serverInfo struct {
ID string `json:"server_id"`
Host string `json:"host"`
@@ -532,12 +544,14 @@ type serverInfo struct {
Version string `json:"version"`
AuthRequired bool `json:"auth_required"`
TLSRequired bool `json:"tls_required"`
Headers bool `json:"headers"`
MaxPayload int64 `json:"max_payload"`
ConnectURLs []string `json:"connect_urls,omitempty"`
Proto int `json:"proto,omitempty"`
CID uint64 `json:"client_id,omitempty"`
ClientIP string `json:"client_ip,omitempty"`
Nonce string `json:"nonce,omitempty"`
LameDuckMode bool `json:"ldm,omitempty"`
}
const (
@@ -564,6 +578,7 @@ type connectInfo struct {
Version string `json:"version"`
Protocol int `json:"protocol"`
Echo bool `json:"echo"`
Headers bool `json:"headers"`
}
// MsgHandler is a callback function that processes messages delivered to
@@ -947,6 +962,17 @@ func NoCallbacksAfterClientClose() Option {
}
}
// LameDuckModeHandler sets the callback to invoke when the server notifies
// the connection that it entered lame duck mode, that is, going to
// gradually disconnect all its connections before shutting down. This is
// often used in deployments when upgrading NATS Servers.
func LameDuckModeHandler(cb ConnHandler) Option {
return func(o *Options) error {
o.LameDuckModeHandler = cb
return nil
}
}
// Handler processing
// SetDisconnectHandler will set the disconnect event handler.
@@ -1077,10 +1103,11 @@ func (o Options) Connect() (*Conn, error) {
}
const (
_CRLF_ = "\r\n"
_EMPTY_ = ""
_SPC_ = " "
_PUB_P_ = "PUB "
_CRLF_ = "\r\n"
_EMPTY_ = ""
_SPC_ = " "
_PUB_P_ = "PUB "
_HPUB_P_ = "HPUB "
)
const (
@@ -1091,12 +1118,12 @@ const (
)
const (
conProto = "CONNECT %s" + _CRLF_
pingProto = "PING" + _CRLF_
pongProto = "PONG" + _CRLF_
subProto = "SUB %s %s %d" + _CRLF_
unsubProto = "UNSUB %d %s" + _CRLF_
okProto = _OK_OP_ + _CRLF_
connectProto = "CONNECT %s" + _CRLF_
pingProto = "PING" + _CRLF_
pongProto = "PONG" + _CRLF_
subProto = "SUB %s %s %d" + _CRLF_
unsubProto = "UNSUB %d %s" + _CRLF_
okProto = _OK_OP_ + _CRLF_
)
// Return the currently selected server
@@ -1444,9 +1471,9 @@ func (nc *Conn) setup() {
nc.fch = make(chan struct{}, flushChanSize)
nc.rqch = make(chan struct{})
// Setup scratch outbound buffer for PUB
pub := nc.scratch[:len(_PUB_P_)]
copy(pub, _PUB_P_)
// Setup scratch outbound buffer for PUB/HPUB
pub := nc.scratch[:len(_HPUB_P_)]
copy(pub, _HPUB_P_)
}
// Process a connected connection and initialize properly.
@@ -1660,7 +1687,7 @@ func (nc *Conn) connectProto() (string, error) {
}
cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho}
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, true}
b, err := json.Marshal(cinfo)
if err != nil {
@@ -1672,7 +1699,7 @@ func (nc *Conn) connectProto() (string, error) {
return _EMPTY_, ErrNoEchoNotSupported
}
return fmt.Sprintf(conProto, b), nil
return fmt.Sprintf(connectProto, b), nil
}
// normalizeErr removes the prefix -ERR, trim spaces and remove the quotes.
@@ -2264,8 +2291,27 @@ func (nc *Conn) processMsg(data []byte) {
msgPayload := make([]byte, len(data))
copy(msgPayload, data)
// Check if we have headers encoded here.
var h http.Header
var err error
if nc.ps.ma.hdr > 0 {
hbuf := msgPayload[:nc.ps.ma.hdr]
msgPayload = msgPayload[nc.ps.ma.hdr:]
h, err = decodeHeadersMsg(hbuf)
if err != nil {
// We will pass the message through but send async error.
nc.mu.Lock()
nc.err = ErrBadHeaderMsg
if nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrBadHeaderMsg) })
}
nc.mu.Unlock()
}
}
// FIXME(dlc): Should we recycle these containers?
m := &Msg{Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}
m := &Msg{Header: h, Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}
sub.mu.Lock()
@@ -2457,6 +2503,9 @@ func (nc *Conn) processInfo(info string) error {
// did not include themselves in the async INFO protocol.
// If empty, do not remove the implicit servers from the pool.
if len(ncInfo.ConnectURLs) == 0 {
if ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
}
return nil
}
// Note about pool randomization: when the pool was first created,
@@ -2517,7 +2566,9 @@ func (nc *Conn) processInfo(info string) error {
nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
}
}
if ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
}
return nil
}
@@ -2601,7 +2652,34 @@ func (nc *Conn) kickFlusher() {
// argument is left untouched and needs to be correctly interpreted on
// the receiver.
func (nc *Conn) Publish(subj string, data []byte) error {
return nc.publish(subj, _EMPTY_, data)
return nc.publish(subj, _EMPTY_, nil, data)
}
// NewMsg creates a new Msg for publishing on the given subject with an
// initialized Header map, ready for Header.Add/Set calls.
func NewMsg(subject string) *Msg {
return &Msg{Subject: subject, Header: make(http.Header)}
}
const (
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
)
// decodeHeadersMsg parses the raw header bytes of an inbound message into
// an http.Header. The data must begin with the NATS/1.0 status line; the
// remainder is read as MIME headers. ErrBadHeaderMsg is returned on any
// malformed input.
func decodeHeadersMsg(data []byte) (http.Header, error) {
tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data)))
line, err := tp.ReadLine()
if err != nil || line != hdrLine[:hdrPreEnd] {
return nil, ErrBadHeaderMsg
}
mimeHdr, err := tp.ReadMIMEHeader()
if err != nil {
return nil, ErrBadHeaderMsg
}
return http.Header(mimeHdr), nil
}
// PublishMsg publishes the Msg structure, which includes the
@@ -2610,14 +2688,26 @@ func (nc *Conn) PublishMsg(m *Msg) error {
if m == nil {
return ErrInvalidMsg
}
return nc.publish(m.Subject, m.Reply, m.Data)
var hdr []byte
if len(m.Header) > 0 {
if !nc.info.Headers {
return ErrHeadersNotSupported
}
// FIXME(dlc) - Optimize
var b bytes.Buffer
b.WriteString(hdrLine)
m.Header.Write(&b)
b.WriteString(crlf)
hdr = b.Bytes()
}
return nc.publish(m.Subject, m.Reply, hdr, m.Data)
}
// PublishRequest will perform a Publish() excpecting a response on the
// PublishRequest will perform a Publish() expecting a response on the
// reply subject. Use Request() for automatically waiting for a response
// inline.
func (nc *Conn) PublishRequest(subj, reply string, data []byte) error {
return nc.publish(subj, reply, data)
return nc.publish(subj, reply, nil, data)
}
// Used for handrolled itoa
@@ -2626,7 +2716,7 @@ const digits = "0123456789"
// publish is the internal function to publish messages to a nats-server.
// Sends a protocol data message by queuing into the bufio writer
// and kicking the flush go routine. These writes should be protected.
func (nc *Conn) publish(subj, reply string, data []byte) error {
func (nc *Conn) publish(subj, reply string, hdr, data []byte) error {
if nc == nil {
return ErrInvalidConnection
}
@@ -2646,7 +2736,7 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
}
// Proactively reject payloads over the threshold set by server.
msgSize := int64(len(data))
msgSize := int64(len(data) + len(hdr))
if msgSize > nc.info.MaxPayload {
nc.mu.Unlock()
return ErrMaxPayload
@@ -2664,37 +2754,65 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
}
}
msgh := nc.scratch[:len(_PUB_P_)]
msgh = append(msgh, subj...)
msgh = append(msgh, ' ')
var mh []byte
if hdr != nil {
mh = nc.scratch[:len(_HPUB_P_)]
} else {
mh = nc.scratch[1:len(_HPUB_P_)]
}
mh = append(mh, subj...)
mh = append(mh, ' ')
if reply != "" {
msgh = append(msgh, reply...)
msgh = append(msgh, ' ')
mh = append(mh, reply...)
mh = append(mh, ' ')
}
// We could be smarter here, but simple loop is ok,
// just avoid strconv in fast path
// just avoid strconv in fast path.
// FIXME(dlc) - Find a better way here.
// msgh = strconv.AppendInt(msgh, int64(len(data)), 10)
// go 1.14 some values strconv faster, may be able to switch over.
var b [12]byte
var i = len(b)
if len(data) > 0 {
for l := len(data); l > 0; l /= 10 {
i -= 1
if hdr != nil {
if len(hdr) > 0 {
for l := len(hdr); l > 0; l /= 10 {
i--
b[i] = digits[l%10]
}
} else {
i--
b[i] = digits[0]
}
mh = append(mh, b[i:]...)
mh = append(mh, ' ')
// reset for below.
i = len(b)
}
if msgSize > 0 {
for l := msgSize; l > 0; l /= 10 {
i--
b[i] = digits[l%10]
}
} else {
i -= 1
i--
b[i] = digits[0]
}
msgh = append(msgh, b[i:]...)
msgh = append(msgh, _CRLF_...)
mh = append(mh, b[i:]...)
mh = append(mh, _CRLF_...)
_, err := nc.bw.Write(msgh)
_, err := nc.bw.Write(mh)
if err == nil {
_, err = nc.bw.Write(data)
if hdr != nil {
_, err = nc.bw.Write(hdr)
}
if err == nil {
_, err = nc.bw.Write(data)
}
}
if err == nil {
_, err = nc.bw.WriteString(_CRLF_)
@@ -2705,7 +2823,7 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
}
nc.OutMsgs++
nc.OutBytes += uint64(len(data))
nc.OutBytes += uint64(len(data) + len(hdr))
if len(nc.fch) == 0 {
nc.kickFlusher()

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2018 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -21,6 +21,7 @@ type msgArg struct {
subject []byte
reply []byte
sid int64
hdr int
size int
}
@@ -30,6 +31,7 @@ type parseState struct {
state int
as int
drop int
hdr int
ma msgArg
argBuf []byte
msgBuf []byte
@@ -54,6 +56,7 @@ const (
MSG_ARG
MSG_PAYLOAD
MSG_END
OP_H
OP_P
OP_PI
OP_PIN
@@ -83,6 +86,10 @@ func (nc *Conn) parse(buf []byte) error {
switch b {
case 'M', 'm':
nc.ps.state = OP_M
nc.ps.hdr = -1
case 'H', 'h':
nc.ps.state = OP_H
nc.ps.hdr = 0
case 'P', 'p':
nc.ps.state = OP_P
case '+':
@@ -94,6 +101,13 @@ func (nc *Conn) parse(buf []byte) error {
default:
goto parseErr
}
case OP_H:
switch b {
case 'M', 'm':
nc.ps.state = OP_M
default:
goto parseErr
}
case OP_M:
switch b {
case 'S', 's':
@@ -140,8 +154,7 @@ func (nc *Conn) parse(buf []byte) error {
nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, MSG_PAYLOAD
// jump ahead with the index. If this overruns
// what is left we fall out and process split
// buffer.
// what is left we fall out and process a split buffer.
i = nc.ps.as + nc.ps.ma.size - 1
default:
if nc.ps.argBuf != nil {
@@ -415,6 +428,11 @@ func (nc *Conn) cloneMsgArg() {
const argsLenMax = 4
func (nc *Conn) processMsgArgs(arg []byte) error {
// Use separate function for header based messages.
if nc.ps.hdr >= 0 {
return nc.processHeaderMsgArgs(arg)
}
// Unroll splitArgs to avoid runtime/heap issues
a := [argsLenMax][]byte{}
args := a[:0]
@@ -459,6 +477,57 @@ func (nc *Conn) processMsgArgs(arg []byte) error {
return nil
}
// processHeaderMsgArgs parses the argument section of an HMSG protocol
// line — subject, sid, optional reply, header length, total length —
// and records the results in the connection's parse state.
func (nc *Conn) processHeaderMsgArgs(arg []byte) error {
	// Unroll splitArgs to avoid runtime/heap issues
	scratch := [argsLenMax][]byte{}
	args := scratch[:0]
	tokenStart := -1
	for idx, c := range arg {
		switch c {
		case ' ', '\t', '\r', '\n':
			if tokenStart >= 0 {
				args = append(args, arg[tokenStart:idx])
				tokenStart = -1
			}
		default:
			if tokenStart < 0 {
				tokenStart = idx
			}
		}
	}
	if tokenStart >= 0 {
		args = append(args, arg[tokenStart:])
	}
	switch len(args) {
	case 4:
		// HMSG <subject> <sid> <hdr_len> <total_len>
		nc.ps.ma.subject, nc.ps.ma.reply = args[0], nil
		nc.ps.ma.sid = parseInt64(args[1])
		nc.ps.ma.hdr = int(parseInt64(args[2]))
		nc.ps.ma.size = int(parseInt64(args[3]))
	case 5:
		// HMSG <subject> <sid> <reply> <hdr_len> <total_len>
		nc.ps.ma.subject, nc.ps.ma.reply = args[0], args[2]
		nc.ps.ma.sid = parseInt64(args[1])
		nc.ps.ma.hdr = int(parseInt64(args[3]))
		nc.ps.ma.size = int(parseInt64(args[4]))
	default:
		return fmt.Errorf("nats: processHeaderMsgArgs Parse Error: '%s'", arg)
	}
	// parseInt64 yields a negative value on malformed input, so these
	// range checks double as parse-failure detection.
	if nc.ps.ma.sid < 0 {
		return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Sid: '%s'", arg)
	}
	if nc.ps.ma.hdr < 0 || nc.ps.ma.hdr > nc.ps.ma.size {
		return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
	}
	if nc.ps.ma.size < 0 {
		return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Size: '%s'", arg)
	}
	return nil
}
// Ascii numbers 0-9
const (
ascii_0 = 48

2
vendor/modules.txt vendored
View File

@@ -4,7 +4,7 @@ github.com/minio/highwayhash
# github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7
## explicit
github.com/nats-io/jwt
# github.com/nats-io/nats.go v1.10.0
# github.com/nats-io/nats.go v1.10.1-0.20200529175407-3559350e3ada
## explicit
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin