From 6e03f80ca26285d865e8d63cf65ad1e6e6a6f2ff Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Sat, 7 Nov 2020 00:39:40 +0100
Subject: [PATCH] check: Split the parallelized tree loader into a reusable
 component

The actual code change is minimal
---
 internal/checker/checker.go    | 159 +++------------------------
 internal/restic/tree_stream.go | 155 ++++++++++++++++++++++++++++++++
 2 files changed, 169 insertions(+), 145 deletions(-)
 create mode 100644 internal/restic/tree_stream.go

diff --git a/internal/checker/checker.go b/internal/checker/checker.go
index 9c6922a36..4a1c5cb4b 100644
--- a/internal/checker/checker.go
+++ b/internal/checker/checker.go
@@ -308,37 +308,14 @@ func (e TreeError) Error() string {
 	return fmt.Sprintf("tree %v: %v", e.ID.Str(), e.Errors)
 }
 
-type treeJob struct {
-	restic.ID
-	error
-	*restic.Tree
-}
-
-// loadTreeWorker loads trees from repo and sends them to out.
-func loadTreeWorker(ctx context.Context, repo restic.Repository,
-	in <-chan restic.ID, out chan<- treeJob) {
-
-	for treeID := range in {
-		tree, err := repo.LoadTree(ctx, treeID)
-		debug.Log("load tree %v (%v) returned err: %v", tree, treeID, err)
-		job := treeJob{ID: treeID, error: err, Tree: tree}
-
-		select {
-		case <-ctx.Done():
-			return
-		case out <- job:
-		}
-	}
-}
-
 // checkTreeWorker checks the trees received and sends out errors to errChan.
-func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out chan<- error) {
-	for job := range in {
-		debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.error)
+func (c *Checker) checkTreeWorker(ctx context.Context, trees <-chan restic.TreeItem, out chan<- error) {
+	for job := range trees {
+		debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.Error)
 
 		var errs []error
-		if job.error != nil {
-			errs = append(errs, job.error)
+		if job.Error != nil {
+			errs = append(errs, job.Error)
 		} else {
 			errs = c.checkTree(job.ID, job.Tree)
 		}
@@ -356,97 +333,6 @@ func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out ch
 	}
 }
 
-func (c *Checker) filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- restic.ID, in <-chan treeJob, out chan<- treeJob) {
-	var (
-		inCh                    = in
-		outCh                   chan<- treeJob
-		loadCh                  chan<- restic.ID
-		job                     treeJob
-		nextTreeID              restic.ID
-		outstandingLoadTreeJobs = 0
-	)
-
-	for {
-		if loadCh == nil && len(backlog) > 0 {
-			// process last added ids first, that is traverse the tree in depth-first order
-			ln := len(backlog) - 1
-			nextTreeID, backlog = backlog[ln], backlog[:ln]
-
-			// use a separate flag for processed trees to ensure that check still processes trees
-			// even when a file references a tree blob
-			c.blobRefs.Lock()
-			h := restic.BlobHandle{ID: nextTreeID, Type: restic.TreeBlob}
-			blobReferenced := c.blobRefs.M.Has(h)
-			// noop if already referenced
-			c.blobRefs.M.Insert(h)
-			c.blobRefs.Unlock()
-			if blobReferenced {
-				continue
-			}
-
-			loadCh = loaderChan
-		}
-
-		if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 {
-			debug.Log("backlog is empty, all channels nil, exiting")
-			return
-		}
-
-		select {
-		case <-ctx.Done():
-			return
-
-		case loadCh <- nextTreeID:
-			outstandingLoadTreeJobs++
-			loadCh = nil
-
-		case j, ok := <-inCh:
-			if !ok {
-				debug.Log("input channel closed")
-				inCh = nil
-				in = nil
-				continue
-			}
-
-			outstandingLoadTreeJobs--
-
-			debug.Log("input job tree %v", j.ID)
-
-			if j.error != nil {
-				debug.Log("received job with error: %v (tree %v, ID %v)", j.error, j.Tree, j.ID)
-			} else if j.Tree == nil {
-				debug.Log("received job with nil tree pointer: %v (ID %v)", j.error, j.ID)
-				// send a new job with the new error instead of the old one
-				j = treeJob{ID: j.ID, error: errors.New("tree is nil and error is nil")}
-			} else {
-				subtrees := j.Tree.Subtrees()
-				debug.Log("subtrees for tree %v: %v", j.ID, subtrees)
-				// iterate backwards over subtree to compensate backwards traversal order of nextTreeID selection
-				for i := len(subtrees) - 1; i >= 0; i-- {
-					id := subtrees[i]
-					if id.IsNull() {
-						// We do not need to raise this error here, it is
-						// checked when the tree is checked. Just make sure
-						// that we do not add any null IDs to the backlog.
-						debug.Log("tree %v has nil subtree", j.ID)
-						continue
-					}
-					backlog = append(backlog, id)
-				}
-			}
-
-			job = j
-			outCh = out
-			inCh = nil
-
-		case outCh <- job:
-			debug.Log("tree sent to check: %v", job.ID)
-			outCh = nil
-			inCh = in
-		}
-	}
-}
-
 func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (ids restic.IDs, errs []error) {
 	err := restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, sn *restic.Snapshot, err error) error {
 		if err != nil {
@@ -480,26 +366,16 @@ func (c *Checker) Structure(ctx context.Context, errChan chan<- error) {
 		}
 	}
 
-	loaderChan := make(chan restic.ID)
-	loadedTreeChan := make(chan treeJob)
-	treeStream := make(chan treeJob)
-
 	wg, ctx := errgroup.WithContext(ctx)
-	var loadTreeWg sync.WaitGroup
-
-	for i := 0; i < defaultParallelism; i++ {
-		loadTreeWg.Add(1)
-		wg.Go(func() error {
-			defer loadTreeWg.Done()
-			loadTreeWorker(ctx, c.repo, loaderChan, loadedTreeChan)
-			return nil
-		})
-	}
-
-	// close once all loadTreeWorkers have completed
-	wg.Go(func() error {
-		loadTreeWg.Wait()
-		close(loadedTreeChan)
-		return nil
+	treeStream := restic.StreamTrees(ctx, wg, c.repo, trees, func(treeID restic.ID) bool {
+		// blobRefs may be accessed in parallel by checkTree
+		c.blobRefs.Lock()
+		h := restic.BlobHandle{ID: treeID, Type: restic.TreeBlob}
+		blobReferenced := c.blobRefs.M.Has(h)
+		// noop if already referenced
+		c.blobRefs.M.Insert(h)
+		c.blobRefs.Unlock()
+		return blobReferenced
 	})
 
 	defer close(errChan)
@@ -510,13 +386,6 @@ func (c *Checker) Structure(ctx context.Context, errChan chan<- error) {
 		})
 	}
 
-	wg.Go(func() error {
-		defer close(loaderChan)
-		defer close(treeStream)
-		c.filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream)
-		return nil
-	})
-
 	wg.Wait()
 }
 
diff --git a/internal/restic/tree_stream.go b/internal/restic/tree_stream.go
new file mode 100644
index 000000000..b71f4aa18
--- /dev/null
+++ b/internal/restic/tree_stream.go
@@ -0,0 +1,155 @@
+package restic
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	"github.com/restic/restic/internal/debug"
+	"golang.org/x/sync/errgroup"
+)
+
+const streamTreeParallelism = 5
+
+// TreeItem is used to return either an error or the tree for a tree id
+type TreeItem struct {
+	ID
+	Error error
+	*Tree
+}
+
+// loadTreeWorker loads trees from repo and sends them to out.
+func loadTreeWorker(ctx context.Context, repo TreeLoader,
+	in <-chan ID, out chan<- TreeItem) {
+
+	for treeID := range in {
+		tree, err := repo.LoadTree(ctx, treeID)
+		debug.Log("load tree %v (%v) returned err: %v", tree, treeID, err)
+		job := TreeItem{ID: treeID, Error: err, Tree: tree}
+
+		select {
+		case <-ctx.Done():
+			return
+		case out <- job:
+		}
+	}
+}
+
+func filterTrees(ctx context.Context, backlog IDs, loaderChan chan<- ID,
+	in <-chan TreeItem, out chan<- TreeItem, skip func(tree ID) bool) {
+
+	var (
+		inCh                    = in
+		outCh                   chan<- TreeItem
+		loadCh                  chan<- ID
+		job                     TreeItem
+		nextTreeID              ID
+		outstandingLoadTreeJobs = 0
+	)
+
+	for {
+		if loadCh == nil && len(backlog) > 0 {
+			// process last added ids first, that is traverse the tree in depth-first order
+			ln := len(backlog) - 1
+			nextTreeID, backlog = backlog[ln], backlog[:ln]
+
+			if skip(nextTreeID) {
+				continue
+			}
+
+			loadCh = loaderChan
+		}
+
+		if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 {
+			debug.Log("backlog is empty, all channels nil, exiting")
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+
+		case loadCh <- nextTreeID:
+			outstandingLoadTreeJobs++
+			loadCh = nil
+
+		case j, ok := <-inCh:
+			if !ok {
+				debug.Log("input channel closed")
+				inCh = nil
+				in = nil
+				continue
+			}
+
+			outstandingLoadTreeJobs--
+
+			debug.Log("input job tree %v", j.ID)
+
+			if j.Error != nil {
+				debug.Log("received job with error: %v (tree %v, ID %v)", j.Error, j.Tree, j.ID)
+			} else if j.Tree == nil {
+				debug.Log("received job with nil tree pointer: %v (ID %v)", j.Error, j.ID)
+				// send a new job with the new error instead of the old one
+				j = TreeItem{ID: j.ID, Error: errors.New("tree is nil and error is nil")}
+			} else {
+				subtrees := j.Tree.Subtrees()
+				debug.Log("subtrees for tree %v: %v", j.ID, subtrees)
+				// iterate backwards over subtrees to compensate for the backwards traversal order of nextTreeID selection
+				for i := len(subtrees) - 1; i >= 0; i-- {
+					id := subtrees[i]
+					if id.IsNull() {
+						// We do not need to raise this error here, it is
+						// checked when the tree is checked. Just make sure
+						// that we do not add any null IDs to the backlog.
+						debug.Log("tree %v has nil subtree", j.ID)
+						continue
+					}
+					backlog = append(backlog, id)
+				}
+			}
+
+			job = j
+			outCh = out
+			inCh = nil
+
+		case outCh <- job:
+			debug.Log("tree sent to process: %v", job.ID)
+			outCh = nil
+			inCh = in
+		}
+	}
+}
+
+// StreamTrees iteratively loads the given trees and their subtrees. The skip
+// callback is guaranteed to always be called from the same goroutine.
+func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees IDs, skip func(tree ID) bool) <-chan TreeItem {
+	loaderChan := make(chan ID)
+	loadedTreeChan := make(chan TreeItem)
+	treeStream := make(chan TreeItem)
+
+	var loadTreeWg sync.WaitGroup
+
+	for i := 0; i < streamTreeParallelism; i++ {
+		loadTreeWg.Add(1)
+		wg.Go(func() error {
+			defer loadTreeWg.Done()
+			loadTreeWorker(ctx, repo, loaderChan, loadedTreeChan)
+			return nil
+		})
+	}
+
+	// close once all loadTreeWorkers have completed
+	wg.Go(func() error {
+		loadTreeWg.Wait()
+		close(loadedTreeChan)
+		return nil
+	})
+
+	wg.Go(func() error {
+		defer close(loaderChan)
+		defer close(treeStream)
+		filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream, skip)
+		return nil
+	})
+	return treeStream
+}
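
--
Usage note (editor's sketch, not part of the patch above): StreamTrees spawns
its workers on the caller's errgroup.Group, so any consumer has to run on the
same group and the caller waits on the group exactly once. A minimal caller
could look like the following; walkAllTrees, repo, and roots are illustrative
names, and restic.NewIDSet merely stands in for whatever deduplication the
caller needs (the checker uses its blobRefs set for this instead).

package examples // hypothetical location inside restic's internal packages

import (
	"context"

	"github.com/restic/restic/internal/restic"
	"golang.org/x/sync/errgroup"
)

// walkAllTrees streams every tree reachable from the given roots exactly once.
func walkAllTrees(ctx context.Context, repo restic.TreeLoader, roots restic.IDs) error {
	wg, ctx := errgroup.WithContext(ctx)

	// skip is always called from a single goroutine, so the set needs
	// no additional locking here
	visited := restic.NewIDSet()
	treeStream := restic.StreamTrees(ctx, wg, repo, roots, func(tree restic.ID) bool {
		if visited.Has(tree) {
			return true // already queued once, do not load again
		}
		visited.Insert(tree)
		return false
	})

	// the consumer must run on the same errgroup: treeStream is only
	// closed once filterTrees returns, and the group only finishes once
	// the consumer has drained the channel
	wg.Go(func() error {
		for item := range treeStream {
			if item.Error != nil {
				return item.Error // cancels the whole group
			}
			// inspect item.ID and item.Tree here
		}
		return nil
	})

	return wg.Wait()
}

Returning an error from the consumer cancels the shared context, which in
turn makes filterTrees and the loadTreeWorkers shut down cleanly, so the
sketch cannot deadlock on an unread channel.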