diff --git a/cmd/restic/cmd_cat.go b/cmd/restic/cmd_cat.go index ccec9b5d9..8d11a9dc4 100644 --- a/cmd/restic/cmd_cat.go +++ b/cmd/restic/cmd_cat.go @@ -7,7 +7,6 @@ import ( "github.com/spf13/cobra" - "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" @@ -146,9 +145,9 @@ func runCat(ctx context.Context, gopts GlobalOptions, args []string) error { return nil case "pack": - h := backend.Handle{Type: restic.PackFile, Name: id.String()} - buf, err := backend.LoadAll(ctx, nil, repo.Backend(), h) - if err != nil { + buf, err := repo.LoadRaw(ctx, restic.PackFile, id) + // allow returning broken pack files + if buf == nil { return err } diff --git a/cmd/restic/cmd_debug.go b/cmd/restic/cmd_debug.go index 3abb9d7eb..a63ac8c4c 100644 --- a/cmd/restic/cmd_debug.go +++ b/cmd/restic/cmd_debug.go @@ -316,10 +316,11 @@ func loadBlobs(ctx context.Context, opts DebugExamineOptions, repo restic.Reposi if err != nil { panic(err) } - be := repo.Backend() - h := backend.Handle{ - Name: packID.String(), - Type: restic.PackFile, + + pack, err := repo.LoadRaw(ctx, restic.PackFile, packID) + // allow processing broken pack files + if pack == nil { + return err } wg, ctx := errgroup.WithContext(ctx) @@ -331,19 +332,11 @@ func loadBlobs(ctx context.Context, opts DebugExamineOptions, repo restic.Reposi wg.Go(func() error { for _, blob := range list { Printf(" loading blob %v at %v (length %v)\n", blob.ID, blob.Offset, blob.Length) - buf := make([]byte, blob.Length) - err := be.Load(ctx, h, int(blob.Length), int64(blob.Offset), func(rd io.Reader) error { - n, err := io.ReadFull(rd, buf) - if err != nil { - return fmt.Errorf("read error after %d bytes: %v", n, err) - } - return nil - }) - if err != nil { - Warnf("error read: %v\n", err) + if int(blob.Offset+blob.Length) > len(pack) { + Warnf("skipping truncated blob\n") continue } - + buf := pack[blob.Offset : blob.Offset+blob.Length] key := repo.Key() nonce, plaintext := buf[:key.NonceSize()], buf[key.NonceSize():] @@ -492,8 +485,9 @@ func examinePack(ctx context.Context, opts DebugExamineOptions, repo restic.Repo } Printf(" file size is %v\n", fi.Size) - buf, err := backend.LoadAll(ctx, nil, repo.Backend(), h) - if err != nil { + buf, err := repo.LoadRaw(ctx, restic.PackFile, id) + // also process damaged pack files + if buf == nil { return err } gotID := restic.Hash(buf) diff --git a/cmd/restic/cmd_repair_packs.go b/cmd/restic/cmd_repair_packs.go index 636213965..ab8c7f475 100644 --- a/cmd/restic/cmd_repair_packs.go +++ b/cmd/restic/cmd_repair_packs.go @@ -1,11 +1,11 @@ package main import ( + "bytes" "context" "io" "os" - "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" @@ -17,8 +17,6 @@ var cmdRepairPacks = &cobra.Command{ Use: "packs [packIDs...]", Short: "Salvage damaged pack files", Long: ` -WARNING: The CLI for this command is experimental and will likely change in the future! - The "repair packs" command extracts intact blobs from the specified pack files, rebuilds the index to remove the damaged pack files and removes the pack files from the repository. @@ -68,20 +66,17 @@ func runRepairPacks(ctx context.Context, gopts GlobalOptions, term *termstatus.T printer.P("saving backup copies of pack files to current folder") for id := range ids { + buf, err := repo.LoadRaw(ctx, restic.PackFile, id) + // corrupted data is fine + if buf == nil { + return err + } + f, err := os.OpenFile("pack-"+id.String(), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o666) if err != nil { return err } - - err = repo.Backend().Load(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()}, 0, 0, func(rd io.Reader) error { - _, err := f.Seek(0, 0) - if err != nil { - return err - } - _, err = io.Copy(f, rd) - return err - }) - if err != nil { + if _, err := io.Copy(f, bytes.NewReader(buf)); err != nil { _ = f.Close() return err } diff --git a/internal/backend/local/local.go b/internal/backend/local/local.go index ff7c4b7a5..599bee0f6 100644 --- a/internal/backend/local/local.go +++ b/internal/backend/local/local.go @@ -247,7 +247,7 @@ func (b *Local) openReader(_ context.Context, h backend.Handle, length int, offs } if length > 0 { - return backend.LimitReadCloser(f, int64(length)), nil + return util.LimitReadCloser(f, int64(length)), nil } return f, nil diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index b624c5060..3591c1530 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -437,7 +437,7 @@ func (r *SFTP) Load(ctx context.Context, h backend.Handle, length int, offset in // check the underlying reader to be agnostic to however fn() handles the returned error _, rderr := rd.Read([]byte{0}) - if rderr == io.EOF && rd.(*backend.LimitedReadCloser).N != 0 { + if rderr == io.EOF && rd.(*util.LimitedReadCloser).N != 0 { // file is too short return fmt.Errorf("%w: %v", errTooShort, err) } @@ -463,7 +463,7 @@ func (r *SFTP) openReader(_ context.Context, h backend.Handle, length int, offse if length > 0 { // unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader // limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go - return backend.LimitReadCloser(f, int64(length)), nil + return util.LimitReadCloser(f, int64(length)), nil } return f, nil diff --git a/internal/backend/test/tests.go b/internal/backend/test/tests.go index 963659fda..4c260d264 100644 --- a/internal/backend/test/tests.go +++ b/internal/backend/test/tests.go @@ -36,6 +36,19 @@ func beTest(ctx context.Context, be backend.Backend, h backend.Handle) (bool, er return err == nil, err } +func LoadAll(ctx context.Context, be backend.Backend, h backend.Handle) ([]byte, error) { + var buf []byte + err := be.Load(ctx, h, 0, 0, func(rd io.Reader) error { + var err error + buf, err = io.ReadAll(rd) + return err + }) + if err != nil { + return nil, err + } + return buf, nil +} + // TestStripPasswordCall tests that the StripPassword method of a factory can be called without crashing. // It does not verify whether passwords are removed correctly func (s *Suite[C]) TestStripPasswordCall(_ *testing.T) { @@ -94,7 +107,7 @@ func (s *Suite[C]) TestConfig(t *testing.T) { var testString = "Config" // create config and read it back - _, err := backend.LoadAll(context.TODO(), nil, b, backend.Handle{Type: backend.ConfigFile}) + _, err := LoadAll(context.TODO(), b, backend.Handle{Type: backend.ConfigFile}) if err == nil { t.Fatalf("did not get expected error for non-existing config") } @@ -110,7 +123,7 @@ func (s *Suite[C]) TestConfig(t *testing.T) { // same config for _, name := range []string{"", "foo", "bar", "0000000000000000000000000000000000000000000000000000000000000000"} { h := backend.Handle{Type: backend.ConfigFile, Name: name} - buf, err := backend.LoadAll(context.TODO(), nil, b, h) + buf, err := LoadAll(context.TODO(), b, h) if err != nil { t.Fatalf("unable to read config with name %q: %+v", name, err) } @@ -519,7 +532,7 @@ func (s *Suite[C]) TestSave(t *testing.T) { err := b.Save(context.TODO(), h, backend.NewByteReader(data, b.Hasher())) test.OK(t, err) - buf, err := backend.LoadAll(context.TODO(), nil, b, h) + buf, err := LoadAll(context.TODO(), b, h) test.OK(t, err) if len(buf) != len(data) { t.Fatalf("number of bytes does not match, want %v, got %v", len(data), len(buf)) @@ -821,7 +834,7 @@ func (s *Suite[C]) TestBackend(t *testing.T) { // test Load() h := backend.Handle{Type: tpe, Name: ts.id} - buf, err := backend.LoadAll(context.TODO(), nil, b, h) + buf, err := LoadAll(context.TODO(), b, h) test.OK(t, err) test.Equals(t, ts.data, string(buf)) diff --git a/internal/backend/util/limited_reader.go b/internal/backend/util/limited_reader.go new file mode 100644 index 000000000..fdee1c06a --- /dev/null +++ b/internal/backend/util/limited_reader.go @@ -0,0 +1,15 @@ +package util + +import "io" + +// LimitedReadCloser wraps io.LimitedReader and exposes the Close() method. +type LimitedReadCloser struct { + io.Closer + io.LimitedReader +} + +// LimitReadCloser returns a new reader wraps r in an io.LimitedReader, but also +// exposes the Close() method. +func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser { + return &LimitedReadCloser{Closer: r, LimitedReader: io.LimitedReader{R: r, N: n}} +} diff --git a/internal/backend/utils.go b/internal/backend/utils.go deleted file mode 100644 index 161608295..000000000 --- a/internal/backend/utils.go +++ /dev/null @@ -1,76 +0,0 @@ -package backend - -import ( - "bytes" - "context" - "encoding/hex" - "fmt" - "io" - - "github.com/minio/sha256-simd" - - "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/errors" -) - -func verifyContentMatchesName(s string, data []byte) (bool, error) { - if len(s) != hex.EncodedLen(sha256.Size) { - return false, fmt.Errorf("invalid length for ID: %q", s) - } - - b, err := hex.DecodeString(s) - if err != nil { - return false, fmt.Errorf("invalid ID: %s", err) - } - var id [sha256.Size]byte - copy(id[:], b) - - hashed := sha256.Sum256(data) - return id == hashed, nil -} - -// LoadAll reads all data stored in the backend for the handle into the given -// buffer, which is truncated. If the buffer is not large enough or nil, a new -// one is allocated. -func LoadAll(ctx context.Context, buf []byte, be Backend, h Handle) ([]byte, error) { - retriedInvalidData := false - err := be.Load(ctx, h, 0, 0, func(rd io.Reader) error { - // make sure this is idempotent, in case an error occurs this function may be called multiple times! - wr := bytes.NewBuffer(buf[:0]) - _, cerr := io.Copy(wr, rd) - if cerr != nil { - return cerr - } - buf = wr.Bytes() - - // retry loading damaged data only once. If a file fails to download correctly - // the second time, then it is likely corrupted at the backend. Return the data - // to the caller in that case to let it decide what to do with the data. - if !retriedInvalidData && h.Type != ConfigFile { - if matches, err := verifyContentMatchesName(h.Name, buf); err == nil && !matches { - debug.Log("retry loading broken blob %v", h) - retriedInvalidData = true - return errors.Errorf("loadAll(%v): invalid data returned", h) - } - } - return nil - }) - - if err != nil { - return nil, err - } - - return buf, nil -} - -// LimitedReadCloser wraps io.LimitedReader and exposes the Close() method. -type LimitedReadCloser struct { - io.Closer - io.LimitedReader -} - -// LimitReadCloser returns a new reader wraps r in an io.LimitedReader, but also -// exposes the Close() method. -func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser { - return &LimitedReadCloser{Closer: r, LimitedReader: io.LimitedReader{R: r, N: n}} -} diff --git a/internal/backend/utils_test.go b/internal/backend/utils_test.go deleted file mode 100644 index ad9540e54..000000000 --- a/internal/backend/utils_test.go +++ /dev/null @@ -1,149 +0,0 @@ -package backend_test - -import ( - "bytes" - "context" - "io" - "math/rand" - "testing" - - "github.com/restic/restic/internal/backend" - "github.com/restic/restic/internal/backend/mem" - "github.com/restic/restic/internal/backend/mock" - "github.com/restic/restic/internal/restic" - rtest "github.com/restic/restic/internal/test" -) - -const KiB = 1 << 10 -const MiB = 1 << 20 - -func TestLoadAll(t *testing.T) { - b := mem.New() - var buf []byte - - for i := 0; i < 20; i++ { - data := rtest.Random(23+i, rand.Intn(MiB)+500*KiB) - - id := restic.Hash(data) - h := backend.Handle{Name: id.String(), Type: backend.PackFile} - err := b.Save(context.TODO(), h, backend.NewByteReader(data, b.Hasher())) - rtest.OK(t, err) - - buf, err := backend.LoadAll(context.TODO(), buf, b, backend.Handle{Type: backend.PackFile, Name: id.String()}) - rtest.OK(t, err) - - if len(buf) != len(data) { - t.Errorf("length of returned buffer does not match, want %d, got %d", len(data), len(buf)) - continue - } - - if !bytes.Equal(buf, data) { - t.Errorf("wrong data returned") - continue - } - } -} - -func save(t testing.TB, be backend.Backend, buf []byte) backend.Handle { - id := restic.Hash(buf) - h := backend.Handle{Name: id.String(), Type: backend.PackFile} - err := be.Save(context.TODO(), h, backend.NewByteReader(buf, be.Hasher())) - if err != nil { - t.Fatal(err) - } - return h -} - -type quickRetryBackend struct { - backend.Backend -} - -func (be *quickRetryBackend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { - err := be.Backend.Load(ctx, h, length, offset, fn) - if err != nil { - // retry - err = be.Backend.Load(ctx, h, length, offset, fn) - } - return err -} - -func TestLoadAllBroken(t *testing.T) { - b := mock.NewBackend() - - data := rtest.Random(23, rand.Intn(MiB)+500*KiB) - id := restic.Hash(data) - // damage buffer - data[0] ^= 0xff - - b.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) { - return io.NopCloser(bytes.NewReader(data)), nil - } - - // must fail on first try - _, err := backend.LoadAll(context.TODO(), nil, b, backend.Handle{Type: backend.PackFile, Name: id.String()}) - if err == nil { - t.Fatalf("missing expected error") - } - - // must return the broken data after a retry - be := &quickRetryBackend{Backend: b} - buf, err := backend.LoadAll(context.TODO(), nil, be, backend.Handle{Type: backend.PackFile, Name: id.String()}) - rtest.OK(t, err) - - if !bytes.Equal(buf, data) { - t.Fatalf("wrong data returned") - } -} - -func TestLoadAllAppend(t *testing.T) { - b := mem.New() - - h1 := save(t, b, []byte("foobar test string")) - randomData := rtest.Random(23, rand.Intn(MiB)+500*KiB) - h2 := save(t, b, randomData) - - var tests = []struct { - handle backend.Handle - buf []byte - want []byte - }{ - { - handle: h1, - buf: nil, - want: []byte("foobar test string"), - }, - { - handle: h1, - buf: []byte("xxx"), - want: []byte("foobar test string"), - }, - { - handle: h2, - buf: nil, - want: randomData, - }, - { - handle: h2, - buf: make([]byte, 0, 200), - want: randomData, - }, - { - handle: h2, - buf: []byte("foobarbaz"), - want: randomData, - }, - } - - for _, test := range tests { - t.Run("", func(t *testing.T) { - buf, err := backend.LoadAll(context.TODO(), test.buf, b, test.handle) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(buf, test.want) { - t.Errorf("wrong data returned, want %q, got %q", test.want, buf) - } - }) - } -} diff --git a/internal/cache/backend.go b/internal/cache/backend.go index 5cbdb5444..63bb6f85f 100644 --- a/internal/cache/backend.go +++ b/internal/cache/backend.go @@ -40,7 +40,8 @@ func (b *Backend) Remove(ctx context.Context, h backend.Handle) error { return err } - return b.Cache.remove(h) + _, err = b.Cache.remove(h) + return err } func autoCacheTypes(h backend.Handle) bool { @@ -79,10 +80,9 @@ func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindR return err } - err = b.Cache.Save(h, rd) + err = b.Cache.save(h, rd) if err != nil { debug.Log("unable to save %v to cache: %v", h, err) - _ = b.Cache.remove(h) return err } @@ -120,11 +120,11 @@ func (b *Backend) cacheFile(ctx context.Context, h backend.Handle) error { if !b.Cache.Has(h) { // nope, it's still not in the cache, pull it from the repo and save it err := b.Backend.Load(ctx, h, 0, 0, func(rd io.Reader) error { - return b.Cache.Save(h, rd) + return b.Cache.save(h, rd) }) if err != nil { // try to remove from the cache, ignore errors - _ = b.Cache.remove(h) + _, _ = b.Cache.remove(h) } return err } @@ -134,9 +134,9 @@ func (b *Backend) cacheFile(ctx context.Context, h backend.Handle) error { // loadFromCache will try to load the file from the cache. func (b *Backend) loadFromCache(h backend.Handle, length int, offset int64, consumer func(rd io.Reader) error) (bool, error) { - rd, err := b.Cache.load(h, length, offset) + rd, inCache, err := b.Cache.load(h, length, offset) if err != nil { - return false, err + return inCache, err } err = consumer(rd) @@ -162,14 +162,10 @@ func (b *Backend) Load(ctx context.Context, h backend.Handle, length int, offset // try loading from cache without checking that the handle is actually cached inCache, err := b.loadFromCache(h, length, offset, consumer) if inCache { - if err == nil { - return nil - } - - // drop from cache and retry once - _ = b.Cache.remove(h) + debug.Log("error loading %v from cache: %v", h, err) + // the caller must explicitly use cache.Forget() to remove the cache entry + return err } - debug.Log("error loading %v from cache: %v", h, err) // if we don't automatically cache this file type, fall back to the backend if !autoCacheTypes(h) { @@ -185,6 +181,9 @@ func (b *Backend) Load(ctx context.Context, h backend.Handle, length int, offset inCache, err = b.loadFromCache(h, length, offset, consumer) if inCache { + if err != nil { + debug.Log("error loading %v from cache: %v", h, err) + } return err } @@ -198,13 +197,9 @@ func (b *Backend) Stat(ctx context.Context, h backend.Handle) (backend.FileInfo, debug.Log("cache Stat(%v)", h) fi, err := b.Backend.Stat(ctx, h) - if err != nil { - if b.Backend.IsNotExist(err) { - // try to remove from the cache, ignore errors - _ = b.Cache.remove(h) - } - - return fi, err + if err != nil && b.Backend.IsNotExist(err) { + // try to remove from the cache, ignore errors + _, _ = b.Cache.remove(h) } return fi, err diff --git a/internal/cache/backend_test.go b/internal/cache/backend_test.go index 68fbb02b3..0de90471e 100644 --- a/internal/cache/backend_test.go +++ b/internal/cache/backend_test.go @@ -5,6 +5,7 @@ import ( "context" "io" "math/rand" + "strings" "sync" "testing" "time" @@ -12,12 +13,13 @@ import ( "github.com/pkg/errors" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/mem" + backendtest "github.com/restic/restic/internal/backend/test" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/test" ) func loadAndCompare(t testing.TB, be backend.Backend, h backend.Handle, data []byte) { - buf, err := backend.LoadAll(context.TODO(), nil, be, h) + buf, err := backendtest.LoadAll(context.TODO(), be, h) if err != nil { t.Fatal(err) } @@ -90,7 +92,7 @@ func TestBackend(t *testing.T) { loadAndCompare(t, be, h, data) // load data via cache - loadAndCompare(t, be, h, data) + loadAndCompare(t, wbe, h, data) // remove directly remove(t, be, h) @@ -113,6 +115,77 @@ func TestBackend(t *testing.T) { } } +type loadCountingBackend struct { + backend.Backend + ctr int +} + +func (l *loadCountingBackend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + l.ctr++ + return l.Backend.Load(ctx, h, length, offset, fn) +} + +func TestOutOfBoundsAccess(t *testing.T) { + be := &loadCountingBackend{Backend: mem.New()} + c := TestNewCache(t) + wbe := c.Wrap(be) + + h, data := randomData(50) + save(t, be, h, data) + + // load out of bounds + err := wbe.Load(context.TODO(), h, 100, 100, func(rd io.Reader) error { + t.Error("cache returned non-existant file section") + return errors.New("broken") + }) + test.Assert(t, strings.Contains(err.Error(), " is too short"), "expected too short error, got %v", err) + test.Equals(t, 1, be.ctr, "expected file to be loaded only once") + // file must nevertheless get cached + if !c.Has(h) { + t.Errorf("cache doesn't have file after load") + } + + // start within bounds, but request too large chunk + err = wbe.Load(context.TODO(), h, 100, 0, func(rd io.Reader) error { + t.Error("cache returned non-existant file section") + return errors.New("broken") + }) + test.Assert(t, strings.Contains(err.Error(), " is too short"), "expected too short error, got %v", err) + test.Equals(t, 1, be.ctr, "expected file to be loaded only once") +} + +func TestForget(t *testing.T) { + be := &loadCountingBackend{Backend: mem.New()} + c := TestNewCache(t) + wbe := c.Wrap(be) + + h, data := randomData(50) + save(t, be, h, data) + + loadAndCompare(t, wbe, h, data) + test.Equals(t, 1, be.ctr, "expected file to be loaded once") + + // must still exist even if load returns an error + exp := errors.New("error") + err := wbe.Load(context.TODO(), h, 0, 0, func(rd io.Reader) error { + return exp + }) + test.Equals(t, exp, err, "wrong error") + test.Assert(t, c.Has(h), "missing cache entry") + + test.OK(t, c.Forget(h)) + test.Assert(t, !c.Has(h), "cache entry should have been removed") + + // cache it again + loadAndCompare(t, wbe, h, data) + test.Assert(t, c.Has(h), "missing cache entry") + + // forget must delete file only once + err = c.Forget(h) + test.Assert(t, strings.Contains(err.Error(), "circuit breaker prevents repeated deletion of cached file"), "wrong error message %q", err) + test.Assert(t, c.Has(h), "cache entry should still exist") +} + type loadErrorBackend struct { backend.Backend loadError error @@ -140,7 +213,7 @@ func TestErrorBackend(t *testing.T) { loadTest := func(wg *sync.WaitGroup, be backend.Backend) { defer wg.Done() - buf, err := backend.LoadAll(context.TODO(), nil, be, h) + buf, err := backendtest.LoadAll(context.TODO(), be, h) if err == testErr { return } @@ -165,38 +238,3 @@ func TestErrorBackend(t *testing.T) { wg.Wait() } - -func TestBackendRemoveBroken(t *testing.T) { - be := mem.New() - c := TestNewCache(t) - - h, data := randomData(5234142) - // save directly in backend - save(t, be, h, data) - - // prime cache with broken copy - broken := append([]byte{}, data...) - broken[0] ^= 0xff - err := c.Save(h, bytes.NewReader(broken)) - test.OK(t, err) - - // loadall retries if broken data was returned - buf, err := backend.LoadAll(context.TODO(), nil, c.Wrap(be), h) - test.OK(t, err) - - if !bytes.Equal(buf, data) { - t.Fatalf("wrong data returned") - } - - // check that the cache now contains the correct data - rd, err := c.load(h, 0, 0) - defer func() { - _ = rd.Close() - }() - test.OK(t, err) - cached, err := io.ReadAll(rd) - test.OK(t, err) - if !bytes.Equal(cached, data) { - t.Fatalf("wrong data cache") - } -} diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 19b3182df..a55b51c70 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -6,6 +6,7 @@ import ( "path/filepath" "regexp" "strconv" + "sync" "time" "github.com/pkg/errors" @@ -20,6 +21,8 @@ type Cache struct { path string Base string Created bool + + forgotten sync.Map } const dirMode = 0700 diff --git a/internal/cache/file.go b/internal/cache/file.go index 8d8bc5e84..12f5f23c5 100644 --- a/internal/cache/file.go +++ b/internal/cache/file.go @@ -1,6 +1,7 @@ package cache import ( + "fmt" "io" "os" "path/filepath" @@ -8,6 +9,7 @@ import ( "github.com/pkg/errors" "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/util" "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/fs" @@ -31,54 +33,54 @@ func (c *Cache) canBeCached(t backend.FileType) bool { return ok } -// Load returns a reader that yields the contents of the file with the +// load returns a reader that yields the contents of the file with the // given handle. rd must be closed after use. If an error is returned, the -// ReadCloser is nil. -func (c *Cache) load(h backend.Handle, length int, offset int64) (io.ReadCloser, error) { +// ReadCloser is nil. The bool return value indicates whether the requested +// file exists in the cache. It can be true even when no reader is returned +// because length or offset are out of bounds +func (c *Cache) load(h backend.Handle, length int, offset int64) (io.ReadCloser, bool, error) { debug.Log("Load(%v, %v, %v) from cache", h, length, offset) if !c.canBeCached(h.Type) { - return nil, errors.New("cannot be cached") + return nil, false, errors.New("cannot be cached") } f, err := fs.Open(c.filename(h)) if err != nil { - return nil, errors.WithStack(err) + return nil, false, errors.WithStack(err) } fi, err := f.Stat() if err != nil { _ = f.Close() - return nil, errors.WithStack(err) + return nil, true, errors.WithStack(err) } size := fi.Size() if size <= int64(crypto.CiphertextLength(0)) { _ = f.Close() - _ = c.remove(h) - return nil, errors.Errorf("cached file %v is truncated, removing", h) + return nil, true, errors.Errorf("cached file %v is truncated", h) } if size < offset+int64(length) { _ = f.Close() - _ = c.remove(h) - return nil, errors.Errorf("cached file %v is too short, removing", h) + return nil, true, errors.Errorf("cached file %v is too short", h) } if offset > 0 { if _, err = f.Seek(offset, io.SeekStart); err != nil { _ = f.Close() - return nil, err + return nil, true, err } } if length <= 0 { - return f, nil + return f, true, nil } - return backend.LimitReadCloser(f, int64(length)), nil + return util.LimitReadCloser(f, int64(length)), true, nil } -// Save saves a file in the cache. -func (c *Cache) Save(h backend.Handle, rd io.Reader) error { +// save saves a file in the cache. +func (c *Cache) save(h backend.Handle, rd io.Reader) error { debug.Log("Save to cache: %v", h) if rd == nil { return errors.New("Save() called with nil reader") @@ -138,13 +140,34 @@ func (c *Cache) Save(h backend.Handle, rd io.Reader) error { return errors.WithStack(err) } -// Remove deletes a file. When the file is not cache, no error is returned. -func (c *Cache) remove(h backend.Handle) error { - if !c.Has(h) { - return nil +func (c *Cache) Forget(h backend.Handle) error { + h.IsMetadata = false + + if _, ok := c.forgotten.Load(h); ok { + // Delete a file at most once while restic runs. + // This prevents repeatedly caching and forgetting broken files + return fmt.Errorf("circuit breaker prevents repeated deletion of cached file %v", h) } - return fs.Remove(c.filename(h)) + removed, err := c.remove(h) + if removed { + c.forgotten.Store(h, struct{}{}) + } + return err +} + +// remove deletes a file. When the file is not cached, no error is returned. +func (c *Cache) remove(h backend.Handle) (bool, error) { + if !c.canBeCached(h.Type) { + return false, nil + } + + err := fs.Remove(c.filename(h)) + removed := err == nil + if errors.Is(err, os.ErrNotExist) { + err = nil + } + return removed, err } // Clear removes all files of type t from the cache that are not contained in diff --git a/internal/cache/file_test.go b/internal/cache/file_test.go index 7935f9806..331e3251d 100644 --- a/internal/cache/file_test.go +++ b/internal/cache/file_test.go @@ -14,7 +14,7 @@ import ( "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/fs" "github.com/restic/restic/internal/restic" - "github.com/restic/restic/internal/test" + rtest "github.com/restic/restic/internal/test" "golang.org/x/sync/errgroup" ) @@ -22,7 +22,7 @@ import ( func generateRandomFiles(t testing.TB, tpe backend.FileType, c *Cache) restic.IDSet { ids := restic.NewIDSet() for i := 0; i < rand.Intn(15)+10; i++ { - buf := test.Random(rand.Int(), 1<<19) + buf := rtest.Random(rand.Int(), 1<<19) id := restic.Hash(buf) h := backend.Handle{Type: tpe, Name: id.String()} @@ -30,7 +30,7 @@ func generateRandomFiles(t testing.TB, tpe backend.FileType, c *Cache) restic.ID t.Errorf("index %v present before save", id) } - err := c.Save(h, bytes.NewReader(buf)) + err := c.save(h, bytes.NewReader(buf)) if err != nil { t.Fatal(err) } @@ -48,10 +48,11 @@ func randomID(s restic.IDSet) restic.ID { } func load(t testing.TB, c *Cache, h backend.Handle) []byte { - rd, err := c.load(h, 0, 0) + rd, inCache, err := c.load(h, 0, 0) if err != nil { t.Fatal(err) } + rtest.Equals(t, true, inCache, "expected inCache flag to be true") if rd == nil { t.Fatalf("load() returned nil reader") @@ -144,14 +145,14 @@ func TestFileLoad(t *testing.T) { c := TestNewCache(t) // save about 5 MiB of data in the cache - data := test.Random(rand.Int(), 5234142) + data := rtest.Random(rand.Int(), 5234142) id := restic.ID{} copy(id[:], data) h := backend.Handle{ Type: restic.PackFile, Name: id.String(), } - if err := c.Save(h, bytes.NewReader(data)); err != nil { + if err := c.save(h, bytes.NewReader(data)); err != nil { t.Fatalf("Save() returned error: %v", err) } @@ -169,10 +170,11 @@ func TestFileLoad(t *testing.T) { for _, test := range tests { t.Run(fmt.Sprintf("%v/%v", test.length, test.offset), func(t *testing.T) { - rd, err := c.load(h, test.length, test.offset) + rd, inCache, err := c.load(h, test.length, test.offset) if err != nil { t.Fatal(err) } + rtest.Equals(t, true, inCache, "expected inCache flag to be true") buf, err := io.ReadAll(rd) if err != nil { @@ -225,7 +227,7 @@ func TestFileSaveConcurrent(t *testing.T) { var ( c = TestNewCache(t) - data = test.Random(1, 10000) + data = rtest.Random(1, 10000) g errgroup.Group id restic.ID ) @@ -237,7 +239,7 @@ func TestFileSaveConcurrent(t *testing.T) { } for i := 0; i < nproc/2; i++ { - g.Go(func() error { return c.Save(h, bytes.NewReader(data)) }) + g.Go(func() error { return c.save(h, bytes.NewReader(data)) }) // Can't use load because only the main goroutine may call t.Fatal. g.Go(func() error { @@ -245,7 +247,7 @@ func TestFileSaveConcurrent(t *testing.T) { // ensure is ENOENT or nil error. time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond) - f, err := c.load(h, 0, 0) + f, _, err := c.load(h, 0, 0) t.Logf("Load error: %v", err) switch { case err == nil: @@ -264,23 +266,23 @@ func TestFileSaveConcurrent(t *testing.T) { }) } - test.OK(t, g.Wait()) + rtest.OK(t, g.Wait()) saved := load(t, c, h) - test.Equals(t, data, saved) + rtest.Equals(t, data, saved) } func TestFileSaveAfterDamage(t *testing.T) { c := TestNewCache(t) - test.OK(t, fs.RemoveAll(c.path)) + rtest.OK(t, fs.RemoveAll(c.path)) // save a few bytes of data in the cache - data := test.Random(123456789, 42) + data := rtest.Random(123456789, 42) id := restic.Hash(data) h := backend.Handle{ Type: restic.PackFile, Name: id.String(), } - if err := c.Save(h, bytes.NewReader(data)); err == nil { + if err := c.save(h, bytes.NewReader(data)); err == nil { t.Fatal("Missing error when saving to deleted cache directory") } } diff --git a/internal/checker/checker.go b/internal/checker/checker.go index f60a11f62..d6474f86e 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -532,6 +532,21 @@ func (e *partialReadError) Error() string { // checkPack reads a pack and checks the integrity of all blobs. func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error { + err := checkPackInner(ctx, r, id, blobs, size, bufRd, dec) + if err != nil { + // retry pack verification to detect transient errors + err2 := checkPackInner(ctx, r, id, blobs, size, bufRd, dec) + if err2 != nil { + err = err2 + } else { + err = fmt.Errorf("check successful on second attempt, original error %w", err) + } + } + return err +} + +func checkPackInner(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error { + debug.Log("checking pack %v", id.String()) if len(blobs) == 0 { @@ -590,11 +605,13 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r if err != nil { return &partialReadError{err} } + curPos += minHdrStart - curPos } // read remainder, which should be the pack header var err error - hdrBuf, err = io.ReadAll(bufRd) + hdrBuf = make([]byte, int(size-int64(curPos))) + _, err = io.ReadFull(bufRd, hdrBuf) if err != nil { return &partialReadError{err} } @@ -608,12 +625,12 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r // failed to load the pack file, return as further checks cannot succeed anyways debug.Log(" error streaming pack (partial %v): %v", isPartialReadError, err) if isPartialReadError { - return &ErrPackData{PackID: id, errs: append(errs, errors.Errorf("partial download error: %w", err))} + return &ErrPackData{PackID: id, errs: append(errs, fmt.Errorf("partial download error: %w", err))} } // The check command suggests to repair files for which a `ErrPackData` is returned. However, this file // completely failed to download such that there's no point in repairing anything. - return errors.Errorf("download error: %w", err) + return fmt.Errorf("download error: %w", err) } if !hash.Equal(id) { debug.Log("pack ID does not match, want %v, got %v", id, hash) @@ -725,6 +742,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p } err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd, dec) + p.Add(1) if err == nil { continue diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index 9746e9f5d..ee18f893a 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "sort" "strconv" + "strings" "sync" "testing" "time" @@ -325,42 +326,91 @@ func induceError(data []byte) { data[pos] ^= 1 } +// errorOnceBackend randomly modifies data when reading a file for the first time. +type errorOnceBackend struct { + backend.Backend + m sync.Map +} + +func (b *errorOnceBackend) Load(ctx context.Context, h backend.Handle, length int, offset int64, consumer func(rd io.Reader) error) error { + _, isRetry := b.m.LoadOrStore(h, struct{}{}) + return b.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error { + if !isRetry && h.Type != restic.ConfigFile { + return consumer(errorReadCloser{rd}) + } + return consumer(rd) + }) +} + func TestCheckerModifiedData(t *testing.T) { repo := repository.TestRepository(t) sn := archiver.TestSnapshot(t, repo, ".", nil) t.Logf("archived as %v", sn.ID().Str()) - beError := &errorBackend{Backend: repo.Backend()} - checkRepo := repository.TestOpenBackend(t, beError) + errBe := &errorBackend{Backend: repo.Backend()} - chkr := checker.New(checkRepo, false) + for _, test := range []struct { + name string + be backend.Backend + damage func() + check func(t *testing.T, err error) + }{ + { + "errorBackend", + errBe, + func() { + errBe.ProduceErrors = true + }, + func(t *testing.T, err error) { + if err == nil { + t.Fatal("no error found, checker is broken") + } + }, + }, + { + "errorOnceBackend", + &errorOnceBackend{Backend: repo.Backend()}, + func() {}, + func(t *testing.T, err error) { + if !strings.Contains(err.Error(), "check successful on second attempt, original error pack") { + t.Fatalf("wrong error found, got %v", err) + } + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + checkRepo := repository.TestOpenBackend(t, test.be) - hints, errs := chkr.LoadIndex(context.TODO(), nil) - if len(errs) > 0 { - t.Fatalf("expected no errors, got %v: %v", len(errs), errs) - } + chkr := checker.New(checkRepo, false) - if len(hints) > 0 { - t.Errorf("expected no hints, got %v: %v", len(hints), hints) - } + hints, errs := chkr.LoadIndex(context.TODO(), nil) + if len(errs) > 0 { + t.Fatalf("expected no errors, got %v: %v", len(errs), errs) + } - beError.ProduceErrors = true - errFound := false - for _, err := range checkPacks(chkr) { - t.Logf("pack error: %v", err) - } + if len(hints) > 0 { + t.Errorf("expected no hints, got %v: %v", len(hints), hints) + } - for _, err := range checkStruct(chkr) { - t.Logf("struct error: %v", err) - } + test.damage() + var err error + for _, err := range checkPacks(chkr) { + t.Logf("pack error: %v", err) + } - for _, err := range checkData(chkr) { - t.Logf("data error: %v", err) - errFound = true - } + for _, err := range checkStruct(chkr) { + t.Logf("struct error: %v", err) + } - if !errFound { - t.Fatal("no error found, checker is broken") + for _, cerr := range checkData(chkr) { + t.Logf("data error: %v", cerr) + if err == nil { + err = cerr + } + } + + test.check(t, err) + }) } } diff --git a/internal/migrations/upgrade_repo_v2.go b/internal/migrations/upgrade_repo_v2.go index 585d9e8c7..6f4225947 100644 --- a/internal/migrations/upgrade_repo_v2.go +++ b/internal/migrations/upgrade_repo_v2.go @@ -3,7 +3,6 @@ package migrations import ( "context" "fmt" - "io" "os" "path/filepath" @@ -89,11 +88,7 @@ func (m *UpgradeRepoV2) Apply(ctx context.Context, repo restic.Repository) error h := backend.Handle{Type: restic.ConfigFile} // read raw config file and save it to a temp dir, just in case - var rawConfigFile []byte - err = repo.Backend().Load(ctx, h, 0, 0, func(rd io.Reader) (err error) { - rawConfigFile, err = io.ReadAll(rd) - return err - }) + rawConfigFile, err := repo.LoadRaw(ctx, restic.ConfigFile, restic.ID{}) if err != nil { return fmt.Errorf("load config file failed: %w", err) } diff --git a/internal/repository/key.go b/internal/repository/key.go index 0604b44df..08f997544 100644 --- a/internal/repository/key.go +++ b/internal/repository/key.go @@ -178,8 +178,7 @@ func SearchKey(ctx context.Context, s *Repository, password string, maxKeys int, // LoadKey loads a key from the backend. func LoadKey(ctx context.Context, s *Repository, id restic.ID) (k *Key, err error) { - h := backend.Handle{Type: restic.KeyFile, Name: id.String()} - data, err := backend.LoadAll(ctx, nil, s.be, h) + data, err := s.LoadRaw(ctx, restic.KeyFile, id) if err != nil { return nil, err } diff --git a/internal/repository/raw.go b/internal/repository/raw.go new file mode 100644 index 000000000..31443b010 --- /dev/null +++ b/internal/repository/raw.go @@ -0,0 +1,56 @@ +package repository + +import ( + "bytes" + "context" + "fmt" + "io" + + "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/restic" +) + +// LoadRaw reads all data stored in the backend for the file with id and filetype t. +// If the backend returns data that does not match the id, then the buffer is returned +// along with an error that is a restic.ErrInvalidData error. +func (r *Repository) LoadRaw(ctx context.Context, t restic.FileType, id restic.ID) (buf []byte, err error) { + h := backend.Handle{Type: t, Name: id.String()} + + buf, err = loadRaw(ctx, r.be, h) + + // retry loading damaged data only once. If a file fails to download correctly + // the second time, then it is likely corrupted at the backend. + if h.Type != backend.ConfigFile && id != restic.Hash(buf) { + if r.Cache != nil { + // Cleanup cache to make sure it's not the cached copy that is broken. + // Ignore error as there's not much we can do in that case. + _ = r.Cache.Forget(h) + } + + buf, err = loadRaw(ctx, r.be, h) + + if err == nil && id != restic.Hash(buf) { + // Return corrupted data to the caller if it is still broken the second time to + // let the caller decide what to do with the data. + return buf, fmt.Errorf("LoadRaw(%v): %w", h, restic.ErrInvalidData) + } + } + + if err != nil { + return nil, err + } + return buf, nil +} + +func loadRaw(ctx context.Context, be backend.Backend, h backend.Handle) (buf []byte, err error) { + err = be.Load(ctx, h, 0, 0, func(rd io.Reader) error { + wr := new(bytes.Buffer) + _, cerr := io.Copy(wr, rd) + if cerr != nil { + return cerr + } + buf = wr.Bytes() + return cerr + }) + return buf, err +} diff --git a/internal/repository/raw_test.go b/internal/repository/raw_test.go new file mode 100644 index 000000000..28786dbcd --- /dev/null +++ b/internal/repository/raw_test.go @@ -0,0 +1,108 @@ +package repository_test + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/mem" + "github.com/restic/restic/internal/backend/mock" + "github.com/restic/restic/internal/cache" + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/repository" + "github.com/restic/restic/internal/restic" + rtest "github.com/restic/restic/internal/test" +) + +const KiB = 1 << 10 +const MiB = 1 << 20 + +func TestLoadRaw(t *testing.T) { + b := mem.New() + repo, err := repository.New(b, repository.Options{}) + rtest.OK(t, err) + + for i := 0; i < 5; i++ { + data := rtest.Random(23+i, 500*KiB) + + id := restic.Hash(data) + h := backend.Handle{Name: id.String(), Type: backend.PackFile} + err := b.Save(context.TODO(), h, backend.NewByteReader(data, b.Hasher())) + rtest.OK(t, err) + + buf, err := repo.LoadRaw(context.TODO(), backend.PackFile, id) + rtest.OK(t, err) + + if len(buf) != len(data) { + t.Errorf("length of returned buffer does not match, want %d, got %d", len(data), len(buf)) + continue + } + + if !bytes.Equal(buf, data) { + t.Errorf("wrong data returned") + continue + } + } +} + +func TestLoadRawBroken(t *testing.T) { + b := mock.NewBackend() + repo, err := repository.New(b, repository.Options{}) + rtest.OK(t, err) + + data := rtest.Random(23, 10*KiB) + id := restic.Hash(data) + // damage buffer + data[0] ^= 0xff + + b.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(data)), nil + } + + // must detect but still return corrupt data + buf, err := repo.LoadRaw(context.TODO(), backend.PackFile, id) + rtest.Assert(t, bytes.Equal(buf, data), "wrong data returned") + rtest.Assert(t, errors.Is(err, restic.ErrInvalidData), "missing expected ErrInvalidData error, got %v", err) + + // cause the first access to fail, but repair the data for the second access + data[0] ^= 0xff + loadCtr := 0 + b.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) { + data[0] ^= 0xff + loadCtr++ + return io.NopCloser(bytes.NewReader(data)), nil + } + + // must retry load of corrupted data + buf, err = repo.LoadRaw(context.TODO(), backend.PackFile, id) + rtest.OK(t, err) + rtest.Assert(t, bytes.Equal(buf, data), "wrong data returned") + rtest.Equals(t, 2, loadCtr, "missing retry on broken data") +} + +func TestLoadRawBrokenWithCache(t *testing.T) { + b := mock.NewBackend() + c := cache.TestNewCache(t) + repo, err := repository.New(b, repository.Options{}) + rtest.OK(t, err) + repo.UseCache(c) + + data := rtest.Random(23, 10*KiB) + id := restic.Hash(data) + + loadCtr := 0 + // cause the first access to fail, but repair the data for the second access + b.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) { + data[0] ^= 0xff + loadCtr++ + return io.NopCloser(bytes.NewReader(data)), nil + } + + // must retry load of corrupted data + buf, err := repo.LoadRaw(context.TODO(), backend.SnapshotFile, id) + rtest.OK(t, err) + rtest.Assert(t, bytes.Equal(buf, data), "wrong data returned") + rtest.Equals(t, 2, loadCtr, "missing retry on broken data") +} diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go index a4261517a..a0bd56012 100644 --- a/internal/repository/repair_pack.go +++ b/internal/repository/repair_pack.go @@ -31,12 +31,8 @@ func RepairPacks(ctx context.Context, repo restic.Repository, ids restic.IDSet, err := repo.LoadBlobsFromPack(wgCtx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { if err != nil { - // Fallback path - buf, err = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil) - if err != nil { - printer.E("failed to load blob %v: %v", blob.ID, err) - return nil - } + printer.E("failed to load blob %v: %v", blob.ID, err) + return nil } id, _, _, err := repo.SaveBlob(wgCtx, blob.Type, buf, restic.ID{}, true) if !id.Equal(blob.ID) { diff --git a/internal/repository/repair_pack_test.go b/internal/repository/repair_pack_test.go index 078017d21..0d16d251f 100644 --- a/internal/repository/repair_pack_test.go +++ b/internal/repository/repair_pack_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/restic/restic/internal/backend" + backendtest "github.com/restic/restic/internal/backend/test" "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" @@ -24,7 +25,7 @@ func listBlobs(repo restic.Repository) restic.BlobSet { } func replaceFile(t *testing.T, repo restic.Repository, h backend.Handle, damage func([]byte) []byte) { - buf, err := backend.LoadAll(context.TODO(), nil, repo.Backend(), h) + buf, err := backendtest.LoadAll(context.TODO(), repo.Backend(), h) test.OK(t, err) buf = damage(buf) test.OK(t, repo.Backend().Remove(context.TODO(), h)) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index a922b44e3..bbdaa16a7 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -174,46 +174,11 @@ func (r *Repository) LoadUnpacked(ctx context.Context, t restic.FileType, id res id = restic.ID{} } - ctx, cancel := context.WithCancel(ctx) - - h := backend.Handle{Type: t, Name: id.String()} - retriedInvalidData := false - var dataErr error - wr := new(bytes.Buffer) - - err := r.be.Load(ctx, h, 0, 0, func(rd io.Reader) error { - // make sure this call is idempotent, in case an error occurs - wr.Reset() - _, cerr := io.Copy(wr, rd) - if cerr != nil { - return cerr - } - - buf := wr.Bytes() - if t != restic.ConfigFile && !restic.Hash(buf).Equal(id) { - debug.Log("retry loading broken blob %v", h) - if !retriedInvalidData { - retriedInvalidData = true - } else { - // with a canceled context there is not guarantee which error will - // be returned by `be.Load`. - dataErr = fmt.Errorf("load(%v): %w", h, restic.ErrInvalidData) - cancel() - } - return restic.ErrInvalidData - - } - return nil - }) - - if dataErr != nil { - return nil, dataErr - } + buf, err := r.LoadRaw(ctx, t, id) if err != nil { return nil, err } - buf := wr.Bytes() nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():] plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil) if err != nil { @@ -270,16 +235,27 @@ func (r *Repository) LoadBlob(ctx context.Context, t restic.BlobType, id restic. // try cached pack files first sortCachedPacksFirst(r.Cache, blobs) - var lastError error - for _, blob := range blobs { - debug.Log("blob %v/%v found: %v", t, id, blob) - - if blob.Type != t { - debug.Log("blob %v has wrong block type, want %v", blob, t) + buf, err := r.loadBlob(ctx, blobs, buf) + if err != nil { + if r.Cache != nil { + for _, blob := range blobs { + h := backend.Handle{Type: restic.PackFile, Name: blob.PackID.String(), IsMetadata: blob.Type.IsMetadata()} + // ignore errors as there's not much we can do here + _ = r.Cache.Forget(h) + } } + buf, err = r.loadBlob(ctx, blobs, buf) + } + return buf, err +} + +func (r *Repository) loadBlob(ctx context.Context, blobs []restic.PackedBlob, buf []byte) ([]byte, error) { + var lastError error + for _, blob := range blobs { + debug.Log("blob %v found: %v", blob.BlobHandle, blob) // load blob from pack - h := backend.Handle{Type: restic.PackFile, Name: blob.PackID.String(), IsMetadata: t.IsMetadata()} + h := backend.Handle{Type: restic.PackFile, Name: blob.PackID.String(), IsMetadata: blob.Type.IsMetadata()} switch { case cap(buf) < int(blob.Length): @@ -288,42 +264,26 @@ func (r *Repository) LoadBlob(ctx context.Context, t restic.BlobType, id restic. buf = buf[:blob.Length] } - n, err := backend.ReadAt(ctx, r.be, h, int64(blob.Offset), buf) + _, err := backend.ReadAt(ctx, r.be, h, int64(blob.Offset), buf) if err != nil { debug.Log("error loading blob %v: %v", blob, err) lastError = err continue } - if uint(n) != blob.Length { - lastError = errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", - id.Str(), blob.Length, uint(n)) - debug.Log("lastError: %v", lastError) - continue - } + it := NewPackBlobIterator(blob.PackID, newByteReader(buf), uint(blob.Offset), []restic.Blob{blob.Blob}, r.key, r.getZstdDecoder()) + pbv, err := it.Next() - // decrypt - nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():] - plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil) + if err == nil { + err = pbv.Err + } if err != nil { - lastError = errors.Errorf("decrypting blob %v failed: %v", id, err) - continue - } - - if blob.IsCompressed() { - plaintext, err = r.getZstdDecoder().DecodeAll(plaintext, make([]byte, 0, blob.DataLength())) - if err != nil { - lastError = errors.Errorf("decompressing blob %v failed: %v", id, err) - continue - } - } - - // check hash - if !restic.Hash(plaintext).Equal(id) { - lastError = errors.Errorf("blob %v returned invalid hash", id) + debug.Log("error decoding blob %v: %v", blob, err) + lastError = err continue } + plaintext := pbv.Plaintext if len(plaintext) > cap(buf) { return plaintext, nil } @@ -337,7 +297,7 @@ func (r *Repository) LoadBlob(ctx context.Context, t restic.BlobType, id restic. return nil, lastError } - return nil, errors.Errorf("loading blob %v from %v packs failed", id.Str(), len(blobs)) + return nil, errors.Errorf("loading %v from %v packs failed", blobs[0].BlobHandle, len(blobs)) } // LookupBlobSize returns the size of blob id. @@ -909,7 +869,17 @@ func (r *Repository) List(ctx context.Context, t restic.FileType, fn func(restic func (r *Repository) ListPack(ctx context.Context, id restic.ID, size int64) ([]restic.Blob, uint32, error) { h := backend.Handle{Type: restic.PackFile, Name: id.String()} - return pack.List(r.Key(), backend.ReaderAt(ctx, r.Backend(), h), size) + entries, hdrSize, err := pack.List(r.Key(), backend.ReaderAt(ctx, r.Backend(), h), size) + if err != nil { + if r.Cache != nil { + // ignore error as there is not much we can do here + _ = r.Cache.Forget(h) + } + + // retry on error + entries, hdrSize, err = pack.List(r.Key(), backend.ReaderAt(ctx, r.Backend(), h), size) + } + return entries, hdrSize, err } // Delete calls backend.Delete() if implemented, and returns an error @@ -1023,7 +993,7 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn } func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { - h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false} + h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: blobs[0].Type.IsMetadata()} dataStart := blobs[0].Offset dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index 48a56a1fd..67622fdf8 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -9,16 +9,20 @@ import ( "math/rand" "os" "path/filepath" - "strings" + "sync" "testing" "time" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/local" + "github.com/restic/restic/internal/backend/mem" + "github.com/restic/restic/internal/cache" "github.com/restic/restic/internal/crypto" + "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test" "golang.org/x/sync/errgroup" ) @@ -139,6 +143,28 @@ func testLoadBlob(t *testing.T, version uint) { } } +func TestLoadBlobBroken(t *testing.T) { + be := mem.New() + repo := repository.TestRepositoryWithBackend(t, &damageOnceBackend{Backend: be}, restic.StableRepoVersion, repository.Options{}).(*repository.Repository) + buf := test.Random(42, 1000) + + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + id, _, _, err := repo.SaveBlob(context.TODO(), restic.TreeBlob, buf, restic.ID{}, false) + rtest.OK(t, err) + rtest.OK(t, repo.Flush(context.Background())) + + // setup cache after saving the blob to make sure that the damageOnceBackend damages the cached data + c := cache.TestNewCache(t) + repo.UseCache(c) + + data, err := repo.LoadBlob(context.TODO(), restic.TreeBlob, id, nil) + rtest.OK(t, err) + rtest.Assert(t, bytes.Equal(buf, data), "data mismatch") + pack := repo.Index().Lookup(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID + rtest.Assert(t, c.Has(backend.Handle{Type: restic.PackFile, Name: pack.String()}), "expected tree pack to be cached") +} + func BenchmarkLoadBlob(b *testing.B) { repository.BenchmarkAllVersions(b, benchmarkLoadBlob) } @@ -254,16 +280,13 @@ func TestRepositoryLoadUnpackedBroken(t *testing.T) { err := repo.Backend().Save(context.TODO(), h, backend.NewByteReader(data, repo.Backend().Hasher())) rtest.OK(t, err) - // without a retry backend this will just return an error that the file is broken _, err = repo.LoadUnpacked(context.TODO(), restic.IndexFile, id) - if err == nil { - t.Fatal("missing expected error") - } - rtest.Assert(t, strings.Contains(err.Error(), "invalid data returned"), "unexpected error: %v", err) + rtest.Assert(t, errors.Is(err, restic.ErrInvalidData), "unexpected error: %v", err) } type damageOnceBackend struct { backend.Backend + m sync.Map } func (be *damageOnceBackend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { @@ -271,13 +294,14 @@ func (be *damageOnceBackend) Load(ctx context.Context, h backend.Handle, length if h.Type == restic.ConfigFile { return be.Backend.Load(ctx, h, length, offset, fn) } - // return broken data on the first try - err := be.Backend.Load(ctx, h, length+1, offset, fn) - if err != nil { - // retry - err = be.Backend.Load(ctx, h, length, offset, fn) + + h.IsMetadata = false + _, isRetry := be.m.LoadOrStore(h, true) + if !isRetry { + // return broken data on the first try + offset++ } - return err + return be.Backend.Load(ctx, h, length, offset, fn) } func TestRepositoryLoadUnpackedRetryBroken(t *testing.T) { @@ -398,3 +422,38 @@ func TestInvalidCompression(t *testing.T) { _, err = repository.New(nil, repository.Options{Compression: comp}) rtest.Assert(t, err != nil, "missing error") } + +func TestListPack(t *testing.T) { + be := mem.New() + repo := repository.TestRepositoryWithBackend(t, &damageOnceBackend{Backend: be}, restic.StableRepoVersion, repository.Options{}).(*repository.Repository) + buf := test.Random(42, 1000) + + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + id, _, _, err := repo.SaveBlob(context.TODO(), restic.TreeBlob, buf, restic.ID{}, false) + rtest.OK(t, err) + rtest.OK(t, repo.Flush(context.Background())) + + // setup cache after saving the blob to make sure that the damageOnceBackend damages the cached data + c := cache.TestNewCache(t) + repo.UseCache(c) + + // Forcibly cache pack file + packID := repo.Index().Lookup(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID + rtest.OK(t, repo.Backend().Load(context.TODO(), backend.Handle{Type: restic.PackFile, IsMetadata: true, Name: packID.String()}, 0, 0, func(rd io.Reader) error { return nil })) + + // Get size to list pack + var size int64 + rtest.OK(t, repo.List(context.TODO(), restic.PackFile, func(id restic.ID, sz int64) error { + if id == packID { + size = sz + } + return nil + })) + + blobs, _, err := repo.ListPack(context.TODO(), packID, size) + rtest.OK(t, err) + rtest.Assert(t, len(blobs) == 1 && blobs[0].ID == id, "unexpected blobs in pack: %v", blobs) + + rtest.Assert(t, !c.Has(backend.Handle{Type: restic.PackFile, Name: packID.String()}), "tree pack should no longer be cached as ListPack does not set IsMetadata in the backend.Handle") +} diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 7a3389e00..5393e0701 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -57,6 +57,11 @@ type Repository interface { // LoadUnpacked loads and decrypts the file with the given type and ID. LoadUnpacked(ctx context.Context, t FileType, id ID) (data []byte, err error) SaveUnpacked(context.Context, FileType, []byte) (ID, error) + + // LoadRaw reads all data stored in the backend for the file with id and filetype t. + // If the backend returns data that does not match the id, then the buffer is returned + // along with an error that is a restic.ErrInvalidData error. + LoadRaw(ctx context.Context, t FileType, id ID) (data []byte, err error) } type FileType = backend.FileType