From 45509eafc8c0a08301bd734926f87030dadbd233 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 4 May 2024 18:57:37 +0200 Subject: [PATCH 1/5] dump: load blobs of a file from repository in parallel --- internal/dump/common.go | 97 ++++++++++++++++++++++++++++++++--------- 1 file changed, 76 insertions(+), 21 deletions(-) diff --git a/internal/dump/common.go b/internal/dump/common.go index 016328835..116762b5a 100644 --- a/internal/dump/common.go +++ b/internal/dump/common.go @@ -9,6 +9,7 @@ import ( "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/walker" + "golang.org/x/sync/errgroup" ) // A Dumper writes trees and files from a repository to a Writer @@ -16,11 +17,11 @@ import ( type Dumper struct { cache *bloblru.Cache format string - repo restic.BlobLoader + repo restic.Loader w io.Writer } -func New(format string, repo restic.BlobLoader, w io.Writer) *Dumper { +func New(format string, repo restic.Loader, w io.Writer) *Dumper { return &Dumper{ cache: bloblru.New(64 << 20), format: format, @@ -103,27 +104,81 @@ func (d *Dumper) WriteNode(ctx context.Context, node *restic.Node) error { } func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) error { - var ( - buf []byte - err error - ) - for _, id := range node.Content { - blob, ok := d.cache.Get(id) - if !ok { - blob, err = d.repo.LoadBlob(ctx, restic.DataBlob, id, buf) - if err != nil { - return err - } - - buf = d.cache.Add(id, blob) // Reuse evicted buffer. - } - - if _, err := w.Write(blob); err != nil { - return errors.Wrap(err, "Write") - } + type loadTask struct { + id restic.ID + out chan<- []byte + } + type writeTask struct { + data <-chan []byte } - return nil + loaderCh := make(chan loadTask) + // per worker: allows for one blob that gets download + one blob thats queue for writing + writerCh := make(chan writeTask, d.repo.Connections()*2) + + wg, ctx := errgroup.WithContext(ctx) + + wg.Go(func() error { + defer close(loaderCh) + defer close(writerCh) + for _, id := range node.Content { + // non-blocking blob handover to allow the loader to load the next blob + // while the old one is still written + ch := make(chan []byte, 1) + select { + case loaderCh <- loadTask{id: id, out: ch}: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case writerCh <- writeTask{data: ch}: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + + for i := uint(0); i < d.repo.Connections(); i++ { + wg.Go(func() error { + for task := range loaderCh { + var err error + blob, ok := d.cache.Get(task.id) + if !ok { + blob, err = d.repo.LoadBlob(ctx, restic.DataBlob, task.id, nil) + if err != nil { + return err + } + + d.cache.Add(task.id, blob) + } + + select { + case task.out <- blob: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + } + + wg.Go(func() error { + for result := range writerCh { + select { + case data := <-result.data: + if _, err := w.Write(data); err != nil { + return errors.Wrap(err, "Write") + } + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + + return wg.Wait() } // IsDir checks if the given node is a directory. From bd03af2febc5e223e8edf4fade1638d43720329a Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 5 May 2024 11:37:35 +0200 Subject: [PATCH 2/5] dump: add GetOrCompute to bloblru cache --- internal/bloblru/cache.go | 48 +++++++++++++++++++++++++++++++++++++-- internal/dump/common.go | 14 ++++-------- 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/internal/bloblru/cache.go b/internal/bloblru/cache.go index 302ecc769..4477e37a9 100644 --- a/internal/bloblru/cache.go +++ b/internal/bloblru/cache.go @@ -20,13 +20,15 @@ type Cache struct { c *simplelru.LRU[restic.ID, []byte] free, size int // Current and max capacity, in bytes. + inProgress map[restic.ID]chan struct{} } // New constructs a blob cache that stores at most size bytes worth of blobs. func New(size int) *Cache { c := &Cache{ - free: size, - size: size, + free: size, + size: size, + inProgress: make(map[restic.ID]chan struct{}), } // NewLRU wants us to specify some max. number of entries, else it errors. @@ -85,6 +87,48 @@ func (c *Cache) Get(id restic.ID) ([]byte, bool) { return blob, ok } +func (c *Cache) GetOrCompute(id restic.ID, compute func() ([]byte, error)) ([]byte, error) { + // check if already cached + blob, ok := c.Get(id) + if ok { + return blob, nil + } + + // check for parallel download or start our own + finish := make(chan struct{}) + c.mu.Lock() + waitForResult, isDownloading := c.inProgress[id] + if !isDownloading { + c.inProgress[id] = finish + + // remove progress channel once finished here + defer func() { + c.mu.Lock() + delete(c.inProgress, id) + c.mu.Unlock() + close(finish) + }() + } + c.mu.Unlock() + + if isDownloading { + // wait for result of parallel download + <-waitForResult + blob, ok := c.Get(id) + if ok { + return blob, nil + } + } + + // download it + blob, err := compute() + if err == nil { + c.Add(id, blob) + } + + return blob, err +} + func (c *Cache) evict(key restic.ID, blob []byte) { debug.Log("bloblru.Cache: evict %v, %d bytes", key, cap(blob)) c.free += cap(blob) + overhead diff --git a/internal/dump/common.go b/internal/dump/common.go index 116762b5a..62145ba9c 100644 --- a/internal/dump/common.go +++ b/internal/dump/common.go @@ -143,15 +143,11 @@ func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) for i := uint(0); i < d.repo.Connections(); i++ { wg.Go(func() error { for task := range loaderCh { - var err error - blob, ok := d.cache.Get(task.id) - if !ok { - blob, err = d.repo.LoadBlob(ctx, restic.DataBlob, task.id, nil) - if err != nil { - return err - } - - d.cache.Add(task.id, blob) + blob, err := d.cache.GetOrCompute(task.id, func() ([]byte, error) { + return d.repo.LoadBlob(ctx, restic.DataBlob, task.id, nil) + }) + if err != nil { + return err } select { From 7cce667f921da5735f8c4a04540208599494a2bd Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 5 May 2024 11:38:17 +0200 Subject: [PATCH 3/5] fuse: switch to use bloblru.GetOrCompute --- internal/fuse/file.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/internal/fuse/file.go b/internal/fuse/file.go index 6152c9122..5190febbb 100644 --- a/internal/fuse/file.go +++ b/internal/fuse/file.go @@ -96,20 +96,14 @@ func (f *file) Open(_ context.Context, _ *fuse.OpenRequest, _ *fuse.OpenResponse } func (f *openFile) getBlobAt(ctx context.Context, i int) (blob []byte, err error) { - - blob, ok := f.root.blobCache.Get(f.node.Content[i]) - if ok { - return blob, nil - } - - blob, err = f.root.repo.LoadBlob(ctx, restic.DataBlob, f.node.Content[i], nil) + blob, err = f.root.blobCache.GetOrCompute(f.node.Content[i], func() ([]byte, error) { + return f.root.repo.LoadBlob(ctx, restic.DataBlob, f.node.Content[i], nil) + }) if err != nil { debug.Log("LoadBlob(%v, %v) failed: %v", f.node.Name, f.node.Content[i], err) return nil, unwrapCtxCanceled(err) } - f.root.blobCache.Add(f.node.Content[i], blob) - return blob, nil } From 4d55a62ada22931ce7e54ef133d3472c40e81148 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 5 May 2024 12:00:25 +0200 Subject: [PATCH 4/5] bloblru: add test for GetOrCompute --- internal/bloblru/cache_test.go | 67 ++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/internal/bloblru/cache_test.go b/internal/bloblru/cache_test.go index aa6f4465c..b2becd256 100644 --- a/internal/bloblru/cache_test.go +++ b/internal/bloblru/cache_test.go @@ -1,11 +1,14 @@ package bloblru import ( + "context" + "fmt" "math/rand" "testing" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) func TestCache(t *testing.T) { @@ -52,6 +55,70 @@ func TestCache(t *testing.T) { rtest.Equals(t, cacheSize, c.free) } +func TestCacheGetOrCompute(t *testing.T) { + var id1, id2 restic.ID + id1[0] = 1 + id2[0] = 2 + + const ( + kiB = 1 << 10 + cacheSize = 64*kiB + 3*overhead + ) + + c := New(cacheSize) + + e := fmt.Errorf("broken") + _, err := c.GetOrCompute(id1, func() ([]byte, error) { + return nil, e + }) + rtest.Equals(t, e, err, "expected error was not returned") + + // fill buffer + data1 := make([]byte, 10*kiB) + blob, err := c.GetOrCompute(id1, func() ([]byte, error) { + return data1, nil + }) + rtest.OK(t, err) + rtest.Equals(t, &data1[0], &blob[0], "wrong buffer returend") + + // now the buffer should be returned without calling the compute function + blob, err = c.GetOrCompute(id1, func() ([]byte, error) { + return nil, e + }) + rtest.OK(t, err) + rtest.Equals(t, &data1[0], &blob[0], "wrong buffer returend") + + // check concurrency + wg, _ := errgroup.WithContext(context.TODO()) + wait := make(chan struct{}) + calls := make(chan struct{}, 10) + + // start a bunch of blocking goroutines + for i := 0; i < 10; i++ { + wg.Go(func() error { + buf, err := c.GetOrCompute(id2, func() ([]byte, error) { + // block to ensure that multiple requests are waiting in parallel + <-wait + calls <- struct{}{} + return make([]byte, 42), nil + }) + if len(buf) != 42 { + return fmt.Errorf("wrong buffer") + } + return err + }) + } + + close(wait) + rtest.OK(t, wg.Wait()) + close(calls) + count := 0 + for range calls { + count++ + } + rtest.Equals(t, 1, count, "expected exactly one call of the compute function") +} + func BenchmarkAdd(b *testing.B) { const ( MiB = 1 << 20 From e184538ddf9f4757cce6017ca3a0291cca2dd601 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 5 May 2024 12:12:21 +0200 Subject: [PATCH 5/5] dump: add changelog --- changelog/unreleased/pull-4796 | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 changelog/unreleased/pull-4796 diff --git a/changelog/unreleased/pull-4796 b/changelog/unreleased/pull-4796 new file mode 100644 index 000000000..319b9ccdc --- /dev/null +++ b/changelog/unreleased/pull-4796 @@ -0,0 +1,8 @@ +Enhancement: Improve `dump` performance for large files + +The `dump` command now retrieves the data chunks for a file in parallel. This +improves the download performance by up to the configured number of parallel +backend connections. + +https://github.com/restic/restic/issues/3406 +https://github.com/restic/restic/pull/4796