Merge pull request #2842 from aawsome/rebuild-index-inmem

Rebuild index in prune by using in-memory index
This commit is contained in:
MichaelEischer 2020-11-06 20:51:20 +01:00 committed by GitHub
commit 4707bdb204
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 280 additions and 48 deletions

View File

@ -3,6 +3,11 @@ Enhancement: Improve pruning performance and make pruning more customizable
The `prune` command is now much faster. This is especially the case for remote
repositories or repositories with not much data to remove.
Also the memory usage of the `prune` command is now reduced.
Restic used to rebuild the index from scratch after pruning. This could lead
to missing packs in the index in some cases for eventually consistent
backends, like e.g. AWS S3.
This behavior is now changed and the index rebuilding uses the information
already known by `prune`.
By default, the `prune` command no longer removes all unused data. This
behavior can be fine-tuned by new options, like the acceptable amount of unused space or
@ -14,9 +19,11 @@ also shows what `prune` would do.
Fixes several open issues, e.g.:
https://github.com/restic/restic/issues/1140
https://github.com/restic/restic/issues/1599
https://github.com/restic/restic/issues/1985
https://github.com/restic/restic/issues/2112
https://github.com/restic/restic/issues/2227
https://github.com/restic/restic/issues/2305
https://github.com/restic/restic/pull/2718
https://github.com/restic/restic/pull/2842

View File

@ -471,19 +471,31 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB
DeleteFiles(gopts, repo, removePacksFirst, restic.PackFile)
}
packsAddedByRepack := 0
if len(repackPacks) != 0 {
// Remember the number of unique packs before repacking
packsBeforeRepacking := len(repo.Index().Packs())
Verbosef("repacking packs\n")
bar := newProgressMax(!gopts.Quiet, uint64(len(repackPacks)), "packs repacked")
_, err := repository.Repack(ctx, repo, repackPacks, keepBlobs, bar)
if err != nil {
return err
}
// Since repacking will only add new packs, we can calculate the number
// of packs like this:
packsAddedByRepack = len(repo.Index().Packs()) - packsBeforeRepacking
// Also remove repacked packs
removePacks.Merge(repackPacks)
}
if len(removePacks) != 0 {
if err = rebuildIndex(ctx, repo, removePacks); err != nil {
totalpacks := int(stats.packs.used+stats.packs.partlyUsed+stats.packs.unused) -
len(removePacks) + packsAddedByRepack
err = rebuildIndexFiles(gopts, repo, removePacks, uint64(totalpacks))
if err != nil {
return err
}
@ -495,6 +507,20 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB
return nil
}
func rebuildIndexFiles(gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, packcount uint64) error {
Verbosef("rebuilding index\n")
bar := newProgressMax(!gopts.Quiet, packcount, "packs processed")
obsoleteIndexes, err := (repo.Index()).(*repository.MasterIndex).
Save(gopts.ctx, repo, removePacks, bar)
if err != nil {
return err
}
Verbosef("deleting obsolete index files\n")
return DeleteFilesChecked(gopts, repo, obsoleteIndexes, restic.IndexFile)
}
func getUsedBlobs(gopts GlobalOptions, repo restic.Repository, snapshots []*restic.Snapshot) (usedBlobs restic.BlobSet, err error) {
ctx := gopts.ctx

View File

@ -1559,6 +1559,62 @@ func testEdgeCaseRepo(t *testing.T, tarfile string, optionsCheck CheckOptions, o
}
}
// a listOnceBackend only allows listing once per filetype
// listing filetypes more than once may cause problems with eventually consistent
// backends (like e.g. AWS S3) as the second listing may be inconsistent to what
// is expected by the first listing + some operations.
type listOnceBackend struct {
restic.Backend
listedFileType map[restic.FileType]bool
}
func newListOnceBackend(be restic.Backend) *listOnceBackend {
return &listOnceBackend{
Backend: be,
listedFileType: make(map[restic.FileType]bool),
}
}
func (be *listOnceBackend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
if t != restic.LockFile && be.listedFileType[t] {
return errors.Errorf("tried listing type %v the second time", t)
}
be.listedFileType[t] = true
return be.Backend.List(ctx, t, fn)
}
func TestPruneListOnce(t *testing.T) {
env, cleanup := withTestEnvironment(t)
defer cleanup()
env.gopts.backendTestHook = func(r restic.Backend) (restic.Backend, error) {
return newListOnceBackend(r), nil
}
pruneOpts := PruneOptions{MaxUnused: "0"}
checkOpts := CheckOptions{ReadData: true, CheckUnused: true}
testSetupBackupData(t, env)
opts := BackupOptions{}
testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9")}, opts, env.gopts)
firstSnapshot := testRunList(t, "snapshots", env.gopts)
rtest.Assert(t, len(firstSnapshot) == 1,
"expected one snapshot, got %v", firstSnapshot)
testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9", "2")}, opts, env.gopts)
testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9", "3")}, opts, env.gopts)
snapshotIDs := testRunList(t, "snapshots", env.gopts)
rtest.Assert(t, len(snapshotIDs) == 3,
"expected 3 snapshot, got %v", snapshotIDs)
testRunForgetJSON(t, env.gopts)
testRunForget(t, env.gopts, firstSnapshot[0].String())
testRunPrune(t, env.gopts, pruneOpts)
rtest.OK(t, runCheck(checkOpts, env.gopts, nil))
}
func TestHardLink(t *testing.T) {
// this test assumes a test set with a single directory containing hard linked files
env, cleanup := withTestEnvironment(t)

View File

@ -99,12 +99,10 @@ command must be run:
repacking packs
[0:00] 100.00% 2 / 2 packs repacked
counting files in repo
[0:00] 100.00% 3 / 3 packs
finding old index files
saved new indexes as [59270b3a]
remove 4 old index files
[0:00] 100.00% 4 / 4 files deleted
rebuilding index
[0:00] 100.00% 3 / 3 packs processed
deleting obsolete index files
[0:00] 100.00% 3 / 3 files deleted
removing 3 old packs
[0:00] 100.00% 3 / 3 files deleted
done
@ -147,12 +145,10 @@ to ``forget``:
repacking packs
[0:00] 100.00% 2 / 2 packs repacked
counting files in repo
[0:00] 100.00% 3 / 3 packs
finding old index files
saved new indexes as [59270b3a]
remove 4 old index files
[0:00] 100.00% 4 / 4 files deleted
rebuilding index
[0:00] 100.00% 3 / 3 packs processed
deleting obsolete index files
[0:00] 100.00% 3 / 3 files deleted
removing 3 old packs
[0:00] 100.00% 3 / 3 files deleted
done

View File

@ -275,6 +275,55 @@ func (idx *Index) Each(ctx context.Context) <-chan restic.PackedBlob {
return ch
}
type EachByPackResult struct {
packID restic.ID
blobs []restic.Blob
}
// EachByPack returns a channel that yields all blobs known to the index
// grouped by packID but ignoring blobs with a packID in packPlacklist.
// When the context is cancelled, the background goroutine
// terminates. This blocks any modification of the index.
func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan EachByPackResult {
idx.m.Lock()
ch := make(chan EachByPackResult)
go func() {
defer idx.m.Unlock()
defer func() {
close(ch)
}()
for typ := range idx.byType {
byPack := make(map[restic.ID][]*indexEntry)
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
packID := idx.packs[e.packIndex]
if !packBlacklist.Has(packID) {
byPack[packID] = append(byPack[packID], e)
}
return true
})
for packID, pack := range byPack {
var result EachByPackResult
result.packID = packID
for _, e := range pack {
result.blobs = append(result.blobs, idx.toPackedBlob(e, restic.BlobType(typ)).Blob)
}
select {
case <-ctx.Done():
return
case ch <- result:
}
}
}
}()
return ch
}
// Packs returns all packs in this index
func (idx *Index) Packs() restic.IDSet {
idx.m.Lock()

View File

@ -97,6 +97,19 @@ func (mi *MasterIndex) Has(id restic.ID, tpe restic.BlobType) bool {
return false
}
// Packs returns all packs that are covered by the index.
func (mi *MasterIndex) Packs() restic.IDSet {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
packs := restic.NewIDSet()
for _, idx := range mi.idx {
packs.Merge(idx.Packs())
}
return packs
}
// Count returns the number of blobs of type t in the index.
func (mi *MasterIndex) Count(t restic.BlobType) (n uint) {
mi.idxMutex.RLock()
@ -248,49 +261,66 @@ func (mi *MasterIndex) MergeFinalIndexes() {
mi.idx = newIdx
}
// RebuildIndex combines all known indexes to a new index, leaving out any
// Save saves all known indexes to index files, leaving out any
// packs whose ID is contained in packBlacklist. The new index contains the IDs
// of all known indexes in the "supersedes" field.
func (mi *MasterIndex) RebuildIndex(ctx context.Context, packBlacklist restic.IDSet) (*Index, error) {
// of all known indexes in the "supersedes" field. The IDs are also returned in
// the IDSet obsolete
// After calling this function, you should remove the obsolete index files.
func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, p *restic.Progress) (obsolete restic.IDSet, err error) {
p.Start()
defer p.Done()
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
debug.Log("start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist)
newIndex := NewIndex()
obsolete = restic.NewIDSet()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for i, idx := range mi.idx {
debug.Log("adding index %d", i)
for pb := range idx.Each(ctx) {
if packBlacklist.Has(pb.PackID) {
continue
}
newIndex.Store(pb)
}
if !idx.Final() {
debug.Log("index %d isn't final, don't add to supersedes field", i)
continue
}
ids, err := idx.IDs()
if err != nil {
debug.Log("index %d does not have an ID: %v", err)
return nil, err
}
debug.Log("adding index ids %v to supersedes field", ids)
err = newIndex.AddToSupersedes(ids...)
if err != nil {
return nil, err
finalize := func() error {
newIndex.Finalize()
if _, err := SaveIndex(ctx, repo, newIndex); err != nil {
return err
}
newIndex = NewIndex()
return nil
}
return newIndex, nil
for i, idx := range mi.idx {
if idx.Final() {
ids, err := idx.IDs()
if err != nil {
debug.Log("index %d does not have an ID: %v", err)
return nil, err
}
debug.Log("adding index ids %v to supersedes field", ids)
err = newIndex.AddToSupersedes(ids...)
if err != nil {
return nil, err
}
obsolete.Merge(restic.NewIDSet(ids...))
} else {
debug.Log("index %d isn't final, don't add to supersedes field", i)
}
debug.Log("adding index %d", i)
for pbs := range idx.EachByPack(ctx, packBlacklist) {
newIndex.StorePack(pbs.packID, pbs.blobs)
p.Report(restic.Stat{Blobs: 1})
if IndexFull(newIndex) {
if err := finalize(); err != nil {
return nil, err
}
}
}
}
if err := finalize(); err != nil {
return nil, err
}
return
}

View File

@ -5,7 +5,9 @@ import (
"fmt"
"math/rand"
"testing"
"time"
"github.com/restic/restic/internal/checker"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
@ -322,3 +324,65 @@ func BenchmarkMasterIndexLookupBlobSize(b *testing.B) {
mIdx.LookupSize(lookupID, restic.DataBlob)
}
}
var (
snapshotTime = time.Unix(1470492820, 207401672)
depth = 3
)
func createFilledRepo(t testing.TB, snapshots int, dup float32) (restic.Repository, func()) {
repo, cleanup := repository.TestRepository(t)
for i := 0; i < 3; i++ {
restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(i)*time.Second), depth, dup)
}
return repo, cleanup
}
func TestIndexSave(t *testing.T) {
repo, cleanup := createFilledRepo(t, 3, 0)
defer cleanup()
repo.LoadIndex(context.TODO())
obsoletes, err := repo.Index().(*repository.MasterIndex).Save(context.TODO(), repo, nil, nil)
if err != nil {
t.Fatalf("unable to save new index: %v", err)
}
for id := range obsoletes {
t.Logf("remove index %v", id.Str())
h := restic.Handle{Type: restic.IndexFile, Name: id.String()}
err = repo.Backend().Remove(context.TODO(), h)
if err != nil {
t.Errorf("error removing index %v: %v", id, err)
}
}
checker := checker.New(repo)
hints, errs := checker.LoadIndex(context.TODO())
for _, h := range hints {
t.Logf("hint: %v\n", h)
}
for _, err := range errs {
t.Errorf("checker found error: %v", err)
}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
errCh := make(chan error)
go checker.Structure(ctx, errCh)
i := 0
for err := range errCh {
t.Errorf("checker returned error: %v", err)
i++
if i == 10 {
t.Errorf("more than 10 errors returned, skipping the rest")
cancel()
break
}
}
}

View File

@ -322,7 +322,10 @@ func (r *Repository) Flush(ctx context.Context) error {
return err
}
// Save index after flushing
// Save index after flushing only if noAutoIndexUpdate is not set
if r.noAutoIndexUpdate {
return nil
}
return r.SaveIndex(ctx)
}

View File

@ -62,6 +62,7 @@ type MasterIndex interface {
Has(ID, BlobType) bool
Lookup(ID, BlobType) []PackedBlob
Count(BlobType) uint
Packs() IDSet
// Each returns a channel that yields all blobs known to the index. When
// the context is cancelled, the background goroutine terminates. This