From 9f30bf00e08d69728d6e301d4222a1fd5678ad03 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 1 Dec 2021 10:25:15 -0700 Subject: [PATCH] [FIXED] Corrupted headers receiving from consumer with meta-only When a consumer is configured with "meta-only" option, and the stream was backed by a memory store, a memory corruption could happen causing the application to receive corrupted headers. Also replaced most of usage of `append(a[:0:0], a...)` to make copies. This was based on this wiki: https://github.com/go101/go101/wiki/How-to-efficiently-clone-a-slice%3F But since Go 1.15, it is actually faster to call make+copy instead. Signed-off-by: Ivan Kozlovic --- go.mod | 2 +- go.sum | 4 +- server/consumer.go | 5 +- server/filestore.go | 15 ++++-- server/jetstream_api.go | 6 +-- server/jetstream_cluster.go | 6 +-- server/jetstream_test.go | 64 +++++++++++++++++++++++ server/memstore.go | 6 +-- server/mqtt.go | 9 ---- server/raft.go | 14 ++--- server/raft_test.go | 2 +- server/sendq.go | 4 +- server/stream.go | 10 ++-- server/util.go | 22 ++++++++ vendor/github.com/nats-io/nats.go/js.go | 4 +- vendor/github.com/nats-io/nats.go/kv.go | 15 +++++- vendor/github.com/nats-io/nats.go/nats.go | 1 + vendor/modules.txt | 2 +- 18 files changed, 146 insertions(+), 45 deletions(-) diff --git a/go.mod b/go.mod index 7e921fa2..3bd2ffd2 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/klauspost/compress v1.13.4 github.com/minio/highwayhash v1.0.1 github.com/nats-io/jwt/v2 v2.2.0 - github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483 + github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e diff --git a/go.sum b/go.sum index b5c80979..593bc84b 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.2.0 h1:Yg/4WFK6vsqMudRg91eBb7Dh6XeVcDMPHycDE8CfltE= github.com/nats-io/jwt/v2 v2.2.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483 h1:GMx3ZOcMEVM5qnUItQ4eJyQ6ycwmIEB/VC/UxvdevE0= -github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc h1:SHr4MUUZJ/fAC0uSm2OzWOJYsHpapmR86mpw7q1qPXU= +github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/server/consumer.go b/server/consumer.go index 9bc99c47..fa5ab1f3 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2495,10 +2495,11 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 // If headers only do not send msg payload. // Add in msg size itself as header. if o.cfg.HeadersOnly { - bb := bytes.NewBuffer(hdr) - if hdr == nil { + var bb bytes.Buffer + if len(hdr) == 0 { bb.WriteString(hdrLine) } else { + bb.Write(hdr) bb.Truncate(len(hdr) - LEN_CR_LF) } bb.WriteString(JSMsgSize) diff --git a/server/filestore.go b/server/filestore.go index 0a825367..d3b30676 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1018,7 +1018,7 @@ func (fs *fileStore) expireMsgsOnRecover() { if deleted > 0 { // Update blks slice. - fs.blks = append(fs.blks[:0:0], fs.blks[deleted:]...) + fs.blks = copyMsgBlocks(fs.blks[deleted:]) if lb := len(fs.blks); lb == 0 { fs.lmb = nil } else { @@ -1030,6 +1030,15 @@ func (fs *fileStore) expireMsgsOnRecover() { fs.state.Bytes -= bytes } +func copyMsgBlocks(src []*msgBlock) []*msgBlock { + if src == nil { + return nil + } + dst := make([]*msgBlock, len(src)) + copy(dst, src) + return dst +} + // GetSeqFromTime looks for the first sequence number that has // the message with >= timestamp. // FIXME(dlc) - inefficient, and dumb really. Make this better. @@ -3942,7 +3951,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { if deleted > 0 { // Update blks slice. - fs.blks = append(fs.blks[:0:0], fs.blks[deleted:]...) + fs.blks = copyMsgBlocks(fs.blks[deleted:]) } // Update top level accounting. @@ -4068,7 +4077,7 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { for i, omb := range fs.blks { if mb == omb { blks := append(fs.blks[:i], fs.blks[i+1:]...) - fs.blks = append(blks[:0:0], blks...) + fs.blks = copyMsgBlocks(blks) break } } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index d605b98a..a41c64c1 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -667,7 +667,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // the header from the msg body. No other references are needed. // FIXME(dlc) - Should cleanup eventually and make sending // and receiving internal messages more formal. - rmsg = append(rmsg[:0:0], rmsg...) + rmsg = copyBytes(rmsg) client := &client{srv: s, kind: JETSTREAM} client.pa = c.pa @@ -1565,7 +1565,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s // Clustered mode will invoke a scatter and gather. if s.JetStreamIsClustered() { // Need to copy these off before sending.. - msg = append(msg[:0:0], msg...) + msg = copyBytes(msg) s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) }) return } @@ -3382,7 +3382,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, // Clustered mode will invoke a scatter and gather. if s.JetStreamIsClustered() { - msg = append(msg[:0:0], msg...) + msg = copyBytes(msg) s.startGoRoutine(func() { s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg) }) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a510179d..aa2e64fb 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1057,7 +1057,7 @@ func (js *jetStream) setConsumerAssignmentRecovering(ca *consumerAssignment) { func (sa *streamAssignment) copyGroup() *streamAssignment { csa, cg := *sa, *sa.Group csa.Group = &cg - csa.Group.Peers = append(sa.Group.Peers[:0:0], sa.Group.Peers...) + csa.Group.Peers = copyStrings(sa.Group.Peers) return &csa } @@ -4901,10 +4901,10 @@ RETRY: // Send our catchup request here. reply := syncReplySubject() sub, err = s.sysSubscribe(reply, func(_ *subscription, _ *client, _ *Account, _, reply string, msg []byte) { - // Make copies - https://github.com/go101/go101/wiki + // Make copies // TODO(dlc) - Since we are using a buffer from the inbound client/route. select { - case msgsC <- &im{append(msg[:0:0], msg...), reply}: + case msgsC <- &im{copyBytes(msg), reply}: default: s.Warnf("Failed to place catchup message onto internal channel: %d pending", len(msgsC)) return diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 43427cba..04aa1dd0 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -13612,6 +13612,70 @@ func TestJetStreamInvalidDeliverSubject(t *testing.T) { require_Error(t, err, NewJSConsumerInvalidDeliverSubjectError()) } +func TestJetStreamMemoryCorruption(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + errCh := make(chan error, 10) + nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, e error) { + select { + case errCh <- e: + default: + } + }) + + // The storage has to be MemoryStorage to show the issue + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "bucket", Storage: nats.MemoryStorage}) + require_NoError(t, err) + + w1, err := kv.WatchAll() + require_NoError(t, err) + + w2, err := kv.WatchAll(nats.MetaOnly()) + require_NoError(t, err) + + kv.Put("key1", []byte("aaa")) + kv.Put("key1", []byte("aab")) + kv.Put("key2", []byte("zza")) + kv.Put("key2", []byte("zzb")) + kv.Delete("key1") + kv.Delete("key2") + kv.Put("key1", []byte("aac")) + kv.Put("key2", []byte("zzc")) + kv.Delete("key1") + kv.Delete("key2") + kv.Purge("key1") + kv.Purge("key2") + + checkUpdates := func(updates <-chan nats.KeyValueEntry) { + t.Helper() + count := 0 + for { + select { + case <-updates: + count++ + if count == 13 { + return + } + case <-time.After(time.Second): + t.Fatal("Did not receive all updates") + } + } + } + checkUpdates(w1.Updates()) + checkUpdates(w2.Updates()) + + select { + case e := <-errCh: + t.Fatal(e) + case <-time.After(250 * time.Millisecond): + // OK + } +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/memstore.go b/server/memstore.go index a35bd1d9..848e4e29 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -126,13 +126,13 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int ms.state.FirstTime = now } - // Make copies - https://github.com/go101/go101/wiki + // Make copies // TODO(dlc) - Maybe be smarter here. if len(msg) > 0 { - msg = append(msg[:0:0], msg...) + msg = copyBytes(msg) } if len(hdr) > 0 { - hdr = append(hdr[:0:0], hdr...) + hdr = copyBytes(hdr) } ms.msgs[seq] = &storedMsg{subj, hdr, msg, seq, ts} diff --git a/server/mqtt.go b/server/mqtt.go index 25929f11..e8033286 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -3869,15 +3869,6 @@ func mqttNeedSubForLevelUp(subject string) bool { // ////////////////////////////////////////////////////////////////////////////// -func copyBytes(b []byte) []byte { - if b == nil { - return nil - } - cbuf := make([]byte, len(b)) - copy(cbuf, b) - return cbuf -} - func (r *mqttReader) reset(buf []byte) { r.buf = buf r.pos = 0 diff --git a/server/raft.go b/server/raft.go index 704140d4..cb33b106 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1735,7 +1735,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _ return } // Need to copy since this is underlying client/route buffer. - peer := string(append(msg[:0:0], msg...)) + peer := string(copyBytes(msg)) n.RLock() propc, werr := n.propc, n.werr @@ -1760,7 +1760,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account, return } // Need to copy since this is underlying client/route buffer. - msg = append(msg[:0:0], msg...) + msg = copyBytes(msg) n.RLock() propc, werr := n.propc, n.werr @@ -2418,7 +2418,7 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subje return } - msg = append(msg[:0:0], msg...) + msg = copyBytes(msg) if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil { select { case n.entryc <- ae: @@ -2665,7 +2665,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if ps, err := decodePeerState(ae.entries[1].Data); err == nil { n.processPeerState(ps) // Also need to copy from client's buffer. - ae.entries[0].Data = append(ae.entries[0].Data[:0:0], ae.entries[0].Data...) + ae.entries[0].Data = copyBytes(ae.entries[0].Data) } else { n.warn("Could not parse snapshot peerstate correctly") n.cancelCatchup() @@ -2816,7 +2816,7 @@ func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Accoun if !n.Leader() { return } - msg = append(msg[:0:0], msg...) + msg = copyBytes(msg) ar := n.decodeAppendEntryResponse(msg) ar.reply = reply @@ -2995,7 +2995,7 @@ func (n *raft) decodeVoteRequest(msg []byte, reply string) *voteRequest { return nil } // Need to copy for now b/c of candidate. - msg = append(msg[:0:0], msg...) + msg = copyBytes(msg) var le = binary.LittleEndian return &voteRequest{ @@ -3117,7 +3117,7 @@ func (n *raft) fileWriter() { } case <-n.wpsch: n.RLock() - buf := append(n.wps[:0:0], n.wps...) + buf := copyBytes(n.wps) n.RUnlock() if err := ioutil.WriteFile(psf, buf, 0640); err != nil { n.setWriteErr(err) diff --git a/server/raft_test.go b/server/raft_test.go index f9cbdb57..a9241c72 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -81,7 +81,7 @@ func TestNRGAppendEntryDecode(t *testing.T) { require_Error(t, err, errBadAppendEntry) for i := 0; i < 100; i++ { - b := append(buf[:0:0], buf...) + b := copyBytes(buf) bi := rand.Intn(len(b)) if b[bi] != 0 { b[bi] = 0 diff --git a/server/sendq.go b/server/sendq.go index 39831c11..12413eac 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -100,11 +100,11 @@ func (sq *sendq) send(subj, rply string, hdr, msg []byte) { out := &outMsg{subj, rply, nil, nil, nil} // We will copy these for now. if len(hdr) > 0 { - hdr = append(hdr[:0:0], hdr...) + hdr = copyBytes(hdr) out.hdr = hdr } if len(msg) > 0 { - msg = append(msg[:0:0], msg...) + msg = copyBytes(msg) out.msg = msg } diff --git a/server/stream.go b/server/stream.go index 10ce72f2..73af82ca 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1697,7 +1697,7 @@ func (mset *stream) setupMirrorConsumer() error { // Process inbound mirror messages from the wire. sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { - hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. + hdr, msg := c.msgParts(copyBytes(rmsg)) // Need to copy. mset.queueInbound(msgs, subject, reply, hdr, msg) }) if err != nil { @@ -1889,7 +1889,7 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64) { si.cname = ccr.ConsumerInfo.Name // Now create sub to receive messages. sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { - hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. + hdr, msg := c.msgParts(copyBytes(rmsg)) // Need to copy. mset.queueInbound(si.msgs, subject, reply, hdr, msg) }) if err != nil { @@ -2663,10 +2663,10 @@ func (mset *stream) queueInbound(ib *inbound, subj, rply string, hdr, msg []byte func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) { // Copy these. if len(hdr) > 0 { - hdr = append(hdr[:0:0], hdr...) + hdr = copyBytes(hdr) } if len(msg) > 0 { - msg = append(msg[:0:0], msg...) + msg = copyBytes(msg) } mset.queueInbound(mset.msgs, subj, rply, hdr, msg) } @@ -3181,7 +3181,7 @@ func (mset *stream) subjects() []string { if len(mset.cfg.Subjects) == 0 { return nil } - return append(mset.cfg.Subjects[:0:0], mset.cfg.Subjects...) + return copyStrings(mset.cfg.Subjects) } // Linked list for async ack of messages. diff --git a/server/util.go b/server/util.go index 4ecbae44..43c8ef3b 100644 --- a/server/util.go +++ b/server/util.go @@ -271,3 +271,25 @@ func getURLsAsString(urls []*url.URL) []string { } return a } + +// copyBytes make a new slice of the same size than `src` and copy its content. +// If `src` is nil, then this returns `nil` +func copyBytes(src []byte) []byte { + if src == nil { + return nil + } + dst := make([]byte, len(src)) + copy(dst, src) + return dst +} + +// copyStrings make a new slice of the same size than `src` and copy its content. +// If `src` is nil, then this returns `nil` +func copyStrings(src []string) []string { + if src == nil { + return nil + } + dst := make([]string, len(src)) + copy(dst, src) + return dst +} diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index f2ae3571..38d7be5a 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -2135,7 +2135,7 @@ func RateLimit(n uint64) SubOpt { // BindStream binds a consumer to a stream explicitly based on a name. // When a stream name is not specified, the library uses the subscribe // subject as a way to find the stream name. It is done by making a request -// to the server to get list of stream names that have a fileter for this +// to the server to get list of stream names that have a filter for this // subject. If the returned list contains a single stream, then this // stream name will be used, otherwise the `ErrNoMatchingStream` is returned. // To avoid the stream lookup, provide the stream name with this function. @@ -2552,7 +2552,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error { // Skip if already acked. if atomic.LoadUint32(&m.ackd) == 1 { - return ErrInvalidJSAck + return ErrMsgAlreadyAckd } m.Sub.mu.Lock() diff --git a/vendor/github.com/nats-io/nats.go/kv.go b/vendor/github.com/nats-io/nats.go/kv.go index 58f4b4b9..deaefde2 100644 --- a/vendor/github.com/nats-io/nats.go/kv.go +++ b/vendor/github.com/nats-io/nats.go/kv.go @@ -112,6 +112,8 @@ type watchOpts struct { ignoreDeletes bool // Include all history per subject, not just last one. includeHistory bool + // retrieve only the meta data of the entry + metaOnly bool } type watchOptFn func(opts *watchOpts) error @@ -136,6 +138,14 @@ func IgnoreDeletes() WatchOpt { }) } +// MetaOnly instructs the key watcher to retrieve only the entry meta data, not the entry value +func MetaOnly() WatchOpt { + return watchOptFn(func(opts *watchOpts) error { + opts.metaOnly = true + return nil + }) +} + // KeyValueConfig is for configuring a KeyValue store. type KeyValueConfig struct { Bucket string @@ -534,7 +544,7 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error { // Keys() will return all keys. func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) { - opts = append(opts, IgnoreDeletes()) + opts = append(opts, IgnoreDeletes(), MetaOnly()) watcher, err := kv.WatchAll(opts...) if err != nil { return nil, err @@ -676,6 +686,9 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { if !o.includeHistory { subOpts = append(subOpts, DeliverLastPerSubject()) } + if o.metaOnly { + subOpts = append(subOpts, HeadersOnly()) + } sub, err := kv.js.Subscribe(keys, update, subOpts...) if err != nil { return nil, err diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 129c120c..72759717 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -157,6 +157,7 @@ var ( ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer") ErrConsumerNotActive = errors.New("nats: consumer not active") ErrMsgNotFound = errors.New("nats: message not found") + ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged") ) func init() { diff --git a/vendor/modules.txt b/vendor/modules.txt index 8a790725..d7238346 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -9,7 +9,7 @@ github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.2.0 ## explicit github.com/nats-io/jwt/v2 -# github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483 +# github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc ## explicit github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin