mirror of
				https://github.com/taigrr/bitcask
				synced 2025-01-18 04:03:17 -08:00 
			
		
		
		
	cmd/bitcask: add recovery tool for datafiles (#102)
* cmd/bitcask: refactor recovery index logic * cmd/bitcask: first version of datafile recovery tool * cmd/bitcask: finished recovery datafile tool * cmd/bitcask: temporary script to test recovery tool * cmd/bitcask: remove committed binary file * cmd/bitcask: delete bash test
This commit is contained in:
		
							parent
							
								
									f17187a5c7
								
							
						
					
					
						commit
						16a7feb603
					
				| @ -1,11 +1,15 @@ | ||||
| package main | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| 
 | ||||
| 	"github.com/prologic/bitcask" | ||||
| 	"github.com/prologic/bitcask/internal" | ||||
| 	"github.com/prologic/bitcask/internal/config" | ||||
| 	"github.com/prologic/bitcask/internal/data/codec" | ||||
| 	"github.com/prologic/bitcask/internal/index" | ||||
| 	log "github.com/sirupsen/logrus" | ||||
| 	"github.com/spf13/cobra" | ||||
| @ -37,37 +41,101 @@ func init() { | ||||
| 
 | ||||
| func recover(path string, dryRun bool) int { | ||||
| 	maxKeySize := bitcask.DefaultMaxKeySize | ||||
| 	maxValueSize := bitcask.DefaultMaxValueSize | ||||
| 	if cfg, err := config.Load(filepath.Join(path, "config.json")); err == nil { | ||||
| 		maxKeySize = cfg.MaxKeySize | ||||
| 		maxValueSize = cfg.MaxValueSize | ||||
| 	} | ||||
| 
 | ||||
| 	if err := recoverIndex(filepath.Join(path, "index"), maxKeySize, dryRun); err != nil { | ||||
| 		log.WithError(err).Info("recovering index file") | ||||
| 		return 1 | ||||
| 	} | ||||
| 
 | ||||
| 	datafiles, err := internal.GetDatafiles(path) | ||||
| 	if err != nil { | ||||
| 		log.WithError(err).Info("coudn't list existing datafiles") | ||||
| 		return 1 | ||||
| 	} | ||||
| 	for _, file := range datafiles { | ||||
| 		if err := recoverDatafile(file, maxKeySize, maxValueSize, dryRun); err != nil { | ||||
| 			log.WithError(err).Info("recovering data file") | ||||
| 			return 1 | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return 0 | ||||
| } | ||||
| 
 | ||||
| func recoverIndex(path string, maxKeySize uint32, dryRun bool) error { | ||||
| 	t, found, err := index.NewIndexer().Load(path, maxKeySize) | ||||
| 	if err != nil && !index.IsIndexCorruption(err) { | ||||
| 		log.WithError(err).Info("error while opening the index file") | ||||
| 		log.WithError(err).Info("opening the index file") | ||||
| 	} | ||||
| 	if !found { | ||||
| 		log.Info("index file doesn't exist, will be recreated on next run.") | ||||
| 		return 0 | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	if err == nil { | ||||
| 		log.Debug("index file is not corrupted") | ||||
| 		return 0 | ||||
| 		return nil | ||||
| 	} | ||||
| 	log.Debugf("index file is corrupted: %v", err) | ||||
| 
 | ||||
| 	if dryRun { | ||||
| 		log.Debug("dry-run mode, not writing to a file") | ||||
| 		return 0 | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	// Leverage that t has the partiatially read tree even on corrupted files | ||||
| 	err = index.NewIndexer().Save(t, "index.recovered") | ||||
| 	if err != nil { | ||||
| 		log.WithError(err).Info("error while writing the recovered index file") | ||||
| 		return 1 | ||||
| 		return fmt.Errorf("writing the recovered index file: %w", err) | ||||
| 	} | ||||
| 	log.Debug("the index was recovered in the index.recovered new file") | ||||
| 
 | ||||
| 	return 0 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func recoverDatafile(path string, maxKeySize uint32, maxValueSize uint64, dryRun bool) error { | ||||
| 	f, err := os.Open(path) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("opening the datafile: %w", err) | ||||
| 	} | ||||
| 	defer f.Close() | ||||
| 	_, file := filepath.Split(path) | ||||
| 	fr, err := os.OpenFile(fmt.Sprintf("%s.recovered", file), os.O_CREATE|os.O_WRONLY, os.ModePerm) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("creating the recovered datafile: %w", err) | ||||
| 	} | ||||
| 	defer fr.Close() | ||||
| 
 | ||||
| 	dec := codec.NewDecoder(f, maxKeySize, maxValueSize) | ||||
| 	enc := codec.NewEncoder(fr) | ||||
| 	e := internal.Entry{} | ||||
| 	for { | ||||
| 		_, err = dec.Decode(&e) | ||||
| 		if err == io.EOF { | ||||
| 			break | ||||
| 		} | ||||
| 		if codec.IsCorruptedData(err) { | ||||
| 			log.Debugf("%s is corrupted, a best-effort recovery was done", file) | ||||
| 			return nil | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("unexpected error while reading datafile: %w", err) | ||||
| 		} | ||||
| 		if dryRun { | ||||
| 			continue | ||||
| 		} | ||||
| 		if _, err := enc.Encode(e); err != nil { | ||||
| 			return fmt.Errorf("writing to recovered datafile: %w", err) | ||||
| 		} | ||||
| 	} | ||||
| 	if err := os.Remove(fr.Name()); err != nil { | ||||
| 		return fmt.Errorf("can't remove temporal recovered datafile: %w", err) | ||||
| 	} | ||||
| 	log.Debugf("%s is not corrupted", file) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user