diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 7255e990d..79b3cb4b9 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -74,82 +74,107 @@ func (err ErrOldIndexFormat) Error() string { // LoadIndex loads all index files. func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { debug.Log("Start") - type indexRes struct { - Index *repository.Index - err error - ID string + + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. + wg, ctx := errgroup.WithContext(ctx) + + type FileInfo struct { + restic.ID + Size int64 } - indexCh := make(chan indexRes) + type Result struct { + *repository.Index + restic.ID + Err error + } - worker := func(ctx context.Context, id restic.ID) error { - debug.Log("worker got index %v", id) - idx, err := repository.LoadIndexWithDecoder(ctx, c.repo, id, repository.DecodeIndex) - if errors.Cause(err) == repository.ErrOldIndexFormat { - debug.Log("index %v has old format", id) - hints = append(hints, ErrOldIndexFormat{id}) + ch := make(chan FileInfo) + resultCh := make(chan Result) - idx, err = repository.LoadIndexWithDecoder(ctx, c.repo, id, repository.DecodeOldIndex) + // send list of index files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + return c.repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { + select { + case <-ctx.Done(): + return nil + case ch <- FileInfo{id, size}: + } + return nil + }) + }) + + // a worker receives a FileInfo from ch, loads the index, and sends the result to resultCh; NOTE(review): every worker appends to the shared hints slice without a lock - confirm this is safe when defaultParallelism > 1 + worker := func() error { + for fi := range ch { + debug.Log("worker got file %v", fi.ID.Str()) + idx, err := repository.LoadIndexWithDecoder(ctx, c.repo, fi.ID, repository.DecodeIndex) + if errors.Cause(err) == repository.ErrOldIndexFormat { + debug.Log("index %v has old format", fi.ID.Str()) + hints = append(hints, ErrOldIndexFormat{fi.ID}) + + idx, err =
repository.LoadIndexWithDecoder(ctx, c.repo, fi.ID, repository.DecodeOldIndex) + } + + err = errors.Wrapf(err, "error loading index %v", fi.ID.Str()) + + select { + case resultCh <- Result{idx, fi.ID, err}: + case <-ctx.Done(): + } } - - err = errors.Wrapf(err, "error loading index %v", id.Str()) - - select { - case indexCh <- indexRes{Index: idx, ID: id.String(), err: err}: - case <-ctx.Done(): - } - return nil } - go func() { - defer close(indexCh) - debug.Log("start loading indexes in parallel") - err := repository.FilesInParallel(ctx, c.repo.Backend(), restic.IndexFile, defaultParallelism, - repository.ParallelWorkFuncParseID(worker)) - debug.Log("loading indexes finished, error: %v", err) - if err != nil { - panic(err) - } - }() + // final closes resultCh after all workers have terminated + final := func() error { + close(resultCh) + return nil + } - done := make(chan struct{}) - defer close(done) + // run workers on ch + wg.Go(func() error { + return repository.RunWorkers(ctx, defaultParallelism, worker, final) + }) + // receive decoded indexes packToIndex := make(map[restic.ID]restic.IDSet) + wg.Go(func() error { + for res := range resultCh { + debug.Log("process index %v, err %v", res.ID, res.Err) - for res := range indexCh { - debug.Log("process index %v, err %v", res.ID, res.err) - - if res.err != nil { - errs = append(errs, res.err) - continue - } - - idxID, err := restic.ParseID(res.ID) - if err != nil { - errs = append(errs, errors.Errorf("unable to parse as index ID: %v", res.ID)) - continue - } - - c.indexes[idxID] = res.Index - c.masterIndex.Insert(res.Index) - - debug.Log("process blobs") - cnt := 0 - for blob := range res.Index.Each(ctx) { - c.packs.Insert(blob.PackID) - c.blobs.Insert(blob.ID) - c.blobRefs.M[blob.ID] = 0 - cnt++ - - if _, ok := packToIndex[blob.PackID]; !ok { - packToIndex[blob.PackID] = restic.NewIDSet() + if res.Err != nil { + errs = append(errs, res.Err) + continue } - packToIndex[blob.PackID].Insert(idxID) - } - debug.Log("%d
blobs processed", cnt) + c.indexes[res.ID] = res.Index + c.masterIndex.Insert(res.Index) + + debug.Log("process blobs") + cnt := 0 + for blob := range res.Index.Each(ctx) { + c.packs.Insert(blob.PackID) + c.blobs.Insert(blob.ID) + c.blobRefs.M[blob.ID] = 0 + cnt++ + + if _, ok := packToIndex[blob.PackID]; !ok { + packToIndex[blob.PackID] = restic.NewIDSet() + } + packToIndex[blob.PackID].Insert(res.ID) + } + + debug.Log("%d blobs processed", cnt) + } + return nil + }) + + err := wg.Wait() + if err != nil { + errs = append(errs, err) } debug.Log("checking for duplicate packs") @@ -163,7 +188,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { } } - err := c.repo.SetIndex(c.masterIndex) + err = c.repo.SetIndex(c.masterIndex) if err != nil { debug.Log("SetIndex returned error: %v", err) errs = append(errs, err) @@ -281,31 +306,52 @@ func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (restic.ID sync.Mutex } - snapshotWorker := func(ctx context.Context, strID string) error { - id, err := restic.ParseID(strID) - if err != nil { - return err - } + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. + wg, ctx := errgroup.WithContext(ctx) - debug.Log("load snapshot %v", id) + ch := make(chan restic.ID) - treeID, err := loadTreeFromSnapshot(ctx, repo, id) - if err != nil { - errs.Lock() - errs.errs = append(errs.errs, err) - errs.Unlock() + // send list of snapshot files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + return repo.List(ctx, restic.SnapshotFile, func(id restic.ID, size int64) error { + select { + case <-ctx.Done(): + return nil + case ch <- id: + } return nil + }) + }) + + // a worker receives a snapshot ID from ch, loads the snapshot and the tree, + // and adds the result to errs and trees.
+ worker := func() error { + for id := range ch { + debug.Log("load snapshot %v", id) + + treeID, err := loadTreeFromSnapshot(ctx, repo, id) + if err != nil { + errs.Lock() + errs.errs = append(errs.errs, err) + errs.Unlock() + continue + } + + debug.Log("snapshot %v has tree %v", id, treeID) + trees.Lock() + trees.IDs = append(trees.IDs, treeID) + trees.Unlock() } - - debug.Log("snapshot %v has tree %v", id, treeID) - trees.Lock() - trees.IDs = append(trees.IDs, treeID) - trees.Unlock() - return nil } - err := repository.FilesInParallel(ctx, repo.Backend(), restic.SnapshotFile, defaultParallelism, snapshotWorker) + for i := 0; i < defaultParallelism; i++ { + wg.Go(worker) + } + + err := wg.Wait() if err != nil { errs.errs = append(errs.errs, err) } diff --git a/internal/repository/parallel.go b/internal/repository/parallel.go deleted file mode 100644 index 154b58bfa..000000000 --- a/internal/repository/parallel.go +++ /dev/null @@ -1,65 +0,0 @@ -package repository - -import ( - "context" - - "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/restic" - "golang.org/x/sync/errgroup" -) - -// ParallelWorkFunc gets one file ID to work on. If an error is returned, -// processing stops. When the contect is cancelled the function should return. -type ParallelWorkFunc func(ctx context.Context, id string) error - -// ParallelIDWorkFunc gets one restic.ID to work on. If an error is returned, -// processing stops. When the context is cancelled the function should return. -type ParallelIDWorkFunc func(ctx context.Context, id restic.ID) error - -// FilesInParallel runs n workers of f in parallel, on the IDs that -// repo.List(t) yields. If f returns an error, the process is aborted and the -// first error is returned. 
-func FilesInParallel(ctx context.Context, repo restic.Lister, t restic.FileType, n int, f ParallelWorkFunc) error { - g, ctx := errgroup.WithContext(ctx) - - ch := make(chan string, n) - g.Go(func() error { - defer close(ch) - return repo.List(ctx, t, func(fi restic.FileInfo) error { - select { - case <-ctx.Done(): - case ch <- fi.Name: - } - return nil - }) - }) - - for i := 0; i < n; i++ { - g.Go(func() error { - for name := range ch { - err := f(ctx, name) - if err != nil { - return err - } - } - return nil - }) - } - - return g.Wait() -} - -// ParallelWorkFuncParseID converts a function that takes a restic.ID to a -// function that takes a string. Filenames that do not parse as a restic.ID -// are ignored. -func ParallelWorkFuncParseID(f ParallelIDWorkFunc) ParallelWorkFunc { - return func(ctx context.Context, s string) error { - id, err := restic.ParseID(s) - if err != nil { - debug.Log("invalid ID %q: %v", id, err) - return nil - } - - return f(ctx, id) - } -} diff --git a/internal/repository/parallel_test.go b/internal/repository/parallel_test.go deleted file mode 100644 index 7b4c4a583..000000000 --- a/internal/repository/parallel_test.go +++ /dev/null @@ -1,129 +0,0 @@ -package repository_test - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/restic" - - "github.com/restic/restic/internal/repository" - rtest "github.com/restic/restic/internal/test" -) - -type testIDs []string - -var lister = testIDs{ - "40bb581cd36de952985c97a3ff6b21df41ee897d4db2040354caa36a17ff5268", - "2e15811a4d14ffac66d36a9ff456019d8de4c10c949d45b643f8477d17e92ff3", - "70c11b3ed521ad6b76d905c002ca98b361fca06aca060a063432c7311155a4da", - "8056a33e75dccdda701b6c989c7ed0cb71bbb6da13c6427fe5986f0896cc91c0", - "79d8776200596aa0237b10d470f7b850b86f8a1a80988ef5c8bee2874ce992e2", - "f9f1f29791c6b79b90b35efd083f17a3b163bbbafb1a2fdf43d46d56cffda289", - 
"3834178d05d0f6dd07f872ee0262ff1ace0f0f375768227d3c902b0b66591369", - "66d5cc68c9186414806f366ae5493ce7f229212993750a4992be4030f6af28c5", - "ebca5af4f397944f68cd215e3dfa2b197a7ba0f7c17d65d9f7390d0a15cde296", - "d4511ce6ff732d106275a57e40745c599e987c0da44c42cddbef592aac102437", - "f366202f0bfeefaedd7b49e2f21a90d3cbddb97d257a74d788dd34e19a684dae", - "a5c17728ab2433cd50636dd5c6c7068c7a44f2999d09c46e8f528466da8a059d", - "bae0f9492b9b208233029b87692a1a55cbd7fbe1cf3f6d7bc693ac266a6d6f0e", - "9d500187913c7510d71d1902703d312c7aaa56f1e98351385b9535fdabae595e", - "ffbddd8a4c1e54d258bb3e16d3929b546b61af63cb560b3e3061a8bef5b24552", - "201bb3abf655e7ef71e79ed4fb1079b0502b5acb4d9fad5e72a0de690c50a386", - "08eb57bbd559758ea96e99f9b7688c30e7b3bcf0c4562ff4535e2d8edeffaeed", - "e50b7223b04985ff38d9e11d1cba333896ef4264f82bd5d0653a028bce70e542", - "65a9421cd59cc7b7a71dcd9076136621af607fb4701d2e5c2af23b6396cf2f37", - "995a655b3521c19b4d0c266222266d89c8fc62889597d61f45f336091e646d57", - "51ec6f0bce77ed97df2dd7ae849338c3a8155a057da927eedd66e3d61be769ad", - "7b3923a0c0666431efecdbf6cb171295ec1710b6595eebcba3b576b49d13e214", - "2cedcc3d14698bea7e4b0546f7d5d48951dd90add59e6f2d44b693fd8913717d", - "fd6770cbd54858fdbd3d7b4239b985e5599180064d93ca873f27e86e8407d011", - "9edc51d8e6e04d05c9757848c1bfbfdc8e86b6330982294632488922e59fdb1b", - "1a6c4fbb24ad724c968b2020417c3d057e6c89e49bdfb11d91006def65eab6a0", - "cb3b29808cd0adfa2dca1f3a04f98114fbccf4eb487cdd4022f49bd70eeb049b", - "f55edcb40c619e29a20e432f8aaddc83a649be2c2d1941ccdc474cd2af03d490", - "e8ccc1763a92de23566b95c3ad1414a098016ece69a885fc8a72782a7517d17c", - "0fe2e3db8c5a12ad7101a63a0fffee901be54319cfe146bead7aec851722f82d", - "36be45a6ae7c95ad97cee1b33023be324bce7a7b4b7036e24125679dd9ff5b44", - "1685ed1a57c37859fbef1f7efb7509f20b84ec17a765605de43104d2fa37884b", - "9d83629a6a004c505b100a0b5d0b246833b63aa067aa9b59e3abd6b74bc4d3a8", - "be49a66b60175c5e2ee273b42165f86ef11bb6518c1c79950bcd3f4c196c98bd", - 
"0fd89885d821761b4a890782908e75793028747d15ace3c6cbf0ad56582b4fa5", - "94a767519a4e352a88796604943841fea21429f3358b4d5d55596dbda7d15dce", - "8dd07994afe6e572ddc9698fb0d13a0d4c26a38b7992818a71a99d1e0ac2b034", - "f7380a6f795ed31fbeb2945c72c5fd1d45044e5ab152311e75e007fa530f5847", - "5ca1ce01458e484393d7e9c8af42b0ff37a73a2fee0f18e14cff0fb180e33014", - "8f44178be3fe0a2bd41f922576fb7a9b19d589754504be746f56c759df328fda", - "12d33847c2be711c989f37360dd7aa8537fd14972262a4530634a08fdf32a767", - "31e077f5080f78846a00093caff2b6b839519cc47516142eeba9c41d4072a605", - "14f01db8a0054e70222b76d2555d70114b4bf8a0f02084324af2df226f14a795", - "7f5dbbaf31b4551828e8e76cef408375db9fbcdcdb6b5949f2d1b0c4b8632132", - "42a5d9b9bb7e4a16f23ba916bcf87f38c1aa1f2de2ab79736f725850a8ff6a1b", - "e06f8f901ea708beba8712a11b6e2d0be7c4b018d0254204ef269bcdf5e8c6cc", - "d9ba75785bf45b0c4fd3b2365c968099242483f2f0d0c7c20306dac11fae96e9", - "428debbb280873907cef2ec099efe1566e42a59775d6ec74ded0c4048d5a6515", - "3b51049d4dae701098e55a69536fa31ad2be1adc17b631a695a40e8a294fe9c0", - "168f88aa4b105e9811f5f79439cc1a689be4eec77f3361d42f22fe8f7ddc74a9", - "0baa0ab2249b33d64449a899cb7bd8eae5231f0d4ff70f09830dc1faa2e4abee", - "0c3896d346b580306a49de29f3a78913a41e14b8461b124628c33a64636241f2", - "b18313f1651c15e100e7179aa3eb8ffa62c3581159eaf7f83156468d19781e42", - "996361f7d988e48267ccc7e930fed4637be35fe7562b8601dceb7a32313a14c8", - "dfb4e6268437d53048d22b811048cd045df15693fc6789affd002a0fc80a6e60", - "34dd044c228727f2226a0c9c06a3e5ceb5e30e31cb7854f8fa1cde846b395a58", -} - -func (tests testIDs) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - for i := 0; i < 500; i++ { - for _, id := range tests { - if ctx.Err() != nil { - return ctx.Err() - } - - fi := restic.FileInfo{ - Name: id, - } - - err := fn(fi) - if err != nil { - return err - } - } - } - - return nil -} - -func TestFilesInParallel(t *testing.T) { - f := func(ctx context.Context, id string) error { - time.Sleep(1 * 
time.Millisecond) - return nil - } - - for n := 1; n < 5; n++ { - err := repository.FilesInParallel(context.TODO(), lister, restic.DataFile, n*100, f) - rtest.OK(t, err) - } -} - -var errTest = errors.New("test error") - -func TestFilesInParallelWithError(t *testing.T) { - f := func(ctx context.Context, id string) error { - time.Sleep(1 * time.Millisecond) - - if rand.Float32() < 0.01 { - return errTest - } - - return nil - } - - for n := 1; n < 5; n++ { - err := repository.FilesInParallel(context.TODO(), lister, restic.DataFile, n*100, f) - if err != errTest { - t.Fatalf("wrong error returned, want %q, got %v", errTest, err) - } - } -} diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 1a6e5c505..880006cf5 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -18,6 +18,7 @@ import ( "github.com/restic/restic/internal/hashing" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" ) // Repository is used to access a repository in a backend. @@ -391,45 +392,84 @@ const loadIndexParallelism = 4 func (r *Repository) LoadIndex(ctx context.Context) error { debug.Log("Loading index") - errCh := make(chan error, 1) - indexes := make(chan *Index) + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. 
+ wg, ctx := errgroup.WithContext(ctx) - worker := func(ctx context.Context, id restic.ID) error { - idx, err := LoadIndex(ctx, r, id) - if err != nil { - fmt.Fprintf(os.Stderr, "%v, ignoring\n", err) + type FileInfo struct { + restic.ID + Size int64 + } + ch := make(chan FileInfo) + indexCh := make(chan *Index) + + // send list of index files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + return r.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { + select { + case <-ctx.Done(): + return nil + case ch <- FileInfo{id, size}: + } return nil - } + }) + }) - select { - case indexes <- idx: - case <-ctx.Done(): + // a worker receives an index ID from ch, loads the index, and sends it to indexCh; an index that fails to load is reported on stderr and skipped so the worker keeps draining ch + worker := func() error { + for fi := range ch { + idx, err := LoadIndex(ctx, r, fi.ID) + if err != nil { + fmt.Fprintf(os.Stderr, "%v, ignoring\n", err) + continue + } + + select { + case indexCh <- idx: + case <-ctx.Done(): + } } return nil } - go func() { - defer close(indexes) - errCh <- FilesInParallel(ctx, r.be, restic.IndexFile, loadIndexParallelism, - ParallelWorkFuncParseID(worker)) - }() - - validIndex := restic.NewIDSet() - for idx := range indexes { - id, err := idx.ID() - if err == nil { - validIndex.Insert(id) - } - r.idx.Insert(idx) + // final closes indexCh after all workers have terminated + final := func() error { + close(indexCh) + return nil } - err := r.PrepareCache(validIndex) + // run workers on ch + wg.Go(func() error { + return RunWorkers(ctx, loadIndexParallelism, worker, final) + }) + + // receive decoded indexes + validIndex := restic.NewIDSet() + wg.Go(func() error { + for idx := range indexCh { + id, err := idx.ID() + if err == nil { + validIndex.Insert(id) + } + r.idx.Insert(idx) + } + return nil + }) + + err := wg.Wait() if err != nil { return err } - return <-errCh + // remove index files from the cache which have been removed in the repo + err = r.PrepareCache(validIndex) + if err != nil { +
return err + } + + return nil } // PrepareCache initializes the local cache. indexIDs is the list of IDs of diff --git a/internal/repository/worker_group.go b/internal/repository/worker_group.go new file mode 100644 index 000000000..ab09d441f --- /dev/null +++ b/internal/repository/worker_group.go @@ -0,0 +1,35 @@ +package repository + +import ( + "context" + + "golang.org/x/sync/errgroup" +) + +// RunWorkers runs count instances of workerFunc using an errgroup.Group. +// After all workers have terminated, finalFunc is run. If an error occurs in +// one of the workers, it is returned. finalFunc is always run, regardless of +// any other previous errors. workerFunc and finalFunc take no arguments, so the context derived from ctx here is not passed on to them. +func RunWorkers(ctx context.Context, count int, workerFunc, finalFunc func() error) error { + wg, ctx := errgroup.WithContext(ctx) + + // run workers + for i := 0; i < count; i++ { + wg.Go(workerFunc) + } + + // wait for termination + err := wg.Wait() + + // make sure finalFunc is run + finalErr := finalFunc() + + // if the workers returned an error, return it to the caller (disregarding + // any error from finalFunc) + if err != nil { + return err + } + + // if not, return the value finalFunc returned + return finalErr +}