1
0
mirror of https://github.com/restic/restic.git synced 2024-06-26 07:49:01 +02:00

Parallelize MasterIndex.Save()

This commit is contained in:
Alexander Weiss 2020-11-12 02:49:53 +01:00
parent 1ec628ddf5
commit 187c8fb259

View File

@ -7,6 +7,7 @@ import (
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress" "github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup"
) )
// MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved. // MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved.
@ -261,10 +262,12 @@ func (mi *MasterIndex) MergeFinalIndexes() {
mi.idx = newIdx mi.idx = newIdx
} }
const saveIndexParallelism = 4
// Save saves all known indexes to index files, leaving out any // Save saves all known indexes to index files, leaving out any
// packs whose ID is contained in packBlacklist. The new index contains the IDs // packs whose ID is contained in packBlacklist from finalized indexes.
// of all known indexes in the "supersedes" field. The IDs are also returned in // The new index contains the IDs of all known indexes in the "supersedes"
// the IDSet obsolete // field. The IDs are also returned in the IDSet obsolete.
// After calling this function, you should remove the obsolete index files. // After calling this function, you should remove the obsolete index files.
func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, extraObsolete restic.IDs, p *progress.Counter) (obsolete restic.IDSet, err error) { func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, extraObsolete restic.IDs, p *progress.Counter) (obsolete restic.IDSet, err error) {
mi.idxMutex.Lock() mi.idxMutex.Lock()
@ -275,50 +278,79 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla
newIndex := NewIndex() newIndex := NewIndex()
obsolete = restic.NewIDSet() obsolete = restic.NewIDSet()
for i, idx := range mi.idx { // track spawned goroutines using wg, create a new context which is
if idx.Final() { // cancelled as soon as an error occurs.
ids, err := idx.IDs() wg, ctx := errgroup.WithContext(ctx)
if err != nil {
debug.Log("index %d does not have an ID: %v", err)
return nil, err
}
debug.Log("adding index ids %v to supersedes field", ids) ch := make(chan *Index)
err = newIndex.AddToSupersedes(ids...) wg.Go(func() error {
if err != nil { defer close(ch)
return nil, err for i, idx := range mi.idx {
} if idx.Final() {
obsolete.Merge(restic.NewIDSet(ids...)) ids, err := idx.IDs()
} else { if err != nil {
debug.Log("index %d isn't final, don't add to supersedes field", i) debug.Log("index %d does not have an ID: %v", err)
} return err
}
debug.Log("adding index %d", i)
debug.Log("adding index ids %v to supersedes field", ids)
for pbs := range idx.EachByPack(ctx, packBlacklist) {
newIndex.StorePack(pbs.packID, pbs.blobs) err = newIndex.AddToSupersedes(ids...)
p.Add(1) if err != nil {
if IndexFull(newIndex) { return err
newIndex.Finalize() }
if _, err := SaveIndex(ctx, repo, newIndex); err != nil { obsolete.Merge(restic.NewIDSet(ids...))
return nil, err } else {
debug.Log("index %d isn't final, don't add to supersedes field", i)
}
debug.Log("adding index %d", i)
for pbs := range idx.EachByPack(ctx, packBlacklist) {
newIndex.StorePack(pbs.packID, pbs.blobs)
p.Add(1)
if IndexFull(newIndex) {
select {
case ch <- newIndex:
case <-ctx.Done():
return nil
}
newIndex = NewIndex()
} }
newIndex = NewIndex()
} }
} }
err = newIndex.AddToSupersedes(extraObsolete...)
if err != nil {
return err
}
obsolete.Merge(restic.NewIDSet(extraObsolete...))
select {
case ch <- newIndex:
case <-ctx.Done():
}
return nil
})
// a worker receives an index from ch, and saves the index
worker := func() error {
for idx := range ch {
idx.Finalize()
if _, err := SaveIndex(ctx, repo, idx); err != nil {
return err
}
}
return nil
} }
err = newIndex.AddToSupersedes(extraObsolete...) // run workers on ch
if err != nil { wg.Go(func() error {
return nil, err return RunWorkers(saveIndexParallelism, worker)
} })
obsolete.Merge(restic.NewIDSet(extraObsolete...))
newIndex.Finalize() err = wg.Wait()
if _, err := SaveIndex(ctx, repo, newIndex); err != nil {
return nil, err
}
return return obsolete, err
} }