diff --git a/go.mod b/go.mod index 604797f5..20a9eee8 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/nats-io/nats-server/v2 go 1.15 require ( - github.com/klauspost/compress v1.11.4 + github.com/klauspost/compress v1.11.7 github.com/minio/highwayhash v1.0.0 github.com/nats-io/jwt/v2 v2.0.0-20210107222814-18c5cc45d263 github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6 diff --git a/go.sum b/go.sum index 5ef04b3d..496a8bcc 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/klauspost/compress v1.11.4 h1:kz40R/YWls3iqT9zX9AHN3WoVsrAWVyui5sxuLqiXqU= github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg= +github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA= github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= diff --git a/server/consumer.go b/server/consumer.go index d3341a48..d37540f5 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -214,6 +214,7 @@ type Consumer struct { closed bool // Clustered. 
+ ca *consumerAssignment node RaftNode infoSub *subscription } @@ -230,7 +231,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { return mset.addConsumer(config, _EMPTY_, nil) } -func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftNode) (*Consumer, error) { +func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, ca *consumerAssignment) (*Consumer, error) { mset.mu.RLock() s, jsa := mset.srv, mset.jsa mset.mu.RUnlock() @@ -507,13 +508,14 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftN store, err := mset.store.ConsumerStore(o.name, config) if err != nil { mset.mu.Unlock() + o.deleteWithoutAdvisory() return nil, fmt.Errorf("error creating store for observable: %v", err) } o.store = store - // FIXME(dlc) - Failures past this point should be consumer cleanup. if !isValidName(o.name) { mset.mu.Unlock() + o.deleteWithoutAdvisory() return nil, fmt.Errorf("durable name can not contain '.', '*', '>'") } @@ -525,15 +527,21 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftN if eo, ok := mset.consumers[o.name]; ok { mset.mu.Unlock() if !o.isDurable() || !o.isPushMode() { + o.name = _EMPTY_ // Prevent removal since same name. + o.deleteWithoutAdvisory() return nil, fmt.Errorf("consumer already exists") } // If we are here we have already registered this durable. If it is still active that is an error. if eo.Active() { + o.name = _EMPTY_ // Prevent removal since same name. + o.deleteWithoutAdvisory() return nil, fmt.Errorf("consumer already exists and is still active") } // Since we are here this means we have a potentially new durable so we should update here. // Check that configs are the same. if !configsEqualSansDelivery(o.config, eo.config) { + o.name = _EMPTY_ // Prevent removal since same name. 
+ o.deleteWithoutAdvisory() return nil, fmt.Errorf("consumer replacement durable config not the same") } // Once we are here we have a replacement push-based durable. @@ -565,8 +573,10 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftN } } - // Set our node. - o.node = node + // Set our ca. + if ca != nil { + o.setConsumerAssignment(ca) + } mset.setConsumer(o) mset.mu.Unlock() @@ -584,6 +594,16 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftN return o, nil } +func (o *Consumer) setConsumerAssignment(ca *consumerAssignment) { + o.mu.Lock() + defer o.mu.Unlock() + o.ca = ca + // Set our node. + if ca != nil { + o.node = ca.Group.node + } +} + // Lock should be held. func (o *Consumer) isLeader() bool { if o.node != nil { @@ -865,14 +885,14 @@ func (o *Consumer) deleteNotActive() { o.mu.RUnlock() return } - s, jsa := o.mset.srv, o.mset.jsa - stream, name := o.stream, o.name + s, js, jsa := o.mset.srv, o.mset.srv.js, o.mset.jsa + acc, stream, name := o.acc.Name, o.stream, o.name o.mu.RUnlock() // If we are clustered, check if we still have this consumer assigned. // If we do forward a proposal to delete ourselves to the metacontroller leader. if s.JetStreamIsClustered() { - if ca := jsa.consumerAssignment(stream, name); ca != nil { + if ca := js.consumerAssignment(acc, stream, name); ca != nil { // We copy and clear the reply since this removal is internal. 
jsa.mu.Lock() cca := *ca @@ -892,7 +912,7 @@ func (o *Consumer) deleteNotActive() { ticker := time.NewTicker(time.Second) defer ticker.Stop() for range ticker.C { - if ca := jsa.consumerAssignment(stream, name); ca != nil { + if ca := js.consumerAssignment(acc, stream, name); ca != nil { s.Warnf("Consumer assignment not cleaned up, retrying") meta.ForwardProposal(removeEntry) } else { @@ -1125,31 +1145,57 @@ func (o *Consumer) readStoredState() error { return nil } state, err := o.store.State() - if err == nil && state != nil { - // FIXME(dlc) - re-apply state. - o.dseq = state.Delivered.Consumer + 1 - o.sseq = state.Delivered.Stream + 1 - o.adflr = state.AckFloor.Consumer - o.asflr = state.AckFloor.Stream - o.pending = state.Pending - o.rdc = state.Redelivered + o.applyState(state) } + return err +} + +// Apply the consumer stored state. +func (o *Consumer) applyState(state *ConsumerState) { + if state == nil { + return + } + + o.dseq = state.Delivered.Consumer + 1 + o.sseq = state.Delivered.Stream + 1 + o.adflr = state.AckFloor.Consumer + o.asflr = state.AckFloor.Stream + o.pending = state.Pending + o.rdc = state.Redelivered // Setup tracking timer if we have restored pending. if len(o.pending) > 0 && o.ptmr == nil { o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending) } - return err +} + +func (o *Consumer) readStoreState() *ConsumerState { + o.mu.RLock() + defer o.mu.RUnlock() + if o.store == nil { + return nil + } + state, _ := o.store.State() + return state +} + +// Sets our store state from another source. Used in clustered mode on snapshot restore. +func (o *Consumer) setStoreState(state *ConsumerState) error { + if state == nil { + return nil + } + o.applyState(state) + return o.store.Update(state) } // Update our state to the store. 
-func (o *Consumer) writeState() { +func (o *Consumer) writeStoreState() error { o.mu.Lock() defer o.mu.Unlock() if o.store == nil { - return + return nil } state := ConsumerState{ @@ -1164,8 +1210,7 @@ func (o *Consumer) writeState() { Pending: o.pending, Redelivered: o.rdc, } - // FIXME(dlc) - Hold onto any errors. - o.store.Update(&state) + return o.store.Update(&state) } // loopAndDeliverMsgs() will loop and deliver messages and watch for interest changes. @@ -2312,7 +2357,7 @@ func (o *Consumer) purge(sseq uint64) { } o.mu.Unlock() - o.writeState() + o.writeStoreState() } func stopAndClearTimer(tp **time.Timer) { @@ -2423,8 +2468,13 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error { } } - if dflag && n != nil { - n.Stop() + // Cluster cleanup. + if n != nil { + if dflag { + n.Delete() + } else { + n.Stop() + } } var err error diff --git a/server/events.go b/server/events.go index a7cd596e..96e08b0f 100644 --- a/server/events.go +++ b/server/events.go @@ -223,7 +223,7 @@ type DataStats struct { // Used for internally queueing up messages that the server wants to send. type pubMsg struct { - acc *Account + c *client sub string rply string si *ServerInfo @@ -258,7 +258,6 @@ RESET: } sysc := s.sys.client resetCh := s.sys.resetCh - sysacc := s.sys.account sendq := s.sys.sendq id := s.info.ID host := s.info.Host @@ -276,10 +275,6 @@ RESET: warnFreq := time.Second last := time.Now().Add(-warnFreq) - // Internal client not generally accessible for when we - // need to change accounts from the system account. - ic := s.createInternalAccountClient() - for s.eventsRunning() { // Setup information for next message if len(sendq) > warnThresh && time.Since(last) >= warnFreq { @@ -314,19 +309,14 @@ RESET: // Setup our client. If the user wants to use a non-system account use our internal // account scoped here so that we are not changing out accounts for the system client. 
var c *client - if pm.acc != nil && pm.acc != sysacc { - c = ic + if pm.c != nil { + c = pm.c } else { c = sysc - pm.acc = nil } // Grab client lock. c.mu.Lock() - // We can have an override for account here. - if pm.acc != nil { - c.acc = pm.acc - } // Prep internal structures needed to send message. c.pa.subject = []byte(pm.sub) @@ -392,8 +382,15 @@ func (s *Server) sendInternalAccountMsg(a *Account, subject string, msg interfac } sendq := s.sys.sendq // Don't hold lock while placing on the channel. + c := s.sys.client s.mu.Unlock() - sendq <- &pubMsg{a, subject, "", nil, msg, false} + + // Replace our client with the account's internal client. + if a != nil { + c = a.internalClient() + } + + sendq <- &pubMsg{c, subject, _EMPTY_, nil, msg, false} return nil } diff --git a/server/filestore.go b/server/filestore.go index 858c9212..7a3eab50 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2844,10 +2844,7 @@ const errFile = "errors.txt" func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includeConsumers bool) { defer w.Close() - bw := bufio.NewWriter(w) - defer bw.Flush() - - enc := s2.NewWriter(bw) + enc := s2.NewWriter(w) defer enc.Close() tw := tar.NewWriter(enc) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index d2fc9be2..23606996 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1081,12 +1081,42 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl name := streamNameFromSubject(subject) + var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}} + // If we are in clustered mode we need to be the stream leader to proceed. - if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(name) { - return + if s.JetStreamIsClustered() { + // Check to make sure the consumer is assigned. 
+ js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + jsEnabled := acc.JetStreamEnabled() + + js.mu.RLock() + isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name) + js.mu.RUnlock() + + if isLeader && sa == nil { + // We can't find the stream, so mimic what would be the errors below. + if !jsEnabled { + resp.Error = jsNotEnabledErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // No stream present. + resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound) + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } else if sa == nil { + return + } + + // We have the stream assigned, only the stream leader should answer. + if !acc.JetStreamIsStreamLeader(name) { + return + } } - var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}} if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -2039,12 +2069,47 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re stream := streamNameFromSubject(subject) consumer := consumerNameFromSubject(subject) + var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}} + // If we are in clustered mode we need to be the stream leader to proceed. - if s.JetStreamIsClustered() && !acc.JetStreamIsConsumerLeader(stream, consumer) { - return + if s.JetStreamIsClustered() { + // Check to make sure the consumer is assigned. + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + jsEnabled := acc.JetStreamEnabled() + + js.mu.RLock() + isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, stream), js.consumerAssignment(acc.Name, stream, consumer) + js.mu.RUnlock() + + if isLeader && ca == nil { + // We can't find the consumer, so mimic what would be the errors below. 
+ if !jsEnabled { + resp.Error = jsNotEnabledErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if sa == nil { + resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound) + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // If we are here the consumer is not present. + resp.Error = jsNoConsumerErr + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } else if ca == nil { + return + } + + // We have the consumer assigned, only the consumer leader should answer. + if !acc.JetStreamIsConsumerLeader(stream, consumer) { + return + } } - var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}} if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 5bd46791..629c2d16 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -62,6 +62,8 @@ const ( // Consumer ops updateDeliveredOp updateAcksOp + // Compressed consumer assignments. + assignCompressedConsumerOp ) // raftGroups are controlled by the metagroup controller. @@ -89,6 +91,20 @@ type streamAssignment struct { err error } +// consumerAssignment is what the meta controller uses to assign consumers to streams. +type consumerAssignment struct { + Client *ClientInfo `json:"client,omitempty"` + Name string `json:"name"` + Stream string `json:"stream"` + Config *ConsumerConfig `json:"consumer"` + Group *raftGroup `json:"group"` + Reply string `json:"reply"` + State *ConsumerState `json:"state,omitempty"` + // Internal + responded bool + err error +} + // streamPurge is what the stream leader will replicate when purging a stream. 
type streamPurge struct { Client *ClientInfo `json:"client,omitempty"` @@ -104,19 +120,6 @@ type streamMsgDelete struct { Reply string `json:"reply"` } -// consumerAssignment is what the meta controller uses to assign consumers to streams. -type consumerAssignment struct { - Client *ClientInfo `json:"client,omitempty"` - Name string `json:"name"` - Stream string `json:"stream"` - Config *ConsumerConfig `json:"consumer"` - Group *raftGroup `json:"group"` - Reply string `json:"reply"` - // Internal - responded bool - err error -} - const ( defaultStoreDirName = "_js_" defaultMetaGroupName = "_meta_" @@ -376,6 +379,8 @@ func (s *Server) JetStreamIsConsumerLeader(account, stream, consumer string) boo if js == nil || cc == nil { return false } + js.mu.RLock() + defer js.mu.RUnlock() return cc.isConsumerLeader(account, stream, consumer) } @@ -577,7 +582,7 @@ func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) b rg := ca.Group for _, peer := range rg.Peers { if peer == ourID { - if len(rg.Peers) == 1 || rg.node.Leader() { + if len(rg.Peers) == 1 || (rg.node != nil && rg.node.Leader()) { return true } } @@ -823,6 +828,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool return didSnap, err } js.processConsumerAssignment(ca) + case assignCompressedConsumerOp: + ca, err := decodeConsumerAssignmentCompressed(buf[1:]) + if err != nil { + js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:]) + return didSnap, err + } + js.processConsumerAssignment(ca) case removeConsumerOp: ca, err := decodeConsumerAssignment(buf[1:]) if err != nil { @@ -995,6 +1007,9 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) { for { select { case err := <-restoreDoneCh: + // We have completed a restore from snapshot on this server. The stream assignment has + // already been assigned but the replicas will need to catch up out of band. 
Consumers + // will need to be assigned by forwarding the proposal and stamping the initial state. s.Debugf("Stream restore for '%s > %s' completed", sa.Client.Account, sa.Config.Name) if err != nil { s.Debugf("Stream restore failed: %v", err) @@ -1037,6 +1052,47 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) { js.processStreamLeaderChange(mset, sa, isLeader) attemptSnapshot() + // Check to see if we have restored consumers here. + // These are not currently assigned so we will need to do so here. + if consumers := mset.Consumers(); len(consumers) > 0 { + for _, o := range mset.Consumers() { + rg := cc.createGroupForConsumer(sa) + // Pick a preferred leader. + rg.setPreferred() + name, cfg := o.Name(), o.Config() + // Place our initial state here as well for assignment distribution. + ca := &consumerAssignment{ + Group: rg, + Stream: sa.Config.Name, + Name: name, + Config: &cfg, + Client: sa.Client, + State: o.readStoreState(), + } + + // We make these compressed in case state is complex. + addEntry := encodeAddConsumerAssignmentCompressed(ca) + cc.meta.ForwardProposal(addEntry) + + // Check to make sure we see the assignment. + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for range ticker.C { + js.mu.RLock() + ca, meta := js.consumerAssignment(ca.Client.Account, sa.Config.Name, name), cc.meta + js.mu.RUnlock() + if ca == nil { + s.Warnf("Consumer assignment has not been assigned, retrying") + meta.ForwardProposal(addEntry) + } else { + return + } + } + }() + } + } + case <-s.quitCh: return case <-qch: @@ -1195,7 +1251,12 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen // Will lookup a stream assignment. // Lock should be held. 
func (js *jetStream) streamAssignment(account, stream string) (sa *streamAssignment) { - if as := js.cluster.streams[account]; as != nil { + cc := js.cluster + if cc == nil { + return nil + } + + if as := cc.streams[account]; as != nil { sa = as[stream] } return sa @@ -1269,8 +1330,16 @@ func (js *jetStream) processClusterCreateStream(sa *streamAssignment) { var mset *Stream - // Process here if not restoring. - if sa.Restore == nil { + // If we are restoring, create the stream if we are R>1 and not the preferred who handles the + // receipt of the snapshot itself. + shouldCreate := true + if sa.Restore != nil { + if len(rg.Peers) == 1 || rg.node != nil && rg.node.ID() == rg.Preferred { + shouldCreate = false + } + } + // Process here if not restoring or not the leader. + if shouldCreate { // Go ahead and create or update the stream. mset, err = acc.LookupStream(sa.Config.Name) if err == nil && mset != nil { @@ -1287,7 +1356,7 @@ func (js *jetStream) processClusterCreateStream(sa *streamAssignment) { // This is an error condition. if err != nil { - s.Debugf("Stream create failed for '%s > %s': %v\n", sa.Client.Account, sa.Config.Name, err) + s.Debugf("Stream create failed for '%s > %s': %v", sa.Client.Account, sa.Config.Name, err) js.mu.Lock() sa.err = err sa.responded = true @@ -1348,6 +1417,47 @@ func (js *jetStream) processClusterCreateStream(sa *streamAssignment) { } js.processStreamLeaderChange(mset, sa, true) + // Check to see if we have restored consumers here. + // These are not currently assigned so we will need to do so here. + if consumers := mset.Consumers(); len(consumers) > 0 { + js.mu.RLock() + cc := js.cluster + js.mu.RUnlock() + + for _, o := range mset.Consumers() { + rg := cc.createGroupForConsumer(sa) + name, cfg := o.Name(), o.Config() + // Place our initial state here as well for assignment distribution. 
+ ca := &consumerAssignment{ + Group: rg, + Stream: sa.Config.Name, + Name: name, + Config: &cfg, + Client: sa.Client, + } + + addEntry := encodeAddConsumerAssignment(ca) + cc.meta.ForwardProposal(addEntry) + + // Check to make sure we see the assignment. + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for range ticker.C { + js.mu.RLock() + ca, meta := js.consumerAssignment(ca.Client.Account, sa.Config.Name, name), cc.meta + js.mu.RUnlock() + if ca == nil { + s.Warnf("Consumer assignment has not been assigned, retrying") + meta.ForwardProposal(addEntry) + } else { + return + } + } + }() + } + } + case <-s.quitCh: return } @@ -1445,7 +1555,18 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { sa := js.streamAssignment(ca.Client.Account, ca.Stream) if sa == nil { - // FIXME(dlc) - log. + s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.Account, ca.Stream) + ca.err = ErrJetStreamStreamNotFound + result := &consumerAssignmentResult{ + Account: ca.Client.Account, + Stream: ca.Stream, + Consumer: ca.Name, + Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, + } + result.Response.Error = jsNotFoundError(ErrJetStreamStreamNotFound) + // Send response to the metadata leader. They will forward to the user as needed. + b, _ := json.Marshal(result) // Avoids auto-processing and doing fancy json with newlines. + s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, b) js.mu.Unlock() return } @@ -1519,11 +1640,26 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { // Go ahead and create or update the consumer. 
mset, err := acc.LookupStream(ca.Stream) if err != nil { - s.Debugf("JetStream cluster error looking up stream %q for account %q: %v", ca.Stream, acc.Name, err) - ca.err = err + js.mu.Lock() + s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.Account, ca.Stream) + ca.err = ErrJetStreamStreamNotFound + result := &consumerAssignmentResult{ + Account: ca.Client.Account, + Stream: ca.Stream, + Consumer: ca.Name, + Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, + } + result.Response.Error = jsNotFoundError(ErrJetStreamStreamNotFound) + // Send response to the metadata leader. They will forward to the user as needed. + b, _ := json.Marshal(result) // Avoids auto-processing and doing fancy json with newlines. + s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, b) + js.mu.Unlock() return } + // Process the raft group and make sure its running if needed. + js.createRaftGroup(rg) + // Check if we already have this consumer running. o := mset.LookupConsumer(ca.Name) if o != nil { @@ -1533,15 +1669,18 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { o.updateDeliverSubject(ca.Config.DeliverSubject) } } + o.setConsumerAssignment(ca) s.Debugf("JetStream cluster, consumer already running") } - // Process the raft group and make sure its running if needed. - js.createRaftGroup(rg) - // Add in the consumer if needed. if o == nil { - o, err = mset.addConsumer(ca.Config, ca.Name, rg.node) + o, err = mset.addConsumer(ca.Config, ca.Name, ca) + } + + // If we have an initial state set apply that now. + if ca.State != nil && o != nil { + err = o.setStoreState(ca.State) } if err != nil { @@ -1627,28 +1766,12 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb } // Returns the consumer assignment, or nil if not present. 
-func (jsa *jsAccount) consumerAssignment(stream, consumer string) *consumerAssignment { - jsa.mu.RLock() - defer jsa.mu.RUnlock() - - js, acc := jsa.js, jsa.account - if js == nil { - return nil +// Lock should be held. +func (js *jetStream) consumerAssignment(account, stream, consumer string) *consumerAssignment { + if sa := js.streamAssignment(account, stream); sa != nil { + return sa.consumers[consumer] } - cc := js.cluster - if cc == nil { - return nil - } - - var sa *streamAssignment - accStreams := cc.streams[acc.Name] - if accStreams != nil { - sa = accStreams[stream] - } - if sa == nil { - return nil - } - return sa.consumers[consumer] + return nil } // consumerAssigned informs us if this server has this consumer assigned. @@ -2101,9 +2224,20 @@ func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, stream, reply st osa := js.streamAssignment(ci.Account, stream) if osa == nil { - // TODO(dlc) - Should respond? Log? + acc, err := s.LookupAccount(ci.Account) + if err == nil { + var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}} + resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound) + s.sendAPIResponse(ci, acc, _EMPTY_, reply, string(rmsg), s.jsonResponse(&resp)) + } return } + // Remove any remaining consumers as well. + for _, ca := range osa.consumers { + ca.Reply, ca.State = _EMPTY_, nil + cc.meta.Propose(encodeDeleteConsumerAssignment(ca)) + } + sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Reply: reply, Client: ci} cc.meta.Propose(encodeDeleteStreamAssignment(sa)) } @@ -2399,7 +2533,12 @@ func (s *Server) jsClusteredConsumerDeleteRequest(ci *ClientInfo, stream, consum } oca := sa.consumers[consumer] if oca == nil { - // TODO(dlc) - Should respond? Log? 
+ acc, err := s.LookupAccount(ci.Account) + if err == nil { + var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}} + resp.Error = jsNoConsumerErr + s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + } return } ca := &consumerAssignment{Group: oca.Group, Stream: stream, Name: consumer, Config: oca.Config, Reply: reply, Client: ci} @@ -2546,6 +2685,28 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { return &ca, err } +func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { + b, err := json.Marshal(ca) + if err != nil { + return nil + } + // TODO(dlc) - Streaming better approach here probably. + var bb bytes.Buffer + bb.WriteByte(byte(assignCompressedConsumerOp)) + bb.Write(s2.Encode(nil, b)) + return bb.Bytes() +} + +func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) { + var ca consumerAssignment + js, err := s2.Decode(nil, buf) + if err != nil { + return nil, err + } + err = json.Unmarshal(js, &ca) + return &ca, err +} + var errBadStreamMsg = errors.New("jetstream cluster bad replicated stream msg") func decodeStreamMsg(buf []byte) (subject, reply string, hdr, msg []byte, lseq uint64, ts int64, err error) { diff --git a/server/raft.go b/server/raft.go index dcbe7ec9..f4cb14cb 100644 --- a/server/raft.go +++ b/server/raft.go @@ -173,8 +173,8 @@ type lps struct { const ( minElectionTimeout = 350 * time.Millisecond maxElectionTimeout = 3 * minElectionTimeout - minCampaignTimeout = 5 * time.Millisecond - maxCampaignTimeout = 5 * minCampaignTimeout + minCampaignTimeout = 50 * time.Millisecond + maxCampaignTimeout = 4 * minCampaignTimeout hbInterval = 200 * time.Millisecond ) @@ -323,6 +323,10 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { n.notice("Started") + n.Lock() + n.resetElectionTimeout() + n.Unlock() + s.registerRaftNode(n.group, n) s.startGoRoutine(n.run) @@ -392,12 +396,14 @@ func 
(n *raft) Propose(data []byte) error { n.RLock() if n.state != Leader { n.RUnlock() + n.debug("Proposal ignored, not leader") return errNotLeader } propc, paused, quit := n.propc, n.pausec, n.quit n.RUnlock() if paused != nil { + n.debug("Proposals paused, will wait") select { case <-paused: case <-quit: @@ -410,6 +416,7 @@ func (n *raft) Propose(data []byte) error { select { case propc <- &Entry{EntryNormal, data}: default: + n.debug("Propose failed!") return errProposalFailed } return nil @@ -685,9 +692,9 @@ func (n *raft) campaign() error { if n.state == Leader { return errAlreadyLeader } - if n.state == Follower { - n.resetElect(randCampaignTimeout()) - } + // Pre-place our vote for ourselves. + n.vote = n.id + n.resetElect(randCampaignTimeout()) return nil } @@ -797,6 +804,12 @@ const ( raftReplySubj = "$NRG.R.%s" ) +// Our internal subscribe. +// Lock should be held. +func (n *raft) subscribe(subject string, cb msgHandler) (*subscription, error) { + return n.s.systemSubscribe(subject, _EMPTY_, false, n.c, cb) +} + func (n *raft) createInternalSubs() error { cn := n.s.ClusterName() n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, cn, n.group), n.newInbox(cn) @@ -804,17 +817,17 @@ func (n *raft) createInternalSubs() error { n.psubj = fmt.Sprintf(raftPropSubj, n.group) // Votes - if _, err := n.s.sysSubscribe(n.vreply, n.handleVoteResponse); err != nil { + if _, err := n.subscribe(n.vreply, n.handleVoteResponse); err != nil { return err } - if _, err := n.s.sysSubscribe(n.vsubj, n.handleVoteRequest); err != nil { + if _, err := n.subscribe(n.vsubj, n.handleVoteRequest); err != nil { return err } // AppendEntry - if _, err := n.s.sysSubscribe(n.areply, n.handleAppendEntryResponse); err != nil { + if _, err := n.subscribe(n.areply, n.handleAppendEntryResponse); err != nil { return err } - if _, err := n.s.sysSubscribe(n.asubj, n.handleAppendEntry); err != nil { + if _, err := n.subscribe(n.asubj, n.handleAppendEntry); err != nil { return err } @@ -848,10 +861,6 @@ 
func (n *raft) run() { s := n.s defer s.grWG.Done() - n.Lock() - n.resetElectionTimeout() - n.Unlock() - for s.isRunning() { switch n.State() { case Follower: @@ -1079,6 +1088,8 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _, reply st n.debug("Ignoring forwarded proposal, not leader") return } + // Need to copy since this is underlying client/route buffer. + msg = append(msg[:0:0], msg...) if err := n.Propose(msg); err != nil { n.warn("Got error processing forwarded proposal: %v", err) } @@ -1087,7 +1098,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _, reply st func (n *raft) runAsLeader() { n.Lock() // For forwarded proposals. - fsub, err := n.s.sysSubscribe(n.psubj, n.handleForwardedProposal) + fsub, err := n.subscribe(n.psubj, n.handleForwardedProposal) if err != nil { n.warn("Error subscribing to forwarded proposals: %v", err) } @@ -1247,7 +1258,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.progress = make(map[string]chan uint64) } if _, ok := n.progress[ar.peer]; ok { - n.debug("Existing entry for catching up %q\n", ar.peer) + n.debug("Existing entry for catching up %q", ar.peer) n.Unlock() return } @@ -1502,7 +1513,7 @@ func (n *raft) createCatchup(ae *appendEntry) string { pindex: n.pindex, } inbox := n.newInbox(n.s.ClusterName()) - sub, _ := n.s.sysSubscribe(inbox, n.handleAppendEntry) + sub, _ := n.subscribe(inbox, n.handleAppendEntry) n.catchup.sub = sub return inbox } @@ -1731,7 +1742,7 @@ func (n *raft) sendAppendEntry(entries []*Entry) { // If we have entries store this in our wal. if len(entries) > 0 { if err := n.storeToWAL(ae); err != nil { - panic("Error storing!\n") + panic("Error storing!") } // We count ourselves. 
n.acks[n.pindex] = map[string]struct{}{n.id: struct{}{}} @@ -2027,11 +2038,11 @@ func (n *raft) requestVote() { } func (n *raft) sendRPC(subject, reply string, msg []byte) { - n.sendq <- &pubMsg{nil, subject, reply, nil, msg, false} + n.sendq <- &pubMsg{n.c, subject, reply, nil, msg, false} } func (n *raft) sendReply(subject string, msg []byte) { - n.sendq <- &pubMsg{nil, subject, _EMPTY_, nil, msg, false} + n.sendq <- &pubMsg{n.c, subject, _EMPTY_, nil, msg, false} } func (n *raft) wonElection(votes int) bool { @@ -2099,7 +2110,6 @@ func (n *raft) switchToCandidate() { n.term++ // Clear current Leader. n.leader = noLeader - n.resetElectionTimeout() n.switchState(Candidate) } diff --git a/server/store.go b/server/store.go index c352a937..b8662c39 100644 --- a/server/store.go +++ b/server/store.go @@ -151,9 +151,9 @@ type ConsumerState struct { // These are both in stream sequence context. // Pending is for all messages pending and the timestamp for the delivered time. // This will only be present when the AckPolicy is ExplicitAck. - Pending map[uint64]*Pending `json:"pending"` + Pending map[uint64]*Pending `json:"pending,omitempty"` // This is for messages that have been redelivered, so count > 1. - Redelivered map[uint64]uint64 `json:"redelivered"` + Redelivered map[uint64]uint64 `json:"redelivered,omitempty"` } // Represents a pending message for explicit ack or ack all. diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index c840cebe..65dd37e8 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -1536,6 +1536,9 @@ func TestJetStreamClusterEphemeralConsumerCleanup(t *testing.T) { } ci, _ := sub.ConsumerInfo() + if ci == nil { + t.Fatalf("Unexpected error: no consumer info") + } // We will look up by hand this consumer to set inactive threshold lower for this test. 
cl := c.consumerLeader("$G", "foo", ci.Name) @@ -1604,19 +1607,56 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { _, err := js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"foo"}, - Replicas: 3, + Replicas: 2, }) if err != nil { t.Fatalf("Unexpected error: %v", err) } - toSend := 500 + toSend, batchSize := 200, 50 + for i := 0; i < toSend; i++ { if _, err = js.Publish("foo", []byte("OK")); err != nil { t.Fatalf("Unexpected publish error: %v", err) } } + // Create a consumer as well and give it a non-simplistic state. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy, AckWait: time.Second}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + jsub, err := js.SubscribeSync("foo", nats.Attach("TEST", "dlc"), nats.Pull(batchSize)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + checkSubsPending(t, jsub, batchSize) + // Ack first 50. + for i := 1; i <= 50; i++ { + m, err := jsub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Unexpected error getting msg %d: %v", i, err) + } + m.Ack() + } + // Now ack every third message for next 50. + for i := 51; i <= 100; i++ { + m, err := jsub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Unexpected error getting msg %d: %v", i, err) + } + if i%3 == 0 { + m.Ack() + } + } + + // Snapshot consumer info. + ci, err := jsub.ConsumerInfo() + if err != nil { + t.Fatalf("Unexpected error getting consumer info: %v", err) + } + sreq := &server.JSApiStreamSnapshotRequest{ DeliverSubject: nats.NewInbox(), ChunkSize: 512, @@ -1702,7 +1742,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { // Send our snapshot back in to restore the stream. // Can be any size message. 
- var chunk [512]byte + var chunk [1024]byte for r := bytes.NewReader(snapshot); ; { n, err := r.Read(chunk[:]) if err != nil { @@ -1720,25 +1760,9 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { t.Fatalf("StreamInfo is not correct %+v", si) } - getExtendedStreamInfo := func() *server.StreamInfo { - t.Helper() - resp, err := nc.Request(fmt.Sprintf(server.JSApiStreamInfoT, "TEST"), nil, time.Second) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - var si server.StreamInfo - if err = json.Unmarshal(resp.Data, &si); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if si.Cluster == nil { - t.Fatalf("Expected cluster info") - } - return &si - } - // Make sure the replicas become current eventually. They will be doing catchup. checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { - si := getExtendedStreamInfo() + si, _ := js.StreamInfo("TEST") if si == nil || si.Cluster == nil { t.Fatalf("Did not get stream info") } @@ -1749,6 +1773,46 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { } return nil }) + + // Wait on the system to elect a leader for the restored consumer. + c.waitOnNewConsumerLeader("$G", "TEST", "dlc") + + // Now check for the consumer being recreated. + nci, err := js.ConsumerInfo("TEST", "dlc") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if nci.Delivered != ci.Delivered { + t.Fatalf("Delivered states do not match %+v vs %+v", nci.Delivered, ci.Delivered) + } + if nci.AckFloor != ci.AckFloor { + t.Fatalf("Ack floors did not match %+v vs %+v", nci.AckFloor, ci.AckFloor) + } + + // Make sure consumer works. + // It should pick up with the next delivery spot, so check for that as first message. + // We should have all the messages for first delivery delivered. 
+ start := 101 + end := toSend + for i := start; i <= end; i++ { + m, err := jsub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Unexpected error getting msg: %v", err) + } + meta, err := m.MetaData() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if meta.Stream != uint64(i) { + t.Fatalf("Expected stream sequence of %d, but got %d", i, meta.Stream) + } + m.Ack() + } + + // Check that redelivered come in now.. + redelivered := 50/3 + 1 + checkSubsPending(t, jsub, redelivered) } func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) { @@ -2128,7 +2192,7 @@ func jsClientConnect(t *testing.T, s *server.Server) (*nats.Conn, nats.JetStream func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) { t.Helper() - checkFor(t, 500*time.Millisecond, 10*time.Millisecond, func() error { + checkFor(t, 3*time.Second, 10*time.Millisecond, func() error { if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != numExpected { return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected) } @@ -2165,7 +2229,7 @@ func (c *cluster) waitOnPeerCount(n int) { c.t.Helper() c.waitOnLeader() leader := c.leader() - expires := time.Now().Add(5 * time.Second) + expires := time.Now().Add(10 * time.Second) for time.Now().Before(expires) { peers := leader.JetStreamClusterPeers() if len(peers) == n { @@ -2178,7 +2242,7 @@ func (c *cluster) waitOnPeerCount(n int) { func (c *cluster) waitOnNewConsumerLeader(account, stream, consumer string) { c.t.Helper() - expires := time.Now().Add(5 * time.Second) + expires := time.Now().Add(10 * time.Second) for time.Now().Before(expires) { if leader := c.consumerLeader(account, stream, consumer); leader != nil { time.Sleep(25 * time.Millisecond) @@ -2201,13 +2265,13 @@ func (c *cluster) consumerLeader(account, stream, consumer string) *server.Serve func (c *cluster) waitOnNewStreamLeader(account, stream string) { c.t.Helper() - expires := time.Now().Add(5 * time.Second) + expires := 
time.Now().Add(10 * time.Second) for time.Now().Before(expires) { if leader := c.streamLeader(account, stream); leader != nil { - time.Sleep(100 * time.Millisecond) + time.Sleep(25 * time.Millisecond) return } - time.Sleep(25 * time.Millisecond) + time.Sleep(100 * time.Millisecond) } c.t.Fatalf("Expected a stream leader for %q %q, got none", account, stream) } diff --git a/vendor/github.com/klauspost/compress/s2/README.md b/vendor/github.com/klauspost/compress/s2/README.md index 303c8552..479e8247 100644 --- a/vendor/github.com/klauspost/compress/s2/README.md +++ b/vendor/github.com/klauspost/compress/s2/README.md @@ -442,11 +442,51 @@ The PDF sample shows a significant slowdown compared to Snappy, as this mode tri to compress the data. Very small blocks are also not favorable for better compression, so throughput is way down. This mode aims to provide better compression at the expense of performance and achieves that -without a huge performance pentalty, except on very small blocks. +without a huge performance penalty, except on very small blocks. Decompression speed suffers a little compared to the regular S2 mode, but still manages to be close to Snappy in spite of increased compression. +# Best compression mode + +S2 offers a "best" compression mode. + +This will compress as much as possible with little regard to CPU usage. + +Mainly for offline compression, but where decompression speed should still +be high and compatible with other S2 compressed data. + +Some examples compared on 16 core CPU: + +``` +* enwik10 +Default... 10000000000 -> 4761467548 [47.61%]; 1.098s, 8685.6MB/s +Better... 10000000000 -> 4225922984 [42.26%]; 2.817s, 3385.4MB/s +Best... 10000000000 -> 3667646858 [36.68%]; 35.995s, 264.9MB/s + +* github-june-2days-2019.json +Default... 6273951764 -> 1043196283 [16.63%]; 431ms, 13882.3MB/s +Better... 6273951764 -> 950079555 [15.14%]; 736ms, 8129.5MB/s +Best... 
6273951764 -> 846260870 [13.49%]; 8.125s, 736.4MB/s + +* nyc-taxi-data-10M.csv +Default... 3325605752 -> 1095998837 [32.96%]; 324ms, 9788.7MB/s +Better... 3325605752 -> 960330423 [28.88%]; 602ms, 5268.4MB/s +Best... 3325605752 -> 794873295 [23.90%]; 6.619s, 479.1MB/s + +* 10gb.tar +Default... 10065157632 -> 5916578242 [58.78%]; 1.028s, 9337.4MB/s +Better... 10065157632 -> 5650133605 [56.14%]; 2.172s, 4419.4MB/s +Best... 10065157632 -> 5246578570 [52.13%]; 25.696s, 373.6MB/s + +* consensus.db.10gb +Default... 10737418240 -> 4562648848 [42.49%]; 882ms, 11610.0MB/s +Better... 10737418240 -> 4542443833 [42.30%]; 3.3s, 3103.5MB/s +Best... 10737418240 -> 4272335558 [39.79%]; 38.955s, 262.9MB/s +``` + +Decompression speed should be around the same as using the 'better' compression mode. + # Concatenating blocks and streams. Concatenating streams will concatenate the output of both without recompressing them. diff --git a/vendor/github.com/klauspost/compress/s2/decode.go b/vendor/github.com/klauspost/compress/s2/decode.go index 039ddb91..0b99b3b0 100644 --- a/vendor/github.com/klauspost/compress/s2/decode.go +++ b/vendor/github.com/klauspost/compress/s2/decode.go @@ -20,8 +20,6 @@ var ( ErrTooLarge = errors.New("s2: decoded block is too large") // ErrUnsupported reports that the input isn't supported. ErrUnsupported = errors.New("s2: unsupported input") - - errUnsupportedLiteralLength = errors.New("s2: unsupported literal length") ) // DecodedLen returns the length of the decoded block. @@ -46,8 +44,7 @@ func decodedLen(src []byte) (blockLen, headerLen int, err error) { } const ( - decodeErrCodeCorrupt = 1 - decodeErrCodeUnsupportedLiteralLength = 2 + decodeErrCodeCorrupt = 1 ) // Decode returns the decoded form of src. 
The returned slice may be a sub- @@ -65,22 +62,47 @@ func Decode(dst, src []byte) ([]byte, error) { } else { dst = make([]byte, dLen) } - switch s2Decode(dst, src[s:]) { - case 0: - return dst, nil - case decodeErrCodeUnsupportedLiteralLength: - return nil, errUnsupportedLiteralLength + if s2Decode(dst, src[s:]) != 0 { + return nil, ErrCorrupt } - return nil, ErrCorrupt + return dst, nil } // NewReader returns a new Reader that decompresses from r, using the framing // format described at // https://github.com/google/snappy/blob/master/framing_format.txt with S2 changes. -func NewReader(r io.Reader) *Reader { - return &Reader{ - r: r, - buf: make([]byte, MaxEncodedLen(maxBlockSize)+checksumSize), +func NewReader(r io.Reader, opts ...ReaderOption) *Reader { + nr := Reader{ + r: r, + maxBlock: maxBlockSize, + } + for _, opt := range opts { + if err := opt(&nr); err != nil { + nr.err = err + return &nr + } + } + nr.buf = make([]byte, MaxEncodedLen(nr.maxBlock)+checksumSize) + nr.paramsOK = true + return &nr +} + +// ReaderOption is an option for creating a decoder. +type ReaderOption func(*Reader) error + +// ReaderMaxBlockSize allows to control allocations if the stream +// has been compressed with a smaller WriterBlockSize, or with the default 1MB. +// Blocks must be this size or smaller to decompress, +// otherwise the decoder will return ErrUnsupported. +// +// Default is the maximum limit of 4MB. +func ReaderMaxBlockSize(n int) ReaderOption { + return func(r *Reader) error { + if n > maxBlockSize || n <= 0 { + return errors.New("s2: block size too large. Must be <= 4MB and > 0") + } + r.maxBlock = n + return nil } } @@ -92,13 +114,18 @@ type Reader struct { buf []byte // decoded[i:j] contains decoded bytes that have not yet been passed on. i, j int + maxBlock int readHeader bool + paramsOK bool } // Reset discards any buffered data, resets all state, and switches the Snappy // reader to read from r. 
This permits reusing a Reader rather than allocating // a new one. func (r *Reader) Reset(reader io.Reader) { + if !r.paramsOK { + return + } r.r = reader r.err = nil r.i = 0 @@ -116,6 +143,36 @@ func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { return true } +// skipN will skip n bytes. +// If the supplied reader supports seeking that is used. +// tmp is used as a temporary buffer for reading. +// The supplied slice does not need to be the size of the read. +func (r *Reader) skipN(tmp []byte, n int, allowEOF bool) (ok bool) { + if rs, ok := r.r.(io.ReadSeeker); ok { + _, err := rs.Seek(int64(n), io.SeekCurrent) + if err == nil { + return true + } + if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) { + r.err = ErrCorrupt + return false + } + } + for n > 0 { + if n < len(tmp) { + tmp = tmp[:n] + } + if _, r.err = io.ReadFull(r.r, tmp); r.err != nil { + if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) { + r.err = ErrCorrupt + } + return false + } + n -= len(tmp) + } + return true +} + // Read satisfies the io.Reader interface. 
func (r *Reader) Read(p []byte) (int, error) { if r.err != nil { @@ -139,10 +196,6 @@ func (r *Reader) Read(p []byte) (int, error) { r.readHeader = true } chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16 - if chunkLen > len(r.buf) { - r.err = ErrUnsupported - return 0, r.err - } // The chunk types are specified at // https://github.com/google/snappy/blob/master/framing_format.txt @@ -153,6 +206,10 @@ func (r *Reader) Read(p []byte) (int, error) { r.err = ErrCorrupt return 0, r.err } + if chunkLen > len(r.buf) { + r.err = ErrUnsupported + return 0, r.err + } buf := r.buf[:chunkLen] if !r.readFull(buf, false) { return 0, r.err @@ -166,7 +223,7 @@ func (r *Reader) Read(p []byte) (int, error) { return 0, r.err } if n > len(r.decoded) { - if n > maxBlockSize { + if n > r.maxBlock { r.err = ErrCorrupt return 0, r.err } @@ -189,6 +246,10 @@ func (r *Reader) Read(p []byte) (int, error) { r.err = ErrCorrupt return 0, r.err } + if chunkLen > len(r.buf) { + r.err = ErrUnsupported + return 0, r.err + } buf := r.buf[:checksumSize] if !r.readFull(buf, false) { return 0, r.err @@ -197,7 +258,7 @@ func (r *Reader) Read(p []byte) (int, error) { // Read directly into r.decoded instead of via r.buf. n := chunkLen - checksumSize if n > len(r.decoded) { - if n > maxBlockSize { + if n > r.maxBlock { r.err = ErrCorrupt return 0, r.err } @@ -238,7 +299,12 @@ func (r *Reader) Read(p []byte) (int, error) { } // Section 4.4 Padding (chunk type 0xfe). // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). 
- if !r.readFull(r.buf[:chunkLen], false) { + if chunkLen > maxBlockSize { + r.err = ErrUnsupported + return 0, r.err + } + + if !r.skipN(r.buf, chunkLen, false) { return 0, r.err } } @@ -286,10 +352,6 @@ func (r *Reader) Skip(n int64) error { r.readHeader = true } chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16 - if chunkLen > len(r.buf) { - r.err = ErrUnsupported - return r.err - } // The chunk types are specified at // https://github.com/google/snappy/blob/master/framing_format.txt @@ -300,6 +362,10 @@ func (r *Reader) Skip(n int64) error { r.err = ErrCorrupt return r.err } + if chunkLen > len(r.buf) { + r.err = ErrUnsupported + return r.err + } buf := r.buf[:chunkLen] if !r.readFull(buf, false) { return r.err @@ -312,7 +378,7 @@ func (r *Reader) Skip(n int64) error { r.err = err return r.err } - if dLen > maxBlockSize { + if dLen > r.maxBlock { r.err = ErrCorrupt return r.err } @@ -342,6 +408,10 @@ func (r *Reader) Skip(n int64) error { r.err = ErrCorrupt return r.err } + if chunkLen > len(r.buf) { + r.err = ErrUnsupported + return r.err + } buf := r.buf[:checksumSize] if !r.readFull(buf, false) { return r.err @@ -350,7 +420,7 @@ func (r *Reader) Skip(n int64) error { // Read directly into r.decoded instead of via r.buf. n2 := chunkLen - checksumSize if n2 > len(r.decoded) { - if n2 > maxBlockSize { + if n2 > r.maxBlock { r.err = ErrCorrupt return r.err } @@ -391,13 +461,15 @@ func (r *Reader) Skip(n int64) error { r.err = ErrUnsupported return r.err } - // Section 4.4 Padding (chunk type 0xfe). - // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). - if !r.readFull(r.buf[:chunkLen], false) { + if chunkLen > maxBlockSize { + r.err = ErrUnsupported + return r.err + } + // Section 4.4 Padding (chunk type 0xfe). + // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). 
+ if !r.skipN(r.buf, chunkLen, false) { return r.err } - - return io.ErrUnexpectedEOF } return nil } diff --git a/vendor/github.com/klauspost/compress/s2/decode_other.go b/vendor/github.com/klauspost/compress/s2/decode_other.go index 4cb61001..7ee037b9 100644 --- a/vendor/github.com/klauspost/compress/s2/decode_other.go +++ b/vendor/github.com/klauspost/compress/s2/decode_other.go @@ -54,9 +54,6 @@ func s2Decode(dst, src []byte) int { x = uint32(src[s-4]) | uint32(src[s-3])<<8 | uint32(src[s-2])<<16 | uint32(src[s-1])<<24 } length = int(x) + 1 - if length <= 0 { - return decodeErrCodeUnsupportedLiteralLength - } if length > len(dst)-d || length > len(src)-s { return decodeErrCodeCorrupt } diff --git a/vendor/github.com/klauspost/compress/s2/encode.go b/vendor/github.com/klauspost/compress/s2/encode.go index cdb3ab45..8f89e21a 100644 --- a/vendor/github.com/klauspost/compress/s2/encode.go +++ b/vendor/github.com/klauspost/compress/s2/encode.go @@ -100,6 +100,48 @@ func EncodeBetter(dst, src []byte) []byte { return dst[:d] } +// EncodeBest returns the encoded form of src. The returned slice may be a sub- +// slice of dst if dst was large enough to hold the entire encoded block. +// Otherwise, a newly allocated slice will be returned. +// +// EncodeBest compresses as good as reasonably possible but with a +// big speed decrease. +// +// The dst and src must not overlap. It is valid to pass a nil dst. +// +// The blocks will require the same amount of memory to decode as encoding, +// and does not make for concurrent decoding. +// Also note that blocks do not contain CRC information, so corruption may be undetected. +// +// If you need to encode larger amounts of data, consider using +// the streaming interface which gives all of these features. 
+func EncodeBest(dst, src []byte) []byte { + if n := MaxEncodedLen(len(src)); n < 0 { + panic(ErrTooLarge) + } else if len(dst) < n { + dst = make([]byte, n) + } + + // The block starts with the varint-encoded length of the decompressed bytes. + d := binary.PutUvarint(dst, uint64(len(src))) + + if len(src) == 0 { + return dst[:d] + } + if len(src) < minNonLiteralBlockSize { + d += emitLiteral(dst[d:], src) + return dst[:d] + } + n := encodeBlockBest(dst[d:], src) + if n > 0 { + d += n + return dst[:d] + } + // Not compressible + d += emitLiteral(dst[d:], src) + return dst[:d] +} + // EncodeSnappy returns the encoded form of src. The returned slice may be a sub- // slice of dst if dst was large enough to hold the entire encoded block. // Otherwise, a newly allocated slice will be returned. @@ -239,6 +281,7 @@ func NewWriter(w io.Writer, opts ...WriterOption) *Writer { blockSize: defaultBlockSize, concurrency: runtime.GOMAXPROCS(0), randSrc: rand.Reader, + level: levelFast, } for _, opt := range opts { if err := opt(&w2); err != nil { @@ -279,10 +322,16 @@ type Writer struct { // wroteStreamHeader is whether we have written the stream header. wroteStreamHeader bool paramsOK bool - better bool - uncompressed bool + level uint8 } +const ( + levelUncompressed = iota + 1 + levelFast + levelBetter + levelBest +) + type result []byte // err returns the previously set error. @@ -399,6 +448,13 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { return 0, err } } + if br, ok := r.(byter); ok { + buf := br.Bytes() + if err := w.EncodeBuffer(buf); err != nil { + return 0, err + } + return int64(len(buf)), w.Flush() + } for { inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen] n2, err := io.ReadFull(r, inbuf[obufHeaderLen:]) @@ -483,10 +539,13 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { // Attempt compressing. 
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed))) var n2 int - if w.better { - n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed) - } else if !w.uncompressed { + switch w.level { + case levelFast: n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed) + case levelBetter: + n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed) + case levelBest: + n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed) } // Check if we should use this, or store as uncompressed instead. @@ -560,10 +619,13 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) { // Attempt compressing. n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed))) var n2 int - if w.better { - n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed) - } else if !w.uncompressed { + switch w.level { + case levelFast: n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed) + case levelBetter: + n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed) + case levelBest: + n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed) } // Check if we should use this, or store as uncompressed instead. @@ -636,10 +698,13 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) { // Attempt compressing. n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed))) var n2 int - if w.better { - n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed) - } else if !w.uncompressed { + switch w.level { + case levelFast: n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed) + case levelBetter: + n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed) + case levelBest: + n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed) } // Check if we should use this, or store as uncompressed instead. @@ -705,10 +770,13 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) { // Attempt compressing. 
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed))) var n2 int - if w.better { - n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed) - } else if !w.uncompressed { + switch w.level { + case levelFast: n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed) + case levelBetter: + n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed) + case levelBest: + n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed) } if n2 > 0 { @@ -880,8 +948,17 @@ func WriterConcurrency(n int) WriterOption { // 10-40% speed decrease on both compression and decompression. func WriterBetterCompression() WriterOption { return func(w *Writer) error { - w.uncompressed = false - w.better = true + w.level = levelBetter + return nil + } +} + +// WriterBestCompression will enable better compression. +// EncodeBetter compresses better than Encode but typically with a +// big speed decrease on compression. +func WriterBestCompression() WriterOption { + return func(w *Writer) error { + w.level = levelBest return nil } } @@ -891,8 +968,7 @@ func WriterBetterCompression() WriterOption { // If concurrency is > 1 CRC and output will still be done async. func WriterUncompressed() WriterOption { return func(w *Writer) error { - w.better = false - w.uncompressed = true + w.level = levelUncompressed return nil } } @@ -907,7 +983,7 @@ func WriterUncompressed() WriterOption { // Default block size is 1MB. func WriterBlockSize(n int) WriterOption { return func(w *Writer) error { - if w.blockSize > maxBlockSize || w.blockSize < minBlockSize { + if n > maxBlockSize || n < minBlockSize { return errors.New("s2: block size too large. Must be <= 4MB and >=4KB") } w.blockSize = n diff --git a/vendor/github.com/klauspost/compress/s2/encode_best.go b/vendor/github.com/klauspost/compress/s2/encode_best.go new file mode 100644 index 00000000..4b571bf3 --- /dev/null +++ b/vendor/github.com/klauspost/compress/s2/encode_best.go @@ -0,0 +1,252 @@ +// Copyright 2016 The Snappy-Go Authors. 
All rights reserved. +// Copyright (c) 2019 Klaus Post. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package s2 + +import ( + "fmt" + "math/bits" +) + +// encodeBlockBest encodes a non-empty src to a guaranteed-large-enough dst. It +// assumes that the varint-encoded length of the decompressed bytes has already +// been written. +// +// It also assumes that: +// len(dst) >= MaxEncodedLen(len(src)) && +// minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize +func encodeBlockBest(dst, src []byte) (d int) { + // Initialize the hash tables. + const ( + // Long hash matches. + lTableBits = 19 + maxLTableSize = 1 << lTableBits + + // Short hash matches. + sTableBits = 16 + maxSTableSize = 1 << sTableBits + + inputMargin = 8 + 2 + ) + + var lTable [maxLTableSize]uint64 + var sTable [maxSTableSize]uint64 + + // sLimit is when to stop looking for offset/length copies. The inputMargin + // lets us use a fast path for emitLiteral in the main loop, while we are + // looking for copies. + sLimit := len(src) - inputMargin + if len(src) < minNonLiteralBlockSize { + return 0 + } + + // Bail if we can't compress to at least this. + dstLimit := len(src) - 5 + + // nextEmit is where in src the next emitLiteral should start from. + nextEmit := 0 + + // The encoded form must start with a literal, as there are no previous + // bytes to copy, so we start looking for hash matches at s == 1. 
+ s := 1 + cv := load64(src, s) + + // We search for a repeat at -1, but don't output repeats when nextEmit == 0 + repeat := 1 + const lowbitMask = 0xffffffff + getCur := func(x uint64) int { + return int(x & lowbitMask) + } + getPrev := func(x uint64) int { + return int(x >> 32) + } + + for { + type match struct { + offset int + s int + length int + rep bool + } + var best match + for { + // Next src position to check + nextS := s + (s-nextEmit)>>8 + 1 + if nextS > sLimit { + goto emitRemainder + } + hashL := hash8(cv, lTableBits) + hashS := hash4(cv, sTableBits) + candidateL := lTable[hashL] + candidateS := sTable[hashS] + + matchAt := func(offset, s int, first uint32, rep bool) match { + if best.length != 0 && best.s-best.offset == s-offset { + // Don't retest if we have the same offset. + return match{offset: offset, s: s} + } + if load32(src, offset) != first { + return match{offset: offset, s: s} + } + m := match{offset: offset, s: s, length: 4, rep: rep} + s += 4 + for s <= sLimit { + if diff := load64(src, s) ^ load64(src, offset+m.length); diff != 0 { + m.length += bits.TrailingZeros64(diff) >> 3 + break + } + s += 8 + m.length += 8 + } + return m + } + + bestOf := func(a, b match) match { + aScore := b.s - a.s + a.length + bScore := a.s - b.s + b.length + if !a.rep { + // Estimate bytes needed to store offset. + offset := a.s - a.offset + if offset >= 65536 { + aScore -= 5 + } else { + aScore -= 3 + } + } + if !b.rep { + // Estimate bytes needed to store offset. 
+ offset := b.s - b.offset + if offset >= 65536 { + bScore -= 5 + } else { + bScore -= 3 + } + } + if aScore >= bScore { + return a + } + return b + } + + best = bestOf(matchAt(getCur(candidateL), s, uint32(cv), false), matchAt(getPrev(candidateL), s, uint32(cv), false)) + best = bestOf(best, matchAt(getCur(candidateS), s, uint32(cv), false)) + best = bestOf(best, matchAt(getPrev(candidateS), s, uint32(cv), false)) + + { + best = bestOf(best, matchAt(s-repeat+1, s+1, uint32(cv>>8), true)) + if best.length > 0 { + // s+1 + nextShort := sTable[hash4(cv>>8, sTableBits)] + s := s + 1 + cv := load64(src, s) + nextLong := lTable[hash8(cv, lTableBits)] + best = bestOf(best, matchAt(getCur(nextShort), s, uint32(cv), false)) + best = bestOf(best, matchAt(getPrev(nextShort), s, uint32(cv), false)) + best = bestOf(best, matchAt(getCur(nextLong), s, uint32(cv), false)) + best = bestOf(best, matchAt(getPrev(nextLong), s, uint32(cv), false)) + // Repeat at + 2 + best = bestOf(best, matchAt(s-repeat+1, s+1, uint32(cv>>8), true)) + + // s+2 + if true { + nextShort = sTable[hash4(cv>>8, sTableBits)] + s++ + cv = load64(src, s) + nextLong = lTable[hash8(cv, lTableBits)] + best = bestOf(best, matchAt(getCur(nextShort), s, uint32(cv), false)) + best = bestOf(best, matchAt(getPrev(nextShort), s, uint32(cv), false)) + best = bestOf(best, matchAt(getCur(nextLong), s, uint32(cv), false)) + best = bestOf(best, matchAt(getPrev(nextLong), s, uint32(cv), false)) + } + } + } + + // Update table + lTable[hashL] = uint64(s) | candidateL<<32 + sTable[hashS] = uint64(s) | candidateS<<32 + + if best.length > 0 { + break + } + + cv = load64(src, nextS) + s = nextS + } + + // Extend backwards, not needed for repeats... + s = best.s + if !best.rep { + for best.offset > 0 && s > nextEmit && src[best.offset-1] == src[s-1] { + best.offset-- + best.length++ + s-- + } + } + if false && best.offset >= s { + panic(fmt.Errorf("t %d >= s %d", best.offset, s)) + } + // Bail if we exceed the maximum size. 
+ if d+(s-nextEmit) > dstLimit { + return 0 + } + + base := s + offset := s - best.offset + + s += best.length + + if offset > 65535 && s-base <= 5 && !best.rep { + // Bail if the match is equal or worse to the encoding. + s = best.s + 1 + if s >= sLimit { + goto emitRemainder + } + cv = load64(src, s) + continue + } + d += emitLiteral(dst[d:], src[nextEmit:base]) + if best.rep { + if nextEmit > 0 { + // same as `add := emitCopy(dst[d:], repeat, s-base)` but skips storing offset. + d += emitRepeat(dst[d:], offset, best.length) + } else { + // First match, cannot be repeat. + d += emitCopy(dst[d:], offset, best.length) + } + } else { + d += emitCopy(dst[d:], offset, best.length) + } + repeat = offset + + nextEmit = s + if s >= sLimit { + goto emitRemainder + } + + if d > dstLimit { + // Do we have space for more, if not bail. + return 0 + } + // Fill tables... + for i := best.s + 1; i < s; i++ { + cv0 := load64(src, i) + long0 := hash8(cv0, lTableBits) + short0 := hash4(cv0, sTableBits) + lTable[long0] = uint64(i) | lTable[long0]<<32 + sTable[short0] = uint64(i) | sTable[short0]<<32 + } + cv = load64(src, s) + } + +emitRemainder: + if nextEmit < len(src) { + // Bail if we exceed the maximum size. + if d+len(src)-nextEmit > dstLimit { + return 0 + } + d += emitLiteral(dst[d:], src[nextEmit:]) + } + return d +} diff --git a/vendor/github.com/klauspost/compress/s2/encode_better.go b/vendor/github.com/klauspost/compress/s2/encode_better.go index 6e159173..f4c5e04d 100644 --- a/vendor/github.com/klauspost/compress/s2/encode_better.go +++ b/vendor/github.com/klauspost/compress/s2/encode_better.go @@ -63,6 +63,9 @@ func encodeBlockBetter(dst, src []byte) (d int) { // lets us use a fast path for emitLiteral in the main loop, while we are // looking for copies. sLimit := len(src) - inputMargin + if len(src) < minNonLiteralBlockSize { + return 0 + } // Bail if we can't compress to at least this. 
dstLimit := len(src) - len(src)>>5 - 5 diff --git a/vendor/github.com/klauspost/compress/s2/s2.go b/vendor/github.com/klauspost/compress/s2/s2.go index c98da6cd..a3e4abfe 100644 --- a/vendor/github.com/klauspost/compress/s2/s2.go +++ b/vendor/github.com/klauspost/compress/s2/s2.go @@ -35,6 +35,7 @@ package s2 import ( + "bytes" "hash/crc32" ) @@ -127,3 +128,9 @@ func literalExtraSize(n int64) int64 { return 5 } } + +type byter interface { + Bytes() []byte +} + +var _ byter = &bytes.Buffer{} diff --git a/vendor/modules.txt b/vendor/modules.txt index c2d39c49..60277f2c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,4 @@ -# github.com/klauspost/compress v1.11.4 +# github.com/klauspost/compress v1.11.7 ## explicit github.com/klauspost/compress/s2 # github.com/minio/highwayhash v1.0.0