Added ability to properly restore consumers from a snapshot.

This required adding proposal-forwarding functionality in the raft layer.
More general cleanup and bug fixes as well.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-01-24 13:59:54 -08:00
parent cad0db2aec
commit 9c858d197a
18 changed files with 994 additions and 201 deletions

2
go.mod
View File

@@ -3,7 +3,7 @@ module github.com/nats-io/nats-server/v2
go 1.15
require (
github.com/klauspost/compress v1.11.4
github.com/klauspost/compress v1.11.7
github.com/minio/highwayhash v1.0.0
github.com/nats-io/jwt/v2 v2.0.0-20210107222814-18c5cc45d263
github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6

2
go.sum
View File

@@ -11,6 +11,8 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.11.4 h1:kz40R/YWls3iqT9zX9AHN3WoVsrAWVyui5sxuLqiXqU=
github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg=
github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=

View File

@@ -214,6 +214,7 @@ type Consumer struct {
closed bool
// Clustered.
ca *consumerAssignment
node RaftNode
infoSub *subscription
}
@@ -230,7 +231,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
return mset.addConsumer(config, _EMPTY_, nil)
}
func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftNode) (*Consumer, error) {
func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, ca *consumerAssignment) (*Consumer, error) {
mset.mu.RLock()
s, jsa := mset.srv, mset.jsa
mset.mu.RUnlock()
@@ -507,13 +508,14 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftN
store, err := mset.store.ConsumerStore(o.name, config)
if err != nil {
mset.mu.Unlock()
o.deleteWithoutAdvisory()
return nil, fmt.Errorf("error creating store for observable: %v", err)
}
o.store = store
// FIXME(dlc) - Failures past this point should be consumer cleanup.
if !isValidName(o.name) {
mset.mu.Unlock()
o.deleteWithoutAdvisory()
return nil, fmt.Errorf("durable name can not contain '.', '*', '>'")
}
@@ -525,15 +527,21 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftN
if eo, ok := mset.consumers[o.name]; ok {
mset.mu.Unlock()
if !o.isDurable() || !o.isPushMode() {
o.name = _EMPTY_ // Prevent removal since same name.
o.deleteWithoutAdvisory()
return nil, fmt.Errorf("consumer already exists")
}
// If we are here we have already registered this durable. If it is still active that is an error.
if eo.Active() {
o.name = _EMPTY_ // Prevent removal since same name.
o.deleteWithoutAdvisory()
return nil, fmt.Errorf("consumer already exists and is still active")
}
// Since we are here this means we have a potentially new durable so we should update here.
// Check that configs are the same.
if !configsEqualSansDelivery(o.config, eo.config) {
o.name = _EMPTY_ // Prevent removal since same name.
o.deleteWithoutAdvisory()
return nil, fmt.Errorf("consumer replacement durable config not the same")
}
// Once we are here we have a replacement push-based durable.
@@ -565,8 +573,10 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftN
}
}
// Set our node.
o.node = node
// Set our ca.
if ca != nil {
o.setConsumerAssignment(ca)
}
mset.setConsumer(o)
mset.mu.Unlock()
@@ -584,6 +594,16 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftN
return o, nil
}
func (o *Consumer) setConsumerAssignment(ca *consumerAssignment) {
o.mu.Lock()
defer o.mu.Unlock()
o.ca = ca
// Set our node.
if ca != nil {
o.node = ca.Group.node
}
}
// Lock should be held.
func (o *Consumer) isLeader() bool {
if o.node != nil {
@@ -865,14 +885,14 @@ func (o *Consumer) deleteNotActive() {
o.mu.RUnlock()
return
}
s, jsa := o.mset.srv, o.mset.jsa
stream, name := o.stream, o.name
s, js, jsa := o.mset.srv, o.mset.srv.js, o.mset.jsa
acc, stream, name := o.acc.Name, o.stream, o.name
o.mu.RUnlock()
// If we are clustered, check if we still have this consumer assigned.
// If we do forward a proposal to delete ourselves to the metacontroller leader.
if s.JetStreamIsClustered() {
if ca := jsa.consumerAssignment(stream, name); ca != nil {
if ca := js.consumerAssignment(acc, stream, name); ca != nil {
// We copy and clear the reply since this removal is internal.
jsa.mu.Lock()
cca := *ca
@@ -892,7 +912,7 @@ func (o *Consumer) deleteNotActive() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
if ca := jsa.consumerAssignment(stream, name); ca != nil {
if ca := js.consumerAssignment(acc, stream, name); ca != nil {
s.Warnf("Consumer assignment not cleaned up, retrying")
meta.ForwardProposal(removeEntry)
} else {
@@ -1125,31 +1145,57 @@ func (o *Consumer) readStoredState() error {
return nil
}
state, err := o.store.State()
if err == nil && state != nil {
// FIXME(dlc) - re-apply state.
o.dseq = state.Delivered.Consumer + 1
o.sseq = state.Delivered.Stream + 1
o.adflr = state.AckFloor.Consumer
o.asflr = state.AckFloor.Stream
o.pending = state.Pending
o.rdc = state.Redelivered
o.applyState(state)
}
return err
}
// Apply the consumer stored state.
func (o *Consumer) applyState(state *ConsumerState) {
if state == nil {
return
}
o.dseq = state.Delivered.Consumer + 1
o.sseq = state.Delivered.Stream + 1
o.adflr = state.AckFloor.Consumer
o.asflr = state.AckFloor.Stream
o.pending = state.Pending
o.rdc = state.Redelivered
// Setup tracking timer if we have restored pending.
if len(o.pending) > 0 && o.ptmr == nil {
o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending)
}
return err
}
func (o *Consumer) readStoreState() *ConsumerState {
o.mu.RLock()
defer o.mu.RUnlock()
if o.store == nil {
return nil
}
state, _ := o.store.State()
return state
}
// Sets our store state from another source. Used in clustered mode on snapshot restore.
func (o *Consumer) setStoreState(state *ConsumerState) error {
if state == nil {
return nil
}
o.applyState(state)
return o.store.Update(state)
}
// Update our state to the store.
func (o *Consumer) writeState() {
func (o *Consumer) writeStoreState() error {
o.mu.Lock()
defer o.mu.Unlock()
if o.store == nil {
return
return nil
}
state := ConsumerState{
@@ -1164,8 +1210,7 @@ func (o *Consumer) writeState() {
Pending: o.pending,
Redelivered: o.rdc,
}
// FIXME(dlc) - Hold onto any errors.
o.store.Update(&state)
return o.store.Update(&state)
}
// loopAndDeliverMsgs() will loop and deliver messages and watch for interest changes.
@@ -2312,7 +2357,7 @@ func (o *Consumer) purge(sseq uint64) {
}
o.mu.Unlock()
o.writeState()
o.writeStoreState()
}
func stopAndClearTimer(tp **time.Timer) {
@@ -2423,8 +2468,13 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
}
}
if dflag && n != nil {
n.Stop()
// Cluster cleanup.
if n != nil {
if dflag {
n.Delete()
} else {
n.Stop()
}
}
var err error

View File

@@ -223,7 +223,7 @@ type DataStats struct {
// Used for internally queueing up messages that the server wants to send.
type pubMsg struct {
acc *Account
c *client
sub string
rply string
si *ServerInfo
@@ -258,7 +258,6 @@ RESET:
}
sysc := s.sys.client
resetCh := s.sys.resetCh
sysacc := s.sys.account
sendq := s.sys.sendq
id := s.info.ID
host := s.info.Host
@@ -276,10 +275,6 @@ RESET:
warnFreq := time.Second
last := time.Now().Add(-warnFreq)
// Internal client not generally accessible for when we
// need to change accounts from the system account.
ic := s.createInternalAccountClient()
for s.eventsRunning() {
// Setup information for next message
if len(sendq) > warnThresh && time.Since(last) >= warnFreq {
@@ -314,19 +309,14 @@ RESET:
// Setup our client. If the user wants to use a non-system account use our internal
// account scoped here so that we are not changing out accounts for the system client.
var c *client
if pm.acc != nil && pm.acc != sysacc {
c = ic
if pm.c != nil {
c = pm.c
} else {
c = sysc
pm.acc = nil
}
// Grab client lock.
c.mu.Lock()
// We can have an override for account here.
if pm.acc != nil {
c.acc = pm.acc
}
// Prep internal structures needed to send message.
c.pa.subject = []byte(pm.sub)
@@ -392,8 +382,15 @@ func (s *Server) sendInternalAccountMsg(a *Account, subject string, msg interfac
}
sendq := s.sys.sendq
// Don't hold lock while placing on the channel.
c := s.sys.client
s.mu.Unlock()
sendq <- &pubMsg{a, subject, "", nil, msg, false}
// Replace our client with the account's internal client.
if a != nil {
c = a.internalClient()
}
sendq <- &pubMsg{c, subject, _EMPTY_, nil, msg, false}
return nil
}

View File

@@ -2844,10 +2844,7 @@ const errFile = "errors.txt"
func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includeConsumers bool) {
defer w.Close()
bw := bufio.NewWriter(w)
defer bw.Flush()
enc := s2.NewWriter(bw)
enc := s2.NewWriter(w)
defer enc.Close()
tw := tar.NewWriter(enc)

View File

@@ -1081,12 +1081,42 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
name := streamNameFromSubject(subject)
var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}}
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(name) {
return
if s.JetStreamIsClustered() {
// Check to make sure the consumer is assigned.
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}
jsEnabled := acc.JetStreamEnabled()
js.mu.RLock()
isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
js.mu.RUnlock()
if isLeader && sa == nil {
// We can't find the stream, so mimic what would be the errors below.
if !jsEnabled {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// No stream present.
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
} else if sa == nil {
return
}
// We have the stream assigned, only the stream leader should answer.
if !acc.JetStreamIsStreamLeader(name) {
return
}
}
var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
@@ -2039,12 +2069,47 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re
stream := streamNameFromSubject(subject)
consumer := consumerNameFromSubject(subject)
var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() && !acc.JetStreamIsConsumerLeader(stream, consumer) {
return
if s.JetStreamIsClustered() {
// Check to make sure the consumer is assigned.
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}
jsEnabled := acc.JetStreamEnabled()
js.mu.RLock()
isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, stream), js.consumerAssignment(acc.Name, stream, consumer)
js.mu.RUnlock()
if isLeader && ca == nil {
// We can't find the consumer, so mimic what would be the errors below.
if !jsEnabled {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if sa == nil {
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// If we are here the consumer is not present.
resp.Error = jsNoConsumerErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
} else if ca == nil {
return
}
// We have the consumer assigned, only the consumer leader should answer.
if !acc.JetStreamIsConsumerLeader(stream, consumer) {
return
}
}
var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))

View File

@@ -62,6 +62,8 @@ const (
// Consumer ops
updateDeliveredOp
updateAcksOp
// Compressed consumer assignments.
assignCompressedConsumerOp
)
// raftGroups are controlled by the metagroup controller.
@@ -89,6 +91,20 @@ type streamAssignment struct {
err error
}
// consumerAssignment is what the meta controller uses to assign consumers to streams.
type consumerAssignment struct {
Client *ClientInfo `json:"client,omitempty"`
Name string `json:"name"`
Stream string `json:"stream"`
Config *ConsumerConfig `json:"consumer"`
Group *raftGroup `json:"group"`
Reply string `json:"reply"`
State *ConsumerState `json:"state,omitempty"`
// Internal
responded bool
err error
}
// streamPurge is what the stream leader will replicate when purging a stream.
type streamPurge struct {
Client *ClientInfo `json:"client,omitempty"`
@@ -104,19 +120,6 @@ type streamMsgDelete struct {
Reply string `json:"reply"`
}
// consumerAssignment is what the meta controller uses to assign consumers to streams.
type consumerAssignment struct {
Client *ClientInfo `json:"client,omitempty"`
Name string `json:"name"`
Stream string `json:"stream"`
Config *ConsumerConfig `json:"consumer"`
Group *raftGroup `json:"group"`
Reply string `json:"reply"`
// Internal
responded bool
err error
}
const (
defaultStoreDirName = "_js_"
defaultMetaGroupName = "_meta_"
@@ -376,6 +379,8 @@ func (s *Server) JetStreamIsConsumerLeader(account, stream, consumer string) boo
if js == nil || cc == nil {
return false
}
js.mu.RLock()
defer js.mu.RUnlock()
return cc.isConsumerLeader(account, stream, consumer)
}
@@ -577,7 +582,7 @@ func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) b
rg := ca.Group
for _, peer := range rg.Peers {
if peer == ourID {
if len(rg.Peers) == 1 || rg.node.Leader() {
if len(rg.Peers) == 1 || (rg.node != nil && rg.node.Leader()) {
return true
}
}
@@ -823,6 +828,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
return didSnap, err
}
js.processConsumerAssignment(ca)
case assignCompressedConsumerOp:
ca, err := decodeConsumerAssignmentCompressed(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assigment: %q", buf[1:])
return didSnap, err
}
js.processConsumerAssignment(ca)
case removeConsumerOp:
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
@@ -995,6 +1007,9 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) {
for {
select {
case err := <-restoreDoneCh:
// We have completed a restore from snapshot on this server. The stream assignment has
// already been assigned but the replicas will need to catch up out of band. Consumers
// will need to be assigned by forwarding the proposal and stamping the initial state.
s.Debugf("Stream restore for '%s > %s' completed", sa.Client.Account, sa.Config.Name)
if err != nil {
s.Debugf("Stream restore failed: %v", err)
@@ -1037,6 +1052,47 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) {
js.processStreamLeaderChange(mset, sa, isLeader)
attemptSnapshot()
// Check to see if we have restored consumers here.
// These are not currently assigned so we will need to do so here.
if consumers := mset.Consumers(); len(consumers) > 0 {
for _, o := range mset.Consumers() {
rg := cc.createGroupForConsumer(sa)
// Pick a preferred leader.
rg.setPreferred()
name, cfg := o.Name(), o.Config()
// Place our initial state here as well for assignment distribution.
ca := &consumerAssignment{
Group: rg,
Stream: sa.Config.Name,
Name: name,
Config: &cfg,
Client: sa.Client,
State: o.readStoreState(),
}
// We make these compressed in case state is complex.
addEntry := encodeAddConsumerAssignmentCompressed(ca)
cc.meta.ForwardProposal(addEntry)
// Check to make sure we see the assignment.
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
ca, meta := js.consumerAssignment(ca.Client.Account, sa.Config.Name, name), cc.meta
js.mu.RUnlock()
if ca == nil {
s.Warnf("Consumer assignment has not been assigned, retrying")
meta.ForwardProposal(addEntry)
} else {
return
}
}
}()
}
}
case <-s.quitCh:
return
case <-qch:
@@ -1195,7 +1251,12 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen
// Will lookup a stream assignment.
// Lock should be held.
func (js *jetStream) streamAssignment(account, stream string) (sa *streamAssignment) {
if as := js.cluster.streams[account]; as != nil {
cc := js.cluster
if cc == nil {
return nil
}
if as := cc.streams[account]; as != nil {
sa = as[stream]
}
return sa
@@ -1269,8 +1330,16 @@ func (js *jetStream) processClusterCreateStream(sa *streamAssignment) {
var mset *Stream
// Process here if not restoring.
if sa.Restore == nil {
// If we are restoring, create the stream if we are R>1 and not the preferred who handles the
// receipt of the snapshot itself.
shouldCreate := true
if sa.Restore != nil {
if len(rg.Peers) == 1 || rg.node != nil && rg.node.ID() == rg.Preferred {
shouldCreate = false
}
}
// Process here if not restoring or not the leader.
if shouldCreate {
// Go ahead and create or update the stream.
mset, err = acc.LookupStream(sa.Config.Name)
if err == nil && mset != nil {
@@ -1287,7 +1356,7 @@ func (js *jetStream) processClusterCreateStream(sa *streamAssignment) {
// This is an error condition.
if err != nil {
s.Debugf("Stream create failed for '%s > %s': %v\n", sa.Client.Account, sa.Config.Name, err)
s.Debugf("Stream create failed for '%s > %s': %v", sa.Client.Account, sa.Config.Name, err)
js.mu.Lock()
sa.err = err
sa.responded = true
@@ -1348,6 +1417,47 @@ func (js *jetStream) processClusterCreateStream(sa *streamAssignment) {
}
js.processStreamLeaderChange(mset, sa, true)
// Check to see if we have restored consumers here.
// These are not currently assigned so we will need to do so here.
if consumers := mset.Consumers(); len(consumers) > 0 {
js.mu.RLock()
cc := js.cluster
js.mu.RUnlock()
for _, o := range mset.Consumers() {
rg := cc.createGroupForConsumer(sa)
name, cfg := o.Name(), o.Config()
// Place our initial state here as well for assignment distribution.
ca := &consumerAssignment{
Group: rg,
Stream: sa.Config.Name,
Name: name,
Config: &cfg,
Client: sa.Client,
}
addEntry := encodeAddConsumerAssignment(ca)
cc.meta.ForwardProposal(addEntry)
// Check to make sure we see the assignment.
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
ca, meta := js.consumerAssignment(ca.Client.Account, sa.Config.Name, name), cc.meta
js.mu.RUnlock()
if ca == nil {
s.Warnf("Consumer assignment has not been assigned, retrying")
meta.ForwardProposal(addEntry)
} else {
return
}
}
}()
}
}
case <-s.quitCh:
return
}
@@ -1445,7 +1555,18 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
sa := js.streamAssignment(ca.Client.Account, ca.Stream)
if sa == nil {
// FIXME(dlc) - log.
s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.Account, ca.Stream)
ca.err = ErrJetStreamStreamNotFound
result := &consumerAssignmentResult{
Account: ca.Client.Account,
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
// Send response to the metadata leader. They will forward to the user as needed.
b, _ := json.Marshal(result) // Avoids auto-processing and doing fancy json with newlines.
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, b)
js.mu.Unlock()
return
}
@@ -1519,11 +1640,26 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
// Go ahead and create or update the consumer.
mset, err := acc.LookupStream(ca.Stream)
if err != nil {
s.Debugf("JetStream cluster error looking up stream %q for account %q: %v", ca.Stream, acc.Name, err)
ca.err = err
js.mu.Lock()
s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.Account, ca.Stream)
ca.err = ErrJetStreamStreamNotFound
result := &consumerAssignmentResult{
Account: ca.Client.Account,
Stream: ca.Stream,
Consumer: ca.Name,
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
}
result.Response.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
// Send response to the metadata leader. They will forward to the user as needed.
b, _ := json.Marshal(result) // Avoids auto-processing and doing fancy json with newlines.
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, b)
js.mu.Unlock()
return
}
// Process the raft group and make sure its running if needed.
js.createRaftGroup(rg)
// Check if we already have this consumer running.
o := mset.LookupConsumer(ca.Name)
if o != nil {
@@ -1533,15 +1669,18 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
o.updateDeliverSubject(ca.Config.DeliverSubject)
}
}
o.setConsumerAssignment(ca)
s.Debugf("JetStream cluster, consumer already running")
}
// Process the raft group and make sure its running if needed.
js.createRaftGroup(rg)
// Add in the consumer if needed.
if o == nil {
o, err = mset.addConsumer(ca.Config, ca.Name, rg.node)
o, err = mset.addConsumer(ca.Config, ca.Name, ca)
}
// If we have an initial state set apply that now.
if ca.State != nil && o != nil {
err = o.setStoreState(ca.State)
}
if err != nil {
@@ -1627,28 +1766,12 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
}
// Returns the consumer assignment, or nil if not present.
func (jsa *jsAccount) consumerAssignment(stream, consumer string) *consumerAssignment {
jsa.mu.RLock()
defer jsa.mu.RUnlock()
js, acc := jsa.js, jsa.account
if js == nil {
return nil
// Lock should be held.
func (js *jetStream) consumerAssignment(account, stream, consumer string) *consumerAssignment {
if sa := js.streamAssignment(account, stream); sa != nil {
return sa.consumers[consumer]
}
cc := js.cluster
if cc == nil {
return nil
}
var sa *streamAssignment
accStreams := cc.streams[acc.Name]
if accStreams != nil {
sa = accStreams[stream]
}
if sa == nil {
return nil
}
return sa.consumers[consumer]
return nil
}
// consumerAssigned informs us if this server has this consumer assigned.
@@ -2101,9 +2224,20 @@ func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, stream, reply st
osa := js.streamAssignment(ci.Account, stream)
if osa == nil {
// TODO(dlc) - Should respond? Log?
acc, err := s.LookupAccount(ci.Account)
if err == nil {
var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
s.sendAPIResponse(ci, acc, _EMPTY_, reply, string(rmsg), s.jsonResponse(&resp))
}
return
}
// Remove any remaining consumers as well.
for _, ca := range osa.consumers {
ca.Reply, ca.State = _EMPTY_, nil
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
}
sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Reply: reply, Client: ci}
cc.meta.Propose(encodeDeleteStreamAssignment(sa))
}
@@ -2399,7 +2533,12 @@ func (s *Server) jsClusteredConsumerDeleteRequest(ci *ClientInfo, stream, consum
}
oca := sa.consumers[consumer]
if oca == nil {
// TODO(dlc) - Should respond? Log?
acc, err := s.LookupAccount(ci.Account)
if err == nil {
var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
resp.Error = jsNoConsumerErr
s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
}
return
}
ca := &consumerAssignment{Group: oca.Group, Stream: stream, Name: consumer, Config: oca.Config, Reply: reply, Client: ci}
@@ -2546,6 +2685,28 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) {
return &ca, err
}
func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte {
b, err := json.Marshal(ca)
if err != nil {
return nil
}
// TODO(dlc) - Streaming better approach here probably.
var bb bytes.Buffer
bb.WriteByte(byte(assignCompressedConsumerOp))
bb.Write(s2.Encode(nil, b))
return bb.Bytes()
}
func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) {
var ca consumerAssignment
js, err := s2.Decode(nil, buf)
if err != nil {
return nil, err
}
err = json.Unmarshal(js, &ca)
return &ca, err
}
var errBadStreamMsg = errors.New("jetstream cluster bad replicated stream msg")
func decodeStreamMsg(buf []byte) (subject, reply string, hdr, msg []byte, lseq uint64, ts int64, err error) {

View File

@@ -173,8 +173,8 @@ type lps struct {
const (
minElectionTimeout = 350 * time.Millisecond
maxElectionTimeout = 3 * minElectionTimeout
minCampaignTimeout = 5 * time.Millisecond
maxCampaignTimeout = 5 * minCampaignTimeout
minCampaignTimeout = 50 * time.Millisecond
maxCampaignTimeout = 4 * minCampaignTimeout
hbInterval = 200 * time.Millisecond
)
@@ -323,6 +323,10 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
n.notice("Started")
n.Lock()
n.resetElectionTimeout()
n.Unlock()
s.registerRaftNode(n.group, n)
s.startGoRoutine(n.run)
@@ -392,12 +396,14 @@ func (n *raft) Propose(data []byte) error {
n.RLock()
if n.state != Leader {
n.RUnlock()
n.debug("Proposal ignored, not leader")
return errNotLeader
}
propc, paused, quit := n.propc, n.pausec, n.quit
n.RUnlock()
if paused != nil {
n.debug("Proposals paused, will wait")
select {
case <-paused:
case <-quit:
@@ -410,6 +416,7 @@ func (n *raft) Propose(data []byte) error {
select {
case propc <- &Entry{EntryNormal, data}:
default:
n.debug("Propose failed!")
return errProposalFailed
}
return nil
@@ -685,9 +692,9 @@ func (n *raft) campaign() error {
if n.state == Leader {
return errAlreadyLeader
}
if n.state == Follower {
n.resetElect(randCampaignTimeout())
}
// Pre-place our vote for ourselves.
n.vote = n.id
n.resetElect(randCampaignTimeout())
return nil
}
@@ -797,6 +804,12 @@ const (
raftReplySubj = "$NRG.R.%s"
)
// Our internal subscribe.
// Lock should be held.
func (n *raft) subscribe(subject string, cb msgHandler) (*subscription, error) {
return n.s.systemSubscribe(subject, _EMPTY_, false, n.c, cb)
}
func (n *raft) createInternalSubs() error {
cn := n.s.ClusterName()
n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, cn, n.group), n.newInbox(cn)
@@ -804,17 +817,17 @@ func (n *raft) createInternalSubs() error {
n.psubj = fmt.Sprintf(raftPropSubj, n.group)
// Votes
if _, err := n.s.sysSubscribe(n.vreply, n.handleVoteResponse); err != nil {
if _, err := n.subscribe(n.vreply, n.handleVoteResponse); err != nil {
return err
}
if _, err := n.s.sysSubscribe(n.vsubj, n.handleVoteRequest); err != nil {
if _, err := n.subscribe(n.vsubj, n.handleVoteRequest); err != nil {
return err
}
// AppendEntry
if _, err := n.s.sysSubscribe(n.areply, n.handleAppendEntryResponse); err != nil {
if _, err := n.subscribe(n.areply, n.handleAppendEntryResponse); err != nil {
return err
}
if _, err := n.s.sysSubscribe(n.asubj, n.handleAppendEntry); err != nil {
if _, err := n.subscribe(n.asubj, n.handleAppendEntry); err != nil {
return err
}
@@ -848,10 +861,6 @@ func (n *raft) run() {
s := n.s
defer s.grWG.Done()
n.Lock()
n.resetElectionTimeout()
n.Unlock()
for s.isRunning() {
switch n.State() {
case Follower:
@@ -1079,6 +1088,8 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _, reply st
n.debug("Ignoring forwarded proposal, not leader")
return
}
// Need to copy since this is underlying client/route buffer.
msg = append(msg[:0:0], msg...)
if err := n.Propose(msg); err != nil {
n.warn("Got error processing forwarded proposal: %v", err)
}
@@ -1087,7 +1098,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _, reply st
func (n *raft) runAsLeader() {
n.Lock()
// For forwarded proposals.
fsub, err := n.s.sysSubscribe(n.psubj, n.handleForwardedProposal)
fsub, err := n.subscribe(n.psubj, n.handleForwardedProposal)
if err != nil {
n.warn("Error subscribing to forwarded proposals: %v", err)
}
@@ -1247,7 +1258,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.progress = make(map[string]chan uint64)
}
if _, ok := n.progress[ar.peer]; ok {
n.debug("Existing entry for catching up %q\n", ar.peer)
n.debug("Existing entry for catching up %q", ar.peer)
n.Unlock()
return
}
@@ -1502,7 +1513,7 @@ func (n *raft) createCatchup(ae *appendEntry) string {
pindex: n.pindex,
}
inbox := n.newInbox(n.s.ClusterName())
sub, _ := n.s.sysSubscribe(inbox, n.handleAppendEntry)
sub, _ := n.subscribe(inbox, n.handleAppendEntry)
n.catchup.sub = sub
return inbox
}
@@ -1731,7 +1742,7 @@ func (n *raft) sendAppendEntry(entries []*Entry) {
// If we have entries store this in our wal.
if len(entries) > 0 {
if err := n.storeToWAL(ae); err != nil {
panic("Error storing!\n")
panic("Error storing!")
}
// We count ourselves.
n.acks[n.pindex] = map[string]struct{}{n.id: struct{}{}}
@@ -2027,11 +2038,11 @@ func (n *raft) requestVote() {
}
func (n *raft) sendRPC(subject, reply string, msg []byte) {
n.sendq <- &pubMsg{nil, subject, reply, nil, msg, false}
n.sendq <- &pubMsg{n.c, subject, reply, nil, msg, false}
}
func (n *raft) sendReply(subject string, msg []byte) {
n.sendq <- &pubMsg{nil, subject, _EMPTY_, nil, msg, false}
n.sendq <- &pubMsg{n.c, subject, _EMPTY_, nil, msg, false}
}
func (n *raft) wonElection(votes int) bool {
@@ -2099,7 +2110,6 @@ func (n *raft) switchToCandidate() {
n.term++
// Clear current Leader.
n.leader = noLeader
n.resetElectionTimeout()
n.switchState(Candidate)
}

View File

@@ -151,9 +151,9 @@ type ConsumerState struct {
// These are both in stream sequence context.
// Pending is for all messages pending and the timestamp for the delivered time.
// This will only be present when the AckPolicy is ExplicitAck.
Pending map[uint64]*Pending `json:"pending"`
Pending map[uint64]*Pending `json:"pending,omitempty"`
// This is for messages that have been redelivered, so count > 1.
Redelivered map[uint64]uint64 `json:"redelivered"`
Redelivered map[uint64]uint64 `json:"redelivered,omitempty"`
}
// Represents a pending message for explicit ack or ack all.

View File

@@ -1536,6 +1536,9 @@ func TestJetStreamClusterEphemeralConsumerCleanup(t *testing.T) {
}
ci, _ := sub.ConsumerInfo()
if ci == nil {
t.Fatalf("Unexpected error: no consumer info")
}
// We will look up by hand this consumer to set inactive threshold lower for this test.
cl := c.consumerLeader("$G", "foo", ci.Name)
@@ -1604,19 +1607,56 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
Replicas: 2,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
toSend := 500
toSend, batchSize := 200, 50
for i := 0; i < toSend; i++ {
if _, err = js.Publish("foo", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
// Create a consumer as well and give it a non-simplistic state.
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy, AckWait: time.Second})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
jsub, err := js.SubscribeSync("foo", nats.Attach("TEST", "dlc"), nats.Pull(batchSize))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, jsub, batchSize)
// Ack first 50.
for i := 1; i <= 50; i++ {
m, err := jsub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error getting msg %d: %v", i, err)
}
m.Ack()
}
// Now ack every third message for next 50.
for i := 51; i <= 100; i++ {
m, err := jsub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error getting msg %d: %v", i, err)
}
if i%3 == 0 {
m.Ack()
}
}
// Snapshot consumer info.
ci, err := jsub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error getting consumer info: %v", err)
}
sreq := &server.JSApiStreamSnapshotRequest{
DeliverSubject: nats.NewInbox(),
ChunkSize: 512,
@@ -1702,7 +1742,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
// Send our snapshot back in to restore the stream.
// Can be any size message.
var chunk [512]byte
var chunk [1024]byte
for r := bytes.NewReader(snapshot); ; {
n, err := r.Read(chunk[:])
if err != nil {
@@ -1720,25 +1760,9 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
t.Fatalf("StreamInfo is not correct %+v", si)
}
getExtendedStreamInfo := func() *server.StreamInfo {
t.Helper()
resp, err := nc.Request(fmt.Sprintf(server.JSApiStreamInfoT, "TEST"), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var si server.StreamInfo
if err = json.Unmarshal(resp.Data, &si); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.Cluster == nil {
t.Fatalf("Expected cluster info")
}
return &si
}
// Make sure the replicas become current eventually. They will be doing catchup.
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
si := getExtendedStreamInfo()
si, _ := js.StreamInfo("TEST")
if si == nil || si.Cluster == nil {
t.Fatalf("Did not get stream info")
}
@@ -1749,6 +1773,46 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
}
return nil
})
// Wait on the system to elect a leader for the restored consumer.
c.waitOnNewConsumerLeader("$G", "TEST", "dlc")
// Now check for the consumer being recreated.
nci, err := js.ConsumerInfo("TEST", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if nci.Delivered != ci.Delivered {
t.Fatalf("Delivered states do not match %+v vs %+v", nci.Delivered, ci.Delivered)
}
if nci.AckFloor != ci.AckFloor {
t.Fatalf("Ack floors did not match %+v vs %+v", nci.AckFloor, ci.AckFloor)
}
// Make sure consumer works.
// It should pick up with the next delivery spot, so check for that as first message.
// We should have all the messages for first delivery delivered.
start := 101
end := toSend
for i := start; i <= end; i++ {
m, err := jsub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error getting msg: %v", err)
}
meta, err := m.MetaData()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if meta.Stream != uint64(i) {
t.Fatalf("Expected stream sequence of %d, but got %d", i, meta.Stream)
}
m.Ack()
}
// Check that redelivered come in now..
redelivered := 50/3 + 1
checkSubsPending(t, jsub, redelivered)
}
func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
@@ -2128,7 +2192,7 @@ func jsClientConnect(t *testing.T, s *server.Server) (*nats.Conn, nats.JetStream
func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) {
t.Helper()
checkFor(t, 500*time.Millisecond, 10*time.Millisecond, func() error {
checkFor(t, 3*time.Second, 10*time.Millisecond, func() error {
if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != numExpected {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected)
}
@@ -2165,7 +2229,7 @@ func (c *cluster) waitOnPeerCount(n int) {
c.t.Helper()
c.waitOnLeader()
leader := c.leader()
expires := time.Now().Add(5 * time.Second)
expires := time.Now().Add(10 * time.Second)
for time.Now().Before(expires) {
peers := leader.JetStreamClusterPeers()
if len(peers) == n {
@@ -2178,7 +2242,7 @@ func (c *cluster) waitOnPeerCount(n int) {
func (c *cluster) waitOnNewConsumerLeader(account, stream, consumer string) {
c.t.Helper()
expires := time.Now().Add(5 * time.Second)
expires := time.Now().Add(10 * time.Second)
for time.Now().Before(expires) {
if leader := c.consumerLeader(account, stream, consumer); leader != nil {
time.Sleep(25 * time.Millisecond)
@@ -2201,13 +2265,13 @@ func (c *cluster) consumerLeader(account, stream, consumer string) *server.Serve
func (c *cluster) waitOnNewStreamLeader(account, stream string) {
c.t.Helper()
expires := time.Now().Add(5 * time.Second)
expires := time.Now().Add(10 * time.Second)
for time.Now().Before(expires) {
if leader := c.streamLeader(account, stream); leader != nil {
time.Sleep(100 * time.Millisecond)
time.Sleep(25 * time.Millisecond)
return
}
time.Sleep(25 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
}
c.t.Fatalf("Expected a stream leader for %q %q, got none", account, stream)
}

View File

@@ -442,11 +442,51 @@ The PDF sample shows a significant slowdown compared to Snappy, as this mode tri
to compress the data. Very small blocks are also not favorable for better compression, so throughput is way down.
This mode aims to provide better compression at the expense of performance and achieves that
without a huge performance pentalty, except on very small blocks.
without a huge performance penalty, except on very small blocks.
Decompression speed suffers a little compared to the regular S2 mode,
but still manages to be close to Snappy in spite of increased compression.
# Best compression mode
S2 offers a "best" compression mode.
This will compress as much as possible with little regard to CPU usage.
Mainly intended for offline compression, where decompression speed should still
be high and the output stays compatible with other S2 compressed data.
Some examples compared on 16 core CPU:
```
* enwik10
Default... 10000000000 -> 4761467548 [47.61%]; 1.098s, 8685.6MB/s
Better... 10000000000 -> 4225922984 [42.26%]; 2.817s, 3385.4MB/s
Best... 10000000000 -> 3667646858 [36.68%]; 35.995s, 264.9MB/s
* github-june-2days-2019.json
Default... 6273951764 -> 1043196283 [16.63%]; 431ms, 13882.3MB/s
Better... 6273951764 -> 950079555 [15.14%]; 736ms, 8129.5MB/s
Best... 6273951764 -> 846260870 [13.49%]; 8.125s, 736.4MB/s
* nyc-taxi-data-10M.csv
Default... 3325605752 -> 1095998837 [32.96%]; 324ms, 9788.7MB/s
Better... 3325605752 -> 960330423 [28.88%]; 602ms, 5268.4MB/s
Best... 3325605752 -> 794873295 [23.90%]; 6.619s, 479.1MB/s
* 10gb.tar
Default... 10065157632 -> 5916578242 [58.78%]; 1.028s, 9337.4MB/s
Better... 10065157632 -> 5650133605 [56.14%]; 2.172s, 4419.4MB/s
Best... 10065157632 -> 5246578570 [52.13%]; 25.696s, 373.6MB/s
* consensus.db.10gb
Default... 10737418240 -> 4562648848 [42.49%]; 882ms, 11610.0MB/s
Better... 10737418240 -> 4542443833 [42.30%]; 3.3s, 3103.5MB/s
Best... 10737418240 -> 4272335558 [39.79%]; 38.955s, 262.9MB/s
```
Decompression speed should be around the same as using the 'better' compression mode.
# Concatenating blocks and streams.
Concatenating streams will concatenate the output of both without recompressing them.

View File

@@ -20,8 +20,6 @@ var (
ErrTooLarge = errors.New("s2: decoded block is too large")
// ErrUnsupported reports that the input isn't supported.
ErrUnsupported = errors.New("s2: unsupported input")
errUnsupportedLiteralLength = errors.New("s2: unsupported literal length")
)
// DecodedLen returns the length of the decoded block.
@@ -46,8 +44,7 @@ func decodedLen(src []byte) (blockLen, headerLen int, err error) {
}
const (
decodeErrCodeCorrupt = 1
decodeErrCodeUnsupportedLiteralLength = 2
decodeErrCodeCorrupt = 1
)
// Decode returns the decoded form of src. The returned slice may be a sub-
@@ -65,22 +62,47 @@ func Decode(dst, src []byte) ([]byte, error) {
} else {
dst = make([]byte, dLen)
}
switch s2Decode(dst, src[s:]) {
case 0:
return dst, nil
case decodeErrCodeUnsupportedLiteralLength:
return nil, errUnsupportedLiteralLength
if s2Decode(dst, src[s:]) != 0 {
return nil, ErrCorrupt
}
return nil, ErrCorrupt
return dst, nil
}
// NewReader returns a new Reader that decompresses from r, using the framing
// format described at
// https://github.com/google/snappy/blob/master/framing_format.txt with S2 changes.
func NewReader(r io.Reader) *Reader {
return &Reader{
r: r,
buf: make([]byte, MaxEncodedLen(maxBlockSize)+checksumSize),
func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
nr := Reader{
r: r,
maxBlock: maxBlockSize,
}
for _, opt := range opts {
if err := opt(&nr); err != nil {
nr.err = err
return &nr
}
}
nr.buf = make([]byte, MaxEncodedLen(nr.maxBlock)+checksumSize)
nr.paramsOK = true
return &nr
}
// ReaderOption is an option for creating a decoder.
type ReaderOption func(*Reader) error
// ReaderMaxBlockSize allows to control allocations if the stream
// has been compressed with a smaller WriterBlockSize, or with the default 1MB.
// Blocks must be this size or smaller to decompress,
// otherwise the decoder will return ErrUnsupported.
//
// Default is the maximum limit of 4MB.
func ReaderMaxBlockSize(n int) ReaderOption {
return func(r *Reader) error {
if n > maxBlockSize || n <= 0 {
return errors.New("s2: block size too large. Must be <= 4MB and > 0")
}
r.maxBlock = n
return nil
}
}
@@ -92,13 +114,18 @@ type Reader struct {
buf []byte
// decoded[i:j] contains decoded bytes that have not yet been passed on.
i, j int
maxBlock int
readHeader bool
paramsOK bool
}
// Reset discards any buffered data, resets all state, and switches the Snappy
// reader to read from r. This permits reusing a Reader rather than allocating
// a new one.
func (r *Reader) Reset(reader io.Reader) {
if !r.paramsOK {
return
}
r.r = reader
r.err = nil
r.i = 0
@@ -116,6 +143,36 @@ func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
return true
}
// skipN will skip n bytes.
// If the supplied reader supports seeking that is used.
// tmp is used as a temporary buffer for reading.
// The supplied slice does not need to be the size of the read.
func (r *Reader) skipN(tmp []byte, n int, allowEOF bool) (ok bool) {
if rs, ok := r.r.(io.ReadSeeker); ok {
_, err := rs.Seek(int64(n), io.SeekCurrent)
if err == nil {
return true
}
if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
r.err = ErrCorrupt
return false
}
}
for n > 0 {
if n < len(tmp) {
tmp = tmp[:n]
}
if _, r.err = io.ReadFull(r.r, tmp); r.err != nil {
if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
r.err = ErrCorrupt
}
return false
}
n -= len(tmp)
}
return true
}
// Read satisfies the io.Reader interface.
func (r *Reader) Read(p []byte) (int, error) {
if r.err != nil {
@@ -139,10 +196,6 @@ func (r *Reader) Read(p []byte) (int, error) {
r.readHeader = true
}
chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
if chunkLen > len(r.buf) {
r.err = ErrUnsupported
return 0, r.err
}
// The chunk types are specified at
// https://github.com/google/snappy/blob/master/framing_format.txt
@@ -153,6 +206,10 @@ func (r *Reader) Read(p []byte) (int, error) {
r.err = ErrCorrupt
return 0, r.err
}
if chunkLen > len(r.buf) {
r.err = ErrUnsupported
return 0, r.err
}
buf := r.buf[:chunkLen]
if !r.readFull(buf, false) {
return 0, r.err
@@ -166,7 +223,7 @@ func (r *Reader) Read(p []byte) (int, error) {
return 0, r.err
}
if n > len(r.decoded) {
if n > maxBlockSize {
if n > r.maxBlock {
r.err = ErrCorrupt
return 0, r.err
}
@@ -189,6 +246,10 @@ func (r *Reader) Read(p []byte) (int, error) {
r.err = ErrCorrupt
return 0, r.err
}
if chunkLen > len(r.buf) {
r.err = ErrUnsupported
return 0, r.err
}
buf := r.buf[:checksumSize]
if !r.readFull(buf, false) {
return 0, r.err
@@ -197,7 +258,7 @@ func (r *Reader) Read(p []byte) (int, error) {
// Read directly into r.decoded instead of via r.buf.
n := chunkLen - checksumSize
if n > len(r.decoded) {
if n > maxBlockSize {
if n > r.maxBlock {
r.err = ErrCorrupt
return 0, r.err
}
@@ -238,7 +299,12 @@ func (r *Reader) Read(p []byte) (int, error) {
}
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
if !r.readFull(r.buf[:chunkLen], false) {
if chunkLen > maxBlockSize {
r.err = ErrUnsupported
return 0, r.err
}
if !r.skipN(r.buf, chunkLen, false) {
return 0, r.err
}
}
@@ -286,10 +352,6 @@ func (r *Reader) Skip(n int64) error {
r.readHeader = true
}
chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
if chunkLen > len(r.buf) {
r.err = ErrUnsupported
return r.err
}
// The chunk types are specified at
// https://github.com/google/snappy/blob/master/framing_format.txt
@@ -300,6 +362,10 @@ func (r *Reader) Skip(n int64) error {
r.err = ErrCorrupt
return r.err
}
if chunkLen > len(r.buf) {
r.err = ErrUnsupported
return r.err
}
buf := r.buf[:chunkLen]
if !r.readFull(buf, false) {
return r.err
@@ -312,7 +378,7 @@ func (r *Reader) Skip(n int64) error {
r.err = err
return r.err
}
if dLen > maxBlockSize {
if dLen > r.maxBlock {
r.err = ErrCorrupt
return r.err
}
@@ -342,6 +408,10 @@ func (r *Reader) Skip(n int64) error {
r.err = ErrCorrupt
return r.err
}
if chunkLen > len(r.buf) {
r.err = ErrUnsupported
return r.err
}
buf := r.buf[:checksumSize]
if !r.readFull(buf, false) {
return r.err
@@ -350,7 +420,7 @@ func (r *Reader) Skip(n int64) error {
// Read directly into r.decoded instead of via r.buf.
n2 := chunkLen - checksumSize
if n2 > len(r.decoded) {
if n2 > maxBlockSize {
if n2 > r.maxBlock {
r.err = ErrCorrupt
return r.err
}
@@ -391,13 +461,15 @@ func (r *Reader) Skip(n int64) error {
r.err = ErrUnsupported
return r.err
}
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
if !r.readFull(r.buf[:chunkLen], false) {
if chunkLen > maxBlockSize {
r.err = ErrUnsupported
return r.err
}
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
if !r.skipN(r.buf, chunkLen, false) {
return r.err
}
return io.ErrUnexpectedEOF
}
return nil
}

View File

@@ -54,9 +54,6 @@ func s2Decode(dst, src []byte) int {
x = uint32(src[s-4]) | uint32(src[s-3])<<8 | uint32(src[s-2])<<16 | uint32(src[s-1])<<24
}
length = int(x) + 1
if length <= 0 {
return decodeErrCodeUnsupportedLiteralLength
}
if length > len(dst)-d || length > len(src)-s {
return decodeErrCodeCorrupt
}

View File

@@ -100,6 +100,48 @@ func EncodeBetter(dst, src []byte) []byte {
return dst[:d]
}
// EncodeBest returns the encoded form of src. The returned slice may be a sub-
// slice of dst if dst was large enough to hold the entire encoded block.
// Otherwise, a newly allocated slice will be returned.
//
// EncodeBest compresses as good as reasonably possible but with a
// big speed decrease.
//
// The dst and src must not overlap. It is valid to pass a nil dst.
//
// The blocks will require the same amount of memory to decode as encoding,
// and does not make for concurrent decoding.
// Also note that blocks do not contain CRC information, so corruption may be undetected.
//
// If you need to encode larger amounts of data, consider using
// the streaming interface which gives all of these features.
func EncodeBest(dst, src []byte) []byte {
if n := MaxEncodedLen(len(src)); n < 0 {
panic(ErrTooLarge)
} else if len(dst) < n {
dst = make([]byte, n)
}
// The block starts with the varint-encoded length of the decompressed bytes.
d := binary.PutUvarint(dst, uint64(len(src)))
if len(src) == 0 {
return dst[:d]
}
if len(src) < minNonLiteralBlockSize {
d += emitLiteral(dst[d:], src)
return dst[:d]
}
n := encodeBlockBest(dst[d:], src)
if n > 0 {
d += n
return dst[:d]
}
// Not compressible
d += emitLiteral(dst[d:], src)
return dst[:d]
}
// EncodeSnappy returns the encoded form of src. The returned slice may be a sub-
// slice of dst if dst was large enough to hold the entire encoded block.
// Otherwise, a newly allocated slice will be returned.
@@ -239,6 +281,7 @@ func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
blockSize: defaultBlockSize,
concurrency: runtime.GOMAXPROCS(0),
randSrc: rand.Reader,
level: levelFast,
}
for _, opt := range opts {
if err := opt(&w2); err != nil {
@@ -279,10 +322,16 @@ type Writer struct {
// wroteStreamHeader is whether we have written the stream header.
wroteStreamHeader bool
paramsOK bool
better bool
uncompressed bool
level uint8
}
const (
levelUncompressed = iota + 1
levelFast
levelBetter
levelBest
)
type result []byte
// err returns the previously set error.
@@ -399,6 +448,13 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
return 0, err
}
}
if br, ok := r.(byter); ok {
buf := br.Bytes()
if err := w.EncodeBuffer(buf); err != nil {
return 0, err
}
return int64(len(buf)), w.Flush()
}
for {
inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
@@ -483,10 +539,13 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
// Attempt compressing.
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
var n2 int
if w.better {
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
} else if !w.uncompressed {
switch w.level {
case levelFast:
n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
case levelBetter:
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
case levelBest:
n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed)
}
// Check if we should use this, or store as uncompressed instead.
@@ -560,10 +619,13 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {
// Attempt compressing.
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
var n2 int
if w.better {
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
} else if !w.uncompressed {
switch w.level {
case levelFast:
n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
case levelBetter:
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
case levelBest:
n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed)
}
// Check if we should use this, or store as uncompressed instead.
@@ -636,10 +698,13 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) {
// Attempt compressing.
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
var n2 int
if w.better {
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
} else if !w.uncompressed {
switch w.level {
case levelFast:
n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
case levelBetter:
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
case levelBest:
n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed)
}
// Check if we should use this, or store as uncompressed instead.
@@ -705,10 +770,13 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
// Attempt compressing.
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
var n2 int
if w.better {
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
} else if !w.uncompressed {
switch w.level {
case levelFast:
n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
case levelBetter:
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
case levelBest:
n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed)
}
if n2 > 0 {
@@ -880,8 +948,17 @@ func WriterConcurrency(n int) WriterOption {
// 10-40% speed decrease on both compression and decompression.
func WriterBetterCompression() WriterOption {
return func(w *Writer) error {
w.uncompressed = false
w.better = true
w.level = levelBetter
return nil
}
}
// WriterBestCompression will enable better compression.
// EncodeBetter compresses better than Encode but typically with a
// big speed decrease on compression.
func WriterBestCompression() WriterOption {
return func(w *Writer) error {
w.level = levelBest
return nil
}
}
@@ -891,8 +968,7 @@ func WriterBetterCompression() WriterOption {
// If concurrency is > 1 CRC and output will still be done async.
func WriterUncompressed() WriterOption {
return func(w *Writer) error {
w.better = false
w.uncompressed = true
w.level = levelUncompressed
return nil
}
}
@@ -907,7 +983,7 @@ func WriterUncompressed() WriterOption {
// Default block size is 1MB.
func WriterBlockSize(n int) WriterOption {
return func(w *Writer) error {
if w.blockSize > maxBlockSize || w.blockSize < minBlockSize {
if n > maxBlockSize || n < minBlockSize {
return errors.New("s2: block size too large. Must be <= 4MB and >=4KB")
}
w.blockSize = n

252
vendor/github.com/klauspost/compress/s2/encode_best.go generated vendored Normal file
View File

@@ -0,0 +1,252 @@
// Copyright 2016 The Snappy-Go Authors. All rights reserved.
// Copyright (c) 2019 Klaus Post. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package s2
import (
"fmt"
"math/bits"
)
// encodeBlockBest encodes a non-empty src to a guaranteed-large-enough dst. It
// assumes that the varint-encoded length of the decompressed bytes has already
// been written.
//
// It also assumes that:
// len(dst) >= MaxEncodedLen(len(src)) &&
// minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize
func encodeBlockBest(dst, src []byte) (d int) {
// Initialize the hash tables.
const (
// Long hash matches.
lTableBits = 19
maxLTableSize = 1 << lTableBits
// Short hash matches.
sTableBits = 16
maxSTableSize = 1 << sTableBits
inputMargin = 8 + 2
)
var lTable [maxLTableSize]uint64
var sTable [maxSTableSize]uint64
// sLimit is when to stop looking for offset/length copies. The inputMargin
// lets us use a fast path for emitLiteral in the main loop, while we are
// looking for copies.
sLimit := len(src) - inputMargin
if len(src) < minNonLiteralBlockSize {
return 0
}
// Bail if we can't compress to at least this.
dstLimit := len(src) - 5
// nextEmit is where in src the next emitLiteral should start from.
nextEmit := 0
// The encoded form must start with a literal, as there are no previous
// bytes to copy, so we start looking for hash matches at s == 1.
s := 1
cv := load64(src, s)
// We search for a repeat at -1, but don't output repeats when nextEmit == 0
repeat := 1
const lowbitMask = 0xffffffff
getCur := func(x uint64) int {
return int(x & lowbitMask)
}
getPrev := func(x uint64) int {
return int(x >> 32)
}
for {
type match struct {
offset int
s int
length int
rep bool
}
var best match
for {
// Next src position to check
nextS := s + (s-nextEmit)>>8 + 1
if nextS > sLimit {
goto emitRemainder
}
hashL := hash8(cv, lTableBits)
hashS := hash4(cv, sTableBits)
candidateL := lTable[hashL]
candidateS := sTable[hashS]
matchAt := func(offset, s int, first uint32, rep bool) match {
if best.length != 0 && best.s-best.offset == s-offset {
// Don't retest if we have the same offset.
return match{offset: offset, s: s}
}
if load32(src, offset) != first {
return match{offset: offset, s: s}
}
m := match{offset: offset, s: s, length: 4, rep: rep}
s += 4
for s <= sLimit {
if diff := load64(src, s) ^ load64(src, offset+m.length); diff != 0 {
m.length += bits.TrailingZeros64(diff) >> 3
break
}
s += 8
m.length += 8
}
return m
}
bestOf := func(a, b match) match {
aScore := b.s - a.s + a.length
bScore := a.s - b.s + b.length
if !a.rep {
// Estimate bytes needed to store offset.
offset := a.s - a.offset
if offset >= 65536 {
aScore -= 5
} else {
aScore -= 3
}
}
if !b.rep {
// Estimate bytes needed to store offset.
offset := b.s - b.offset
if offset >= 65536 {
bScore -= 5
} else {
bScore -= 3
}
}
if aScore >= bScore {
return a
}
return b
}
best = bestOf(matchAt(getCur(candidateL), s, uint32(cv), false), matchAt(getPrev(candidateL), s, uint32(cv), false))
best = bestOf(best, matchAt(getCur(candidateS), s, uint32(cv), false))
best = bestOf(best, matchAt(getPrev(candidateS), s, uint32(cv), false))
{
best = bestOf(best, matchAt(s-repeat+1, s+1, uint32(cv>>8), true))
if best.length > 0 {
// s+1
nextShort := sTable[hash4(cv>>8, sTableBits)]
s := s + 1
cv := load64(src, s)
nextLong := lTable[hash8(cv, lTableBits)]
best = bestOf(best, matchAt(getCur(nextShort), s, uint32(cv), false))
best = bestOf(best, matchAt(getPrev(nextShort), s, uint32(cv), false))
best = bestOf(best, matchAt(getCur(nextLong), s, uint32(cv), false))
best = bestOf(best, matchAt(getPrev(nextLong), s, uint32(cv), false))
// Repeat at + 2
best = bestOf(best, matchAt(s-repeat+1, s+1, uint32(cv>>8), true))
// s+2
if true {
nextShort = sTable[hash4(cv>>8, sTableBits)]
s++
cv = load64(src, s)
nextLong = lTable[hash8(cv, lTableBits)]
best = bestOf(best, matchAt(getCur(nextShort), s, uint32(cv), false))
best = bestOf(best, matchAt(getPrev(nextShort), s, uint32(cv), false))
best = bestOf(best, matchAt(getCur(nextLong), s, uint32(cv), false))
best = bestOf(best, matchAt(getPrev(nextLong), s, uint32(cv), false))
}
}
}
// Update table
lTable[hashL] = uint64(s) | candidateL<<32
sTable[hashS] = uint64(s) | candidateS<<32
if best.length > 0 {
break
}
cv = load64(src, nextS)
s = nextS
}
// Extend backwards, not needed for repeats...
s = best.s
if !best.rep {
for best.offset > 0 && s > nextEmit && src[best.offset-1] == src[s-1] {
best.offset--
best.length++
s--
}
}
if false && best.offset >= s {
panic(fmt.Errorf("t %d >= s %d", best.offset, s))
}
// Bail if we exceed the maximum size.
if d+(s-nextEmit) > dstLimit {
return 0
}
base := s
offset := s - best.offset
s += best.length
if offset > 65535 && s-base <= 5 && !best.rep {
// Bail if the match is equal or worse to the encoding.
s = best.s + 1
if s >= sLimit {
goto emitRemainder
}
cv = load64(src, s)
continue
}
d += emitLiteral(dst[d:], src[nextEmit:base])
if best.rep {
if nextEmit > 0 {
// same as `add := emitCopy(dst[d:], repeat, s-base)` but skips storing offset.
d += emitRepeat(dst[d:], offset, best.length)
} else {
// First match, cannot be repeat.
d += emitCopy(dst[d:], offset, best.length)
}
} else {
d += emitCopy(dst[d:], offset, best.length)
}
repeat = offset
nextEmit = s
if s >= sLimit {
goto emitRemainder
}
if d > dstLimit {
// Do we have space for more, if not bail.
return 0
}
// Fill tables...
for i := best.s + 1; i < s; i++ {
cv0 := load64(src, i)
long0 := hash8(cv0, lTableBits)
short0 := hash4(cv0, sTableBits)
lTable[long0] = uint64(i) | lTable[long0]<<32
sTable[short0] = uint64(i) | sTable[short0]<<32
}
cv = load64(src, s)
}
emitRemainder:
if nextEmit < len(src) {
// Bail if we exceed the maximum size.
if d+len(src)-nextEmit > dstLimit {
return 0
}
d += emitLiteral(dst[d:], src[nextEmit:])
}
return d
}

View File

@@ -63,6 +63,9 @@ func encodeBlockBetter(dst, src []byte) (d int) {
// lets us use a fast path for emitLiteral in the main loop, while we are
// looking for copies.
sLimit := len(src) - inputMargin
if len(src) < minNonLiteralBlockSize {
return 0
}
// Bail if we can't compress to at least this.
dstLimit := len(src) - len(src)>>5 - 5

View File

@@ -35,6 +35,7 @@
package s2
import (
"bytes"
"hash/crc32"
)
@@ -127,3 +128,9 @@ func literalExtraSize(n int64) int64 {
return 5
}
}
type byter interface {
Bytes() []byte
}
var _ byter = &bytes.Buffer{}

2
vendor/modules.txt vendored
View File

@@ -1,4 +1,4 @@
# github.com/klauspost/compress v1.11.4
# github.com/klauspost/compress v1.11.7
## explicit
github.com/klauspost/compress/s2
# github.com/minio/highwayhash v1.0.0