From 4d7cd26956b3b389c7f84a6efcee7da7569d8226 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 30 Jun 2023 22:49:48 -0700 Subject: [PATCH] Add in support for segmented binary stream snapshots. Streams with many interior deletes was causing issues due to the fact that the interior deletes were represented as a sorted []uint64. This approach introduces 3 sub types of delete blocks, avl bitmask tree, a run length encoding, and the legacy format above. We also take into account large interior deletes such that on receiving a snapshot we can skip things we already know about. Signed-off-by: Derek Collison --- server/avl/norace_test.go | 6 +- server/avl/seqset.go | 105 +++++++++++++-- server/avl/seqset_test.go | 47 ++++++- server/events.go | 99 +++++++++++---- server/filestore.go | 149 +++++++++++++++++++++- server/filestore_test.go | 21 +++ server/jetstream_cluster.go | 104 +++++++++++---- server/jetstream_cluster_3_test.go | 22 ++++ server/memstore.go | 47 +++++++ server/norace_test.go | 197 +++++++++++++++++++++++++++++ server/route.go | 2 +- server/server.go | 23 ++-- server/store.go | 142 +++++++++++++++++++++ 13 files changed, 887 insertions(+), 77 deletions(-) diff --git a/server/avl/norace_test.go b/server/avl/norace_test.go index b458a444..0e726b96 100644 --- a/server/avl/norace_test.go +++ b/server/avl/norace_test.go @@ -110,7 +110,7 @@ func TestNoRaceSeqSetEncodeLarge(t *testing.T) { } start = time.Now() - ss2, err := Decode(b) + ss2, _, err := Decode(b) require_NoError(t, err) if elapsed := time.Since(start); elapsed > expected { t.Fatalf("Expected decode to take less than %v, got %v", expected, elapsed) @@ -174,8 +174,8 @@ func TestNoRaceSeqSetRelativeSpeed(t *testing.T) { t.Fatalf("Expected SequenceSet insert to be no more than 2x slower (%v vs %v)", mapInsertElapsed, ssInsertElapsed) } - if mapLookupElapsed*2 <= ssLookupElapsed { - t.Fatalf("Expected SequenceSet lookups to be no more than 2x slower (%v vs %v)", mapLookupElapsed, ssLookupElapsed) + if mapLookupElapsed*3 <= ssLookupElapsed { + t.Fatalf("Expected SequenceSet lookups to be no more than 3x slower (%v vs %v)", mapLookupElapsed, ssLookupElapsed) } } diff --git a/server/avl/seqset.go b/server/avl/seqset.go index 3e0e6c32..2b482c69 100644 --- a/server/avl/seqset.go +++ b/server/avl/seqset.go @@ -138,6 +138,15 @@ func (ss *SequenceSet) Heights() (l, r int) { return l, r } +// Returns min, max and number of set items. +func (ss *SequenceSet) State() (min, max, num uint64) { + if ss.root == nil { + return 0, 0, 0 + } + min, max = ss.MinMax() + return min, max, uint64(ss.Size()) +} + // MinMax will return the minunum and maximum values in the set. func (ss *SequenceSet) MinMax() (min, max uint64) { if ss.root == nil { @@ -177,6 +186,23 @@ func (ss *SequenceSet) Clone() *SequenceSet { return css } +// Union will union this SequenceSet with ssa. +func (ss *SequenceSet) Union(ssa ...*SequenceSet) { + for _, sa := range ssa { + sa.root.nodeIter(func(n *node) { + for nb, b := range n.bits { + for pos := uint64(0); b != 0; pos++ { + if b&1 == 1 { + seq := n.base + (uint64(nb) * uint64(bitsPerBucket)) + pos + ss.Insert(seq) + } + b >>= 1 + } + } + }) + } +} + // Union will return a union of all sets. func Union(ssa ...*SequenceSet) *SequenceSet { if len(ssa) == 0 { @@ -200,7 +226,7 @@ const ( // Magic is used to identify the encode binary state.. magic = uint8(22) // Version - version = uint8(1) + version = uint8(2) // hdrLen hdrLen = 2 // minimum length of an encoded SequenceSet. @@ -247,14 +273,28 @@ func (ss SequenceSet) Encode(buf []byte) ([]byte, error) { // ErrBadEncoding is returned when we can not decode properly. var ( ErrBadEncoding = errors.New("ss: bad encoding") + ErrBadVersion = errors.New("ss: bad version") ErrSetNotEmpty = errors.New("ss: set not empty") ) -func Decode(buf []byte) (*SequenceSet, error) { - if len(buf) < minLen || buf[0] != magic || buf[1] != version { - return nil, ErrBadEncoding +// Decode returns the sequence set and number of bytes read from the buffer on success. +func Decode(buf []byte) (*SequenceSet, int, error) { + if len(buf) < minLen || buf[0] != magic { + return nil, -1, ErrBadEncoding } + switch v := buf[1]; v { + case 1: + return decodev1(buf) + case 2: + return decodev2(buf) + default: + return nil, -1, ErrBadVersion + } +} + +// Helper to decode v2. +func decodev2(buf []byte) (*SequenceSet, int, error) { var le = binary.LittleEndian index := 2 nn := int(le.Uint32(buf[index:])) @@ -262,15 +302,12 @@ func Decode(buf []byte) (*SequenceSet, error) { index += 8 expectedLen := minLen + (nn * ((numBuckets+1)*8 + 2)) - if len(buf) != expectedLen { - return nil, ErrBadEncoding + if len(buf) < expectedLen { + return nil, -1, ErrBadEncoding } - nodes := make([]node, nn) + ss, nodes := SequenceSet{size: sz}, make([]node, nn) - ss := SequenceSet{ - size: sz, - } for i := 0; i < nn; i++ { n := &nodes[i] n.base = le.Uint64(buf[index:]) @@ -284,7 +321,51 @@ func Decode(buf []byte) (*SequenceSet, error) { ss.insertNode(n) } - return &ss, nil + return &ss, index, nil +} + +// Helper to decode v1 into v2 which has fixed buckets of 32 vs 64 originally. +func decodev1(buf []byte) (*SequenceSet, int, error) { + var le = binary.LittleEndian + index := 2 + nn := int(le.Uint32(buf[index:])) + sz := int(le.Uint32(buf[index+4:])) + index += 8 + + const v1NumBuckets = 64 + + expectedLen := minLen + (nn * ((v1NumBuckets+1)*8 + 2)) + if len(buf) < expectedLen { + return nil, -1, ErrBadEncoding + } + + var ss SequenceSet + for i := 0; i < nn; i++ { + base := le.Uint64(buf[index:]) + index += 8 + for nb := uint64(0); nb < v1NumBuckets; nb++ { + n := le.Uint64(buf[index:]) + // Walk all set bits and insert sequences manually for this decode from v1. + for pos := uint64(0); n != 0; pos++ { + if n&1 == 1 { + seq := base + (nb * uint64(bitsPerBucket)) + pos + ss.Insert(seq) + } + n >>= 1 + } + index += 8 + } + // Skip over encoded height. + index += 2 + } + + // Sanity check. + if ss.Size() != sz { + return nil, -1, ErrBadEncoding + } + + return &ss, index, nil + } // insertNode places a decoded node into the tree. @@ -318,7 +399,7 @@ func (ss *SequenceSet) insertNode(n *node) { const ( bitsPerBucket = 64 // bits in uint64 - numBuckets = 64 + numBuckets = 32 numEntries = numBuckets * bitsPerBucket ) diff --git a/server/avl/seqset_test.go b/server/avl/seqset_test.go index d4fe2cd7..3c1fee7e 100644 --- a/server/avl/seqset_test.go +++ b/server/avl/seqset_test.go @@ -14,6 +14,7 @@ package avl import ( + "encoding/base64" "math/rand" "testing" ) @@ -21,7 +22,7 @@ import ( func TestSeqSetBasics(t *testing.T) { var ss SequenceSet - seqs := []uint64{22, 222, 2222, 2, 2, 4} + seqs := []uint64{22, 222, 2000, 2, 2, 4} for _, seq := range seqs { ss.Insert(seq) require_True(t, ss.Exists(seq)) @@ -235,6 +236,50 @@ func TestSeqSetFirst(t *testing.T) { } } +// Test that we can union with nodes vs individual sequence insertion. +func TestSeqSetDistinctUnion(t *testing.T) { + // Distinct sets. + var ss1 SequenceSet + seqs1 := []uint64{1, 10, 100, 200} + for _, seq := range seqs1 { + ss1.Insert(seq) + } + + var ss2 SequenceSet + seqs2 := []uint64{5000, 6100, 6200, 6222} + for _, seq := range seqs2 { + ss2.Insert(seq) + } + + ss := ss1.Clone() + allSeqs := append(seqs1, seqs2...) + + ss.Union(&ss2) + require_True(t, ss.Size() == len(allSeqs)) + for _, seq := range allSeqs { + require_True(t, ss.Exists(seq)) + } +} + +func TestSeqSetDecodeV1(t *testing.T) { + // Encoding from v1 which was 64 buckets. + seqs := []uint64{22, 222, 2222, 222_222, 2_222_222} + encStr := ` +FgEDAAAABQAAAABgAwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAADgIQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAA== +` + + enc, err := base64.StdEncoding.DecodeString(encStr) + require_NoError(t, err) + + ss, _, err := Decode(enc) + require_NoError(t, err) + + require_True(t, ss.Size() == len(seqs)) + for _, seq := range seqs { + require_True(t, ss.Exists(seq)) + } +} + func require_NoError(t *testing.T, err error) { t.Helper() if err != nil { diff --git a/server/events.go b/server/events.go index 8a55939d..2f709679 100644 --- a/server/events.go +++ b/server/events.go @@ -193,18 +193,53 @@ type ServerID struct { ID string `json:"id"` } +// Type for our server capabilities. +type ServerCapability uint64 + // ServerInfo identifies remote servers. type ServerInfo struct { - Name string `json:"name"` - Host string `json:"host"` - ID string `json:"id"` - Cluster string `json:"cluster,omitempty"` - Domain string `json:"domain,omitempty"` - Version string `json:"ver"` - Tags []string `json:"tags,omitempty"` - Seq uint64 `json:"seq"` - JetStream bool `json:"jetstream"` - Time time.Time `json:"time"` + Name string `json:"name"` + Host string `json:"host"` + ID string `json:"id"` + Cluster string `json:"cluster,omitempty"` + Domain string `json:"domain,omitempty"` + Version string `json:"ver"` + Tags []string `json:"tags,omitempty"` + // Whether JetStream is enabled (deprecated in favor of the `ServerCapability`). + JetStream bool `json:"jetstream"` + // Generic capability flags + Flags ServerCapability + // Sequence and Time from the remote server for this message. + Seq uint64 `json:"seq"` + Time time.Time `json:"time"` +} + +const ( + JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled. + BinaryStreamSnapshot // New stream snapshot capability. +) + +// Set JetStream capability. +func (si *ServerInfo) SetJetStreamEnabled() { + si.Flags |= JetStreamEnabled + // Still set old version. + si.JetStream = true +} + +// JetStreamEnabled indicates whether or not we have JetStream enabled. +func (si *ServerInfo) JetStreamEnabled() bool { + // Take into account old version. + return si.Flags&JetStreamEnabled != 0 || si.JetStream +} + +// Set binary stream snapshot capability. +func (si *ServerInfo) SetBinaryStreamSnapshot() { + si.Flags |= BinaryStreamSnapshot +} + +// JetStreamEnabled indicates whether or not we have binary stream snapshot capbilities. +func (si *ServerInfo) BinaryStreamSnapshot() bool { + return si.Flags&BinaryStreamSnapshot != 0 } // ClientInfo is detailed information about the client forming a connection. @@ -391,17 +426,21 @@ RESET: case <-sendq.ch: msgs := sendq.pop() for _, pm := range msgs { - if pm.si != nil { - pm.si.Name = servername - pm.si.Domain = domain - pm.si.Host = host - pm.si.Cluster = cluster - pm.si.ID = id - pm.si.Seq = atomic.AddUint64(seqp, 1) - pm.si.Version = VERSION - pm.si.Time = time.Now().UTC() - pm.si.JetStream = js - pm.si.Tags = tags + if si := pm.si; si != nil { + si.Name = servername + si.Domain = domain + si.Host = host + si.Cluster = cluster + si.ID = id + si.Seq = atomic.AddUint64(seqp, 1) + si.Version = VERSION + si.Time = time.Now().UTC() + si.Tags = tags + if js { + // New capability based flags. + si.SetJetStreamEnabled() + si.SetBinaryStreamSnapshot() + } } var b []byte if pm.msg != nil { @@ -1418,7 +1457,9 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su si.Tags, cfg, stats, - false, si.JetStream, + false, + si.JetStreamEnabled(), + si.BinaryStreamSnapshot(), }) } @@ -1454,7 +1495,19 @@ func (s *Server) processNewServer(si *ServerInfo) { node := getHash(si.Name) // Only update if non-existent if _, ok := s.nodeToInfo.Load(node); !ok { - s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Version, si.Cluster, si.Domain, si.ID, si.Tags, nil, nil, false, si.JetStream}) + s.nodeToInfo.Store(node, nodeInfo{ + si.Name, + si.Version, + si.Cluster, + si.Domain, + si.ID, + si.Tags, + nil, + nil, + false, + si.JetStreamEnabled(), + si.BinaryStreamSnapshot(), + }) } } // Announce ourselves.. diff --git a/server/filestore.go b/server/filestore.go index 3e49dd10..07c3e9d4 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5307,7 +5307,7 @@ func (mb *msgBlock) readIndexInfo() error { if dmapLen > 0 { // New version is encoded avl seqset. if buf[1] == newVersion { - dmap, err := avl.Decode(buf[bi:]) + dmap, _, err := avl.Decode(buf[bi:]) if err != nil { return fmt.Errorf("could not decode avl dmap: %v", err) } @@ -6786,6 +6786,153 @@ func (fs *fileStore) fileStoreConfig() FileStoreConfig { return fs.fcfg } +// When we will write a run length encoded record vs adding to the existing avl.SequenceSet. +const rlThresh = 4096 + +// Binary encoded state snapshot, >= v2.10 server. +func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) { + fs.mu.RLock() + defer fs.mu.RUnlock() + + // Calculate deleted. + var numDeleted int64 + if fs.state.LastSeq > fs.state.FirstSeq { + numDeleted = int64(fs.state.LastSeq-fs.state.FirstSeq+1) - int64(fs.state.Msgs) + if numDeleted < 0 { + numDeleted = 0 + } + } + + // Encoded is Msgs, Bytes, FirstSeq, LastSeq, Failed, NumDeleted and optional DeletedBlocks + var buf [1024]byte + buf[0], buf[1] = streamStateMagic, streamStateVersion + n := hdrLen + n += binary.PutUvarint(buf[n:], fs.state.Msgs) + n += binary.PutUvarint(buf[n:], fs.state.Bytes) + n += binary.PutUvarint(buf[n:], fs.state.FirstSeq) + n += binary.PutUvarint(buf[n:], fs.state.LastSeq) + n += binary.PutUvarint(buf[n:], failed) + n += binary.PutUvarint(buf[n:], uint64(numDeleted)) + + b := buf[0:n] + + if numDeleted > 0 { + var scratch [4 * 1024]byte + for _, db := range fs.deleteBlocks() { + switch db := db.(type) { + case *DeleteRange: + first, _, num := db.State() + scratch[0] = runLengthMagic + i := 1 + i += binary.PutUvarint(scratch[i:], first) + i += binary.PutUvarint(scratch[i:], num) + b = append(b, scratch[0:i]...) + case *avl.SequenceSet: + buf, err := db.Encode(scratch[:0]) + if err != nil { + return nil, err + } + b = append(b, buf...) + } + } + } + + return b, nil +} + +// Lock should be held. +func (fs *fileStore) deleteBlocks() DeleteBlocks { + var ( + dbs DeleteBlocks + adm *avl.SequenceSet + prevLast uint64 + ) + for _, mb := range fs.blks { + mb.mu.RLock() + // Detect if we have a gap between these blocks. + if prevLast > 0 && prevLast+1 != mb.first.seq { + // Detect if we need to encode a run length encoding here. + gap := mb.first.seq - prevLast - 1 + if gap > rlThresh { + // Check if we have a running adm, if so write that out first, or if contigous update rle params. + if adm != nil && adm.Size() > 0 { + min, max := adm.MinMax() + // Check if we are all contingous. + if uint64(adm.Size()) == max-min+1 { + prevLast, gap = min-1, mb.first.seq-min + } else { + dbs = append(dbs, adm) + } + // Always nil out here. + adm = nil + } + dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap}) + } else { + // Common dmap + if adm == nil { + adm = &avl.SequenceSet{} + adm.SetInitialMin(prevLast + 1) + } + for seq := prevLast + 1; seq < mb.first.seq; seq++ { + adm.Insert(seq) + } + } + } + if sz := mb.dmap.Size(); sz > 0 { + // Check in case the mb's dmap is contiguous. + min, max := mb.dmap.MinMax() + if uint64(sz) == max-min+1 { + // Need to write out adm if it exists. + if adm != nil && adm.Size() > 0 { + dbs = append(dbs, adm) + } + dbs = append(dbs, &DeleteRange{First: min, Num: max - min + 1}) + } else { + // Aggregated dmap + if adm == nil { + adm = mb.dmap.Clone() + } else { + adm.Union(&mb.dmap) + } + } + } + prevLast = mb.last.seq + mb.mu.RUnlock() + } + + if adm != nil { + dbs = append(dbs, adm) + } + + return dbs +} + +// SyncDeleted will make sure this stream has same deleted state as dbs. +func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) { + if len(dbs) == 0 { + return + } + fs.mu.Lock() + defer fs.mu.Unlock() + + mdbs := fs.deleteBlocks() + for i, db := range dbs { + // If the block is same as what we have we can skip. + if i < len(mdbs) { + first, last, num := db.State() + eFirst, eLast, eNum := mdbs[i].State() + if first == eFirst && last == eLast && num == eNum { + continue + } + } + // Need to insert these. + db.Range(func(dseq uint64) bool { + fs.removeMsg(dseq, false, true, false) + return true + }) + } +} + //////////////////////////////////////////////////////////////////////////////// // Consumers //////////////////////////////////////////////////////////////////////////////// diff --git a/server/filestore_test.go b/server/filestore_test.go index 752e6fbf..0bf4328a 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5583,3 +5583,24 @@ func TestFileStoreNumPendingLargeNumBlks(t *testing.T) { require_True(t, time.Since(start) < 50*time.Millisecond) require_True(t, total == 4000) } + +func TestFileStoreSkipMsgAndNumBlocks(t *testing.T) { + // No need for all permutations here. + storeDir := t.TempDir() + fcfg := FileStoreConfig{ + StoreDir: storeDir, + BlockSize: 128, // Small on purpose to create alot of blks. + } + fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"zzz"}, Storage: FileStorage}) + require_NoError(t, err) + + subj, msg := "zzz", bytes.Repeat([]byte("X"), 100) + numMsgs := 10_000 + + fs.StoreMsg(subj, nil, msg) + for i := 0; i < numMsgs; i++ { + fs.SkipMsg() + } + fs.StoreMsg(subj, nil, msg) + require_True(t, fs.numMsgBlocks() == 2) +} diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index b397ee40..d7596547 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2847,25 +2847,46 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco panic(fmt.Sprintf("JetStream Cluster Unknown group entry op type: %v", op)) } } else if e.Type == EntrySnapshot { - if !isRecovering && mset != nil { - var snap streamSnapshot - if err := json.Unmarshal(e.Data, &snap); err != nil { - return err - } - if !mset.IsLeader() { - if err := mset.processSnapshot(&snap); err != nil { - return err - } - } - } else if isRecovering && mset != nil { - // On recovery, reset CLFS/FAILED. - var snap streamSnapshot - if err := json.Unmarshal(e.Data, &snap); err != nil { - return err - } + if mset == nil { + return nil + } + // Everything operates on new replicated state. Will convert legacy snapshots to this for processing. + var ss *StreamReplicatedState + + // Check if we are the new binary encoding. + if IsEncodedStreamState(e.Data) { + var err error + ss, err = DecodeStreamState(e.Data) + if err != nil { + return err + } + } else { + var snap streamSnapshot + if err := json.Unmarshal(e.Data, &snap); err != nil { + return err + } + // Convert over to StreamReplicatedState + ss = &StreamReplicatedState{ + Msgs: snap.Msgs, + Bytes: snap.Bytes, + FirstSeq: snap.FirstSeq, + LastSeq: snap.LastSeq, + Failed: snap.Failed, + } + if len(snap.Deleted) > 0 { + ss.Deleted = append(ss.Deleted, DeleteSlice(snap.Deleted)) + } + } + + if !isRecovering && !mset.IsLeader() { + if err := mset.processSnapshot(ss); err != nil { + return err + } + } else if isRecovering { + // On recovery, reset CLFS/FAILED. mset.mu.Lock() - mset.clfs = snap.Failed + mset.clfs = ss.Failed mset.mu.Unlock() } } else if e.Type == EntryRemovePeer { @@ -7174,7 +7195,36 @@ func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq u return buf[:wi] } +// Determine if all peers in our set support the binary snapshot. +func (mset *stream) supportsBinarySnapshot() bool { + mset.mu.RLock() + defer mset.mu.RUnlock() + return mset.supportsBinarySnapshotLocked() +} + +// Determine if all peers in our set support the binary snapshot. +// Lock should be held. +func (mset *stream) supportsBinarySnapshotLocked() bool { + s, n := mset.srv, mset.node + if s == nil || n == nil { + return false + } + // Grab our peers and walk them to make sure we can all support binary stream snapshots. + id, peers := n.ID(), n.Peers() + for _, p := range peers { + if p.ID == id { + // We know we support ourselves. + continue + } + if sir, ok := s.nodeToInfo.Load(p.ID); !ok || sir == nil || !sir.(nodeInfo).binarySnapshots { + return false + } + } + return true +} + // StreamSnapshot is used for snapshotting and out of band catch up in clustered mode. +// Legacy, replace with binary stream snapshots. type streamSnapshot struct { Msgs uint64 `json:"messages"` Bytes uint64 `json:"bytes"` @@ -7194,6 +7244,13 @@ func (mset *stream) stateSnapshot() []byte { // Grab a snapshot of a stream for clustered mode. // Lock should be held. func (mset *stream) stateSnapshotLocked() []byte { + // Decide if we can support the new style of stream snapshots. + if mset.supportsBinarySnapshotLocked() { + snap, _ := mset.store.EncodedStreamState(mset.clfs) + return snap + } + + // Older v1 version with deleted as a sorted []uint64. state := mset.store.State() snap := &streamSnapshot{ Msgs: state.Msgs, @@ -7438,7 +7495,7 @@ type streamSyncRequest struct { } // Given a stream state that represents a snapshot, calculate the sync request based on our current state. -func (mset *stream) calculateSyncRequest(state *StreamState, snap *streamSnapshot) *streamSyncRequest { +func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplicatedState) *streamSyncRequest { // Quick check if we are already caught up. if state.LastSeq >= snap.LastSeq { return nil @@ -7448,7 +7505,7 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *streamSnapsho // processSnapshotDeletes will update our current store based on the snapshot // but only processing deletes and new FirstSeq / purges. -func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) { +func (mset *stream) processSnapshotDeletes(snap *StreamReplicatedState) { mset.mu.Lock() var state StreamState mset.store.FastState(&state) @@ -7461,11 +7518,8 @@ func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) { } mset.mu.Unlock() - // Range the deleted and delete if applicable. - for _, dseq := range snap.Deleted { - if dseq > state.FirstSeq && dseq <= state.LastSeq { - mset.store.RemoveMsg(dseq) - } + if len(snap.Deleted) > 0 { + mset.store.SyncDeleted(snap.Deleted) } } @@ -7562,7 +7616,7 @@ var ( ) // Process a stream snapshot. -func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) { +func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) { // Update any deletes, etc. mset.processSnapshotDeletes(snap) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index cfbd6b71..be8a6b08 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4519,3 +4519,25 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) { require_NoError(t, err) require_True(t, si.State.Msgs == uint64(toSend)) } + +func TestJetStreamBinaryStreamSnapshotCapability(t *testing.T) { + c := createJetStreamClusterExplicit(t, "NATS", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + mset, err := c.streamLeader(globalAccountName, "TEST").GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + if !mset.supportsBinarySnapshot() { + t.Fatalf("Expected to signal that we could support binary stream snapshots") + } +} diff --git a/server/memstore.go b/server/memstore.go index 0b71e906..b1ef3a15 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -14,11 +14,14 @@ package server import ( + "encoding/binary" "fmt" "math/rand" "sort" "sync" "time" + + "github.com/nats-io/nats-server/v2/server/avl" ) // TODO(dlc) - This is a fairly simplistic approach but should do for now. @@ -1208,6 +1211,50 @@ func (ms *memStore) Snapshot(_ time.Duration, _, _ bool) (*SnapshotResult, error return nil, fmt.Errorf("no impl") } +// Binary encoded state snapshot, >= v2.10 server. +func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) { + // FIXME(dlc) - Don't calculate deleted on the fly, keep delete blocks. + state := ms.State() + + // Encoded is Msgs, Bytes, FirstSeq, LastSeq, Failed, NumDeleted and optional DeletedBlocks + var buf [1024]byte + buf[0], buf[1] = streamStateMagic, streamStateVersion + n := hdrLen + n += binary.PutUvarint(buf[n:], state.Msgs) + n += binary.PutUvarint(buf[n:], state.Bytes) + n += binary.PutUvarint(buf[n:], state.FirstSeq) + n += binary.PutUvarint(buf[n:], state.LastSeq) + n += binary.PutUvarint(buf[n:], failed) + n += binary.PutUvarint(buf[n:], uint64(state.NumDeleted)) + + b := buf[0:n] + + if state.NumDeleted > 0 { + var ss avl.SequenceSet + ss.SetInitialMin(state.Deleted[0]) + for _, seq := range state.Deleted { + ss.Insert(seq) + } + buf, err := ss.Encode(nil) + if err != nil { + return nil, err + } + b = append(b, buf...) + } + + return b, nil +} + +// SyncDeleted will make sure this stream has same deleted state as dbs. +func (ms *memStore) SyncDeleted(dbs DeleteBlocks) { + for _, db := range dbs { + db.Range(func(dseq uint64) bool { + ms.RemoveMsg(dseq) + return true + }) + } +} + func (o *consumerMemStore) Update(state *ConsumerState) error { // Sanity checks. if state.AckFloor.Consumer > state.Delivered.Consumer { diff --git a/server/norace_test.go b/server/norace_test.go index f6c94994..919dd7b5 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -8649,3 +8649,200 @@ func TestNoRaceReplicatedMirrorWithLargeStartingSequenceOverLeafnode(t *testing. return fmt.Errorf("Mirror state not correct: %+v", si.State) }) } + +func TestNoRaceBinaryStreamSnapshotEncodingBasic(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + MaxMsgsPerSubject: 1, + }) + require_NoError(t, err) + + // Set first key + sendStreamMsg(t, nc, "key:1", "hello") + + // Set Second key but keep updating it, causing a laggard pattern. + value := bytes.Repeat([]byte("Z"), 8*1024) + + for i := 0; i <= 1000; i++ { + _, err := js.PublishAsync("key:2", value) + require_NoError(t, err) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // Now do more of swiss cheese style. + for i := 3; i <= 1000; i++ { + key := fmt.Sprintf("key:%d", i) + _, err := js.PublishAsync(key, value) + require_NoError(t, err) + // Send it twice to create hole right behind it, like swiss cheese. + _, err = js.PublishAsync(key, value) + require_NoError(t, err) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // Make for round numbers for stream state. + sendStreamMsg(t, nc, "key:2", "hello") + sendStreamMsg(t, nc, "key:2", "world") + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_True(t, si.State.FirstSeq == 1) + require_True(t, si.State.LastSeq == 3000) + require_True(t, si.State.Msgs == 1000) + require_True(t, si.State.NumDeleted == 2000) + + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + snap, err := mset.store.EncodedStreamState(0) + require_NoError(t, err) + + // Now decode the snapshot. + ss, err := DecodeStreamState(snap) + require_NoError(t, err) + + require_True(t, ss.FirstSeq == 1) + require_True(t, ss.LastSeq == 3000) + require_True(t, ss.Msgs == 1000) + // We should have collapsed all these into 2 delete blocks. + require_True(t, len(ss.Deleted) == 2) + require_True(t, ss.Deleted.NumDeleted() == 2000) +} + +func TestNoRaceFilestoreBinaryStreamSnapshotEncodingLargeGaps(t *testing.T) { + storeDir := t.TempDir() + fcfg := FileStoreConfig{ + StoreDir: storeDir, + BlockSize: 512, // Small on purpose to create alot of blks. + } + fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"zzz"}, Storage: FileStorage}) + require_NoError(t, err) + + subj, msg := "zzz", bytes.Repeat([]byte("X"), 128) + numMsgs := 20_000 + + fs.StoreMsg(subj, nil, msg) + for i := 2; i < numMsgs; i++ { + seq, _, err := fs.StoreMsg(subj, nil, nil) + require_NoError(t, err) + fs.RemoveMsg(seq) + } + fs.StoreMsg(subj, nil, msg) + + snap, err := fs.EncodedStreamState(0) + require_NoError(t, err) + require_True(t, len(snap) < 512) + + // Now decode the snapshot. + ss, err := DecodeStreamState(snap) + require_NoError(t, err) + + require_True(t, ss.FirstSeq == 1) + require_True(t, ss.LastSeq == 20_000) + require_True(t, ss.Msgs == 2) + require_True(t, len(ss.Deleted) == 2) + require_True(t, ss.Deleted.NumDeleted() == 19_998) +} + +func TestNoRaceJetStreamClusterStreamSnapshotCatchup(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + s := c.randomServer() + + // Client based API + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + MaxMsgsPerSubject: 1, + Replicas: 3, + }) + require_NoError(t, err) + + msg := []byte("Hello World") + _, err = js.Publish("foo", msg) + require_NoError(t, err) + + for i := 1; i < 1000; i++ { + _, err := js.PublishAsync("bar", msg) + require_NoError(t, err) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + sr := c.randomNonStreamLeader(globalAccountName, "TEST") + sr.Shutdown() + + // Now create larger gap. + for i := 0; i < 50_000; i++ { + _, err := js.PublishAsync("bar", msg) + require_NoError(t, err) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(10 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + sl := c.streamLeader(globalAccountName, "TEST") + sl.JetStreamSnapshotStream(globalAccountName, "TEST") + + sr = c.restartServer(sr) + c.checkClusterFormed() + + c.waitOnServerCurrent(sr) + c.waitOnStreamCurrent(sr, globalAccountName, "TEST") + + mset, err := sr.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + var state StreamState + mset.store.FastState(&state) + + require_True(t, state.Msgs == 2) + require_True(t, state.FirstSeq == 1) + require_True(t, state.LastSeq == 51_000) + require_True(t, state.NumDeleted == 51_000-2) + + sr.Shutdown() + + _, err = js.Publish("baz", msg) + require_NoError(t, err) + sl.JetStreamSnapshotStream(globalAccountName, "TEST") + + sr = c.restartServer(sr) + c.checkClusterFormed() + + c.waitOnServerCurrent(sr) + c.waitOnStreamCurrent(sr, globalAccountName, "TEST") + + mset, err = sr.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + mset.store.FastState(&state) + + require_True(t, state.Msgs == 3) + require_True(t, state.FirstSeq == 1) + require_True(t, state.LastSeq == 51_001) + require_True(t, state.NumDeleted == 51_001-3) +} diff --git a/server/route.go b/server/route.go index a1921a74..6f6adbe0 100644 --- a/server/route.go +++ b/server/route.go @@ -1997,7 +1997,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string // check to be consistent and future proof. but will be same domain if s.sameDomain(info.Domain) { s.nodeToInfo.Store(rHash, - nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream}) + nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false}) } } diff --git a/server/server.go b/server/server.go index 5975764c..99d03f2f 100644 --- a/server/server.go +++ b/server/server.go @@ -310,16 +310,17 @@ type Server struct { // For tracking JS nodes. type nodeInfo struct { - name string - version string - cluster string - domain string - id string - tags jwt.TagList - cfg *JetStreamConfig - stats *JetStreamStats - offline bool - js bool + name string + version string + cluster string + domain string + id string + tags jwt.TagList + cfg *JetStreamConfig + stats *JetStreamStats + offline bool + js bool + binarySnapshots bool } // Make sure all are 64bits for atomic use @@ -696,7 +697,7 @@ func NewServer(opts *Options) (*Server, error) { opts.Tags, &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, nil, - false, true, + false, true, true, }) } diff --git a/server/store.go b/server/store.go index 9a1b4792..5d1aa7b7 100644 --- a/server/store.go +++ b/server/store.go @@ -21,6 +21,8 @@ import ( "io" "strings" "time" + + "github.com/nats-io/nats-server/v2/server/avl" ) // StorageType determines how messages are stored for retention. @@ -97,6 +99,8 @@ type StreamStore interface { NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) State() StreamState FastState(*StreamState) + EncodedStreamState(failed uint64) (enc []byte, err error) + SyncDeleted(dbs DeleteBlocks) Type() StorageType RegisterStorageUpdates(StorageUpdateHandler) UpdateConfig(cfg *StreamConfig) error @@ -171,6 +175,144 @@ type SnapshotResult struct { State StreamState } +const ( + // Magic is used to identify stream state encodings. + streamStateMagic = uint8(42) + // Version + streamStateVersion = uint8(1) + // Magic / Identifier for run length encodings. + runLengthMagic = uint8(33) + // Magic / Identifier for AVL seqsets. + seqSetMagic = uint8(22) +) + +// Interface for DeleteBlock. +// These will be of three types: +// 1. AVL seqsets. +// 2. Run length encoding of a deleted range. +// 3. Legacy []uint64 +type DeleteBlock interface { + State() (first, last, num uint64) + Range(f func(uint64) bool) +} + +type DeleteBlocks []DeleteBlock + +// StreamReplicatedState represents what is encoded in a binary stream snapshot used +// for stream replication in an NRG. +type StreamReplicatedState struct { + Msgs uint64 + Bytes uint64 + FirstSeq uint64 + LastSeq uint64 + Failed uint64 + Deleted DeleteBlocks +} + +// Determine if this is an encoded stream state. +func IsEncodedStreamState(buf []byte) bool { + return len(buf) >= hdrLen && buf[0] == streamStateMagic && buf[1] == streamStateVersion +} + +var ErrBadStreamStateEncoding = errors.New("bad stream state encoding") + +func DecodeStreamState(buf []byte) (*StreamReplicatedState, error) { + ss := &StreamReplicatedState{} + if len(buf) < hdrLen || buf[0] != streamStateMagic || buf[1] != streamStateVersion { + return nil, ErrBadStreamStateEncoding + } + var bi = hdrLen + + readU64 := func() uint64 { + if bi < 0 || bi >= len(buf) { + bi = -1 + return 0 + } + num, n := binary.Uvarint(buf[bi:]) + if n <= 0 { + bi = -1 + return 0 + } + bi += n + return num + } + + ss.Msgs = readU64() + ss.Bytes = readU64() + ss.FirstSeq = readU64() + ss.LastSeq = readU64() + ss.Failed = readU64() + + if numDeleted := readU64(); numDeleted > 0 { + // If we have some deleted blocks. + for l := len(buf); l > bi; { + switch buf[bi] { + case seqSetMagic: + dmap, n, err := avl.Decode(buf[bi:]) + if err != nil { + return nil, err + } + bi += n + ss.Deleted = append(ss.Deleted, dmap) + case runLengthMagic: + bi++ + var rl DeleteRange + rl.First = readU64() + rl.Num = readU64() + ss.Deleted = append(ss.Deleted, &rl) + } + } + } + + return ss, nil +} + +// DeleteRange is a run length encoded delete range. +type DeleteRange struct { + First uint64 + Num uint64 +} + +func (dr *DeleteRange) State() (first, last, num uint64) { + return dr.First, dr.First + dr.Num, dr.Num +} + +// Range will range over all the deleted sequences represented by this block. +func (dr *DeleteRange) Range(f func(uint64) bool) { + for seq := dr.First; seq <= dr.First+dr.Num; seq++ { + if !f(seq) { + return + } + } +} + +// Legacy []uint64 +type DeleteSlice []uint64 + +func (ds DeleteSlice) State() (first, last, num uint64) { + if len(ds) == 0 { + return 0, 0, 0 + } + return ds[0], ds[len(ds)-1], uint64(len(ds)) +} + +// Range will range over all the deleted sequences represented by this []uint64. +func (ds DeleteSlice) Range(f func(uint64) bool) { + for _, seq := range ds { + if !f(seq) { + return + } + } +} + +func (dbs DeleteBlocks) NumDeleted() (total uint64) { + for _, db := range dbs { + _, _, num := db.State() + total += num + } + return total +} + // ConsumerStore stores state on consumers for streams. type ConsumerStore interface { SetStarting(sseq uint64) error