diff --git a/cmd/restic/integration_test.go b/cmd/restic/integration_test.go
index 42936d2ea..f926306eb 100644
--- a/cmd/restic/integration_test.go
+++ b/cmd/restic/integration_test.go
@@ -2135,7 +2135,4 @@ func TestBackendLoadWriteTo(t *testing.T) {
 	firstSnapshot := testRunList(t, "snapshots", env.gopts)
 	rtest.Assert(t, len(firstSnapshot) == 1,
 		"expected one snapshot, got %v", firstSnapshot)
-
-	// test readData using the hashing.Reader
-	testRunCheck(t, env.gopts)
 }
diff --git a/internal/checker/checker.go b/internal/checker/checker.go
index e842a08be..3186b90d0 100644
--- a/internal/checker/checker.go
+++ b/internal/checker/checker.go
@@ -1,14 +1,18 @@
 package checker
 
 import (
+	"bufio"
+	"bytes"
 	"context"
 	"fmt"
 	"io"
-	"os"
+	"sort"
 	"sync"
 
+	"github.com/minio/sha256-simd"
 	"github.com/restic/restic/internal/debug"
 	"github.com/restic/restic/internal/errors"
+	"github.com/restic/restic/internal/hashing"
 	"github.com/restic/restic/internal/pack"
 	"github.com/restic/restic/internal/repository"
 	"github.com/restic/restic/internal/restic"
@@ -436,78 +440,112 @@ func (c *Checker) GetPacks() map[restic.ID]int64 {
 }
 
 // checkPack reads a pack and checks the integrity of all blobs.
-func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int64) error {
-	debug.Log("checking pack %v", id)
-	h := restic.Handle{Type: restic.PackFile, Name: id.String()}
+func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64) error {
+	debug.Log("checking pack %v", id.String())
 
-	packfile, hash, realSize, err := repository.DownloadAndHash(ctx, r.Backend(), h)
-	if err != nil {
-		return errors.Wrap(err, "checkPack")
+	if len(blobs) == 0 {
+		return errors.Errorf("pack %v is empty or not indexed", id)
 	}
-	defer func() {
-		_ = packfile.Close()
-		_ = os.Remove(packfile.Name())
-	}()
 
-	debug.Log("hash for pack %v is %v", id, hash)
+	// sanity check blobs in index
+	sort.Slice(blobs, func(i, j int) bool {
+		return blobs[i].Offset < blobs[j].Offset
+	})
+	idxHdrSize := pack.HeaderSize + len(blobs)*int(pack.EntrySize)
+	lastBlobEnd := 0
+	nonContinuousPack := false
+	for _, blob := range blobs {
+		if lastBlobEnd != int(blob.Offset) {
+			nonContinuousPack = true
+		}
+		lastBlobEnd = int(blob.Offset + blob.Length)
+	}
+	// size was calculated by masterindex.PackSize, thus there's no need to recalculate it here
 
+	var errs []error
+	if nonContinuousPack {
+		debug.Log("Index for pack contains gaps / overlaps, blobs: %v", blobs)
+		errs = append(errs, errors.New("Index for pack contains gaps / overlapping blobs"))
+	}
+
+	// calculate hash on-the-fly while reading the pack and capture pack header
+	var hash restic.ID
+	var hdrBuf *bytes.Buffer
+	hashingLoader := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+		return r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error {
+			hrd := hashing.NewReader(rd, sha256.New())
+
+			// create a buffer that is large enough to be reused by repository.StreamPack
+			// this ensures that we can read the pack header later on
+			bufferSize := int(size)
+			if bufferSize > repository.MaxStreamBufferSize {
+				bufferSize = repository.MaxStreamBufferSize
+			}
+			bufRd := bufio.NewReaderSize(hrd, bufferSize)
+
+			// skip to start of first blob, offset == 0 for correct pack files
+			_, err := bufRd.Discard(int(offset))
+			if err != nil {
+				return err
+			}
+
+			err = fn(bufRd)
+			if err != nil {
+				return err
+			}
+
+			// skip enough bytes until we reach the possible header start
+			curPos := length + int(offset)
+			minHdrStart := int(size) - pack.MaxHeaderSize
+			if minHdrStart > curPos {
+				_, err := bufRd.Discard(minHdrStart - curPos)
+				if err != nil {
+					return err
+				}
+			}
+
+			// read remainder, which should be the pack header
+			hdrBuf = new(bytes.Buffer)
+			_, err = io.Copy(hdrBuf, bufRd)
+			if err != nil {
+				return err
+			}
+
+			hash = restic.IDFromHash(hrd.Sum(nil))
+			return nil
+		})
+	}
+
+	err := repository.StreamPack(ctx, hashingLoader, r.Key(), id, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
+		debug.Log("  check blob %v: %v", blob.ID, blob)
+		if err != nil {
+			debug.Log("  error verifying blob %v: %v", blob.ID, err)
+			errs = append(errs, errors.Errorf("blob %v: %v", blob.ID, err))
+		}
+		return nil
+	})
+	if err != nil {
+		// failed to load the pack file, return as further checks cannot succeed anyways
+		debug.Log("  error streaming pack: %v", err)
+		return errors.Errorf("pack %v failed to download: %v", id, err)
+	}
 	if !hash.Equal(id) {
 		debug.Log("Pack ID does not match, want %v, got %v", id, hash)
 		return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str())
 	}
 
-	if realSize != size {
-		debug.Log("Pack size does not match, want %v, got %v", size, realSize)
-		return errors.Errorf("Pack size does not match, want %v, got %v", size, realSize)
-	}
-
-	blobs, hdrSize, err := pack.List(r.Key(), packfile, size)
+	blobs, hdrSize, err := pack.List(r.Key(), bytes.NewReader(hdrBuf.Bytes()), int64(hdrBuf.Len()))
 	if err != nil {
 		return err
 	}
 
-	var errs []error
-	var buf []byte
-	sizeFromBlobs := uint(hdrSize)
+	if uint32(idxHdrSize) != hdrSize {
+		debug.Log("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize)
+		errs = append(errs, errors.Errorf("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize))
+	}
+
 	idx := r.Index()
-	for i, blob := range blobs {
-		sizeFromBlobs += blob.Length
-		debug.Log("  check blob %d: %v", i, blob)
-
-		buf = buf[:cap(buf)]
-		if uint(len(buf)) < blob.Length {
-			buf = make([]byte, blob.Length)
-		}
-		buf = buf[:blob.Length]
-
-		_, err := packfile.Seek(int64(blob.Offset), 0)
-		if err != nil {
-			return errors.Errorf("Seek(%v): %v", blob.Offset, err)
-		}
-
-		_, err = io.ReadFull(packfile, buf)
-		if err != nil {
-			debug.Log("  error loading blob %v: %v", blob.ID, err)
-			errs = append(errs, errors.Errorf("blob %v: %v", i, err))
-			continue
-		}
-
-		nonce, ciphertext := buf[:r.Key().NonceSize()], buf[r.Key().NonceSize():]
-		plaintext, err := r.Key().Open(ciphertext[:0], nonce, ciphertext, nil)
-		if err != nil {
-			debug.Log("  error decrypting blob %v: %v", blob.ID, err)
-			errs = append(errs, errors.Errorf("blob %v: %v", i, err))
-			continue
-		}
-
-		hash := restic.Hash(plaintext)
-		if !hash.Equal(blob.ID) {
-			debug.Log("  Blob ID does not match, want %v, got %v", blob.ID, hash)
-			errs = append(errs, errors.Errorf("Blob ID does not match, want %v, got %v", blob.ID.Str(), hash.Str()))
-			continue
-		}
-
+	for _, blob := range blobs {
 		// Check if blob is contained in index and position is correct
 		idxHas := false
 		for _, pb := range idx.Lookup(blob.BlobHandle) {
@@ -522,11 +560,6 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int6
 		}
 	}
 
-	if int64(sizeFromBlobs) != size {
-		debug.Log("Pack size does not match, want %v, got %v", size, sizeFromBlobs)
-		errs = append(errs, errors.Errorf("Pack size does not match, want %v, got %v", size, sizeFromBlobs))
-	}
-
 	if len(errs) > 0 {
 		return errors.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs)
 	}
@@ -544,17 +577,18 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
 	defer close(errChan)
 
 	g, ctx := errgroup.WithContext(ctx)
-	type packsize struct {
-		id   restic.ID
-		size int64
+	type checkTask struct {
+		id    restic.ID
+		size  int64
+		blobs []restic.Blob
 	}
-	ch := make(chan packsize)
+	ch := make(chan checkTask)
 
 	// run workers
 	for i := 0; i < defaultParallelism; i++ {
 		g.Go(func() error {
 			for {
-				var ps packsize
+				var ps checkTask
 				var ok bool
 
 				select {
@@ -565,7 +599,8 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
 						return nil
 					}
 				}
-				err := checkPack(ctx, c.repo, ps.id, ps.size)
+
+				err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size)
 				p.Add(1)
 				if err == nil {
 					continue
@@ -580,10 +615,17 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
 		})
 	}
 
+	packSet := restic.NewIDSet()
+	for pack := range packs {
+		packSet.Insert(pack)
+	}
+
 	// push packs to ch
-	for pack, size := range packs {
+	for pbs := range c.repo.Index().ListPacks(ctx, packSet) {
+		size := packs[pbs.PackID]
+		debug.Log("listed %v", pbs.PackID)
 		select {
-		case ch <- packsize{id: pack, size: size}:
+		case ch <- checkTask{id: pbs.PackID, size: size, blobs: pbs.Blobs}:
 		case <-ctx.Done():
 		}
 	}
diff --git a/internal/pack/pack.go b/internal/pack/pack.go
index d679c658b..95f298acb 100644
--- a/internal/pack/pack.go
+++ b/internal/pack/pack.go
@@ -160,7 +160,8 @@ const (
 	// HeaderSize is the header's constant overhead (independent of #entries)
 	HeaderSize = headerLengthSize + crypto.Extension
 
-	maxHeaderSize = 16 * 1024 * 1024
+	// MaxHeaderSize is the max size of header including header-length field
+	MaxHeaderSize = 16*1024*1024 + headerLengthSize
 	// number of header enries to download as part of header-length request
 	eagerEntries = 15
 )
@@ -199,7 +200,7 @@ func readRecords(rd io.ReaderAt, size int64, max int) ([]byte, int, error) {
 		err = InvalidFileError{Message: "header length is invalid"}
 	case int64(hlen) > size-int64(headerLengthSize):
 		err = InvalidFileError{Message: "header is larger than file"}
-	case int64(hlen) > maxHeaderSize:
+	case int64(hlen) > MaxHeaderSize-int64(headerLengthSize):
 		err = InvalidFileError{Message: "header is larger than maxHeaderSize"}
 	}
 	if err != nil {
diff --git a/internal/repository/repository.go b/internal/repository/repository.go
index 291e00da6..8bb81d390 100644
--- a/internal/repository/repository.go
+++ b/internal/repository/repository.go
@@ -27,7 +27,7 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-const maxStreamBufferSize = 4 * 1024 * 1024
+const MaxStreamBufferSize = 4 * 1024 * 1024
 
 // Repository is used to access a repository in a backend.
 type Repository struct {
@@ -808,8 +808,8 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack
 	// stream blobs in pack
 	err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
 		bufferSize := int(dataEnd - dataStart)
-		if bufferSize > maxStreamBufferSize {
-			bufferSize = maxStreamBufferSize
+		if bufferSize > MaxStreamBufferSize {
+			bufferSize = MaxStreamBufferSize
 		}
 		bufRd := bufio.NewReaderSize(rd, bufferSize)
 		currentBlobEnd := dataStart
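
Reviewer note: the core idea of the new `checkPack` is to verify a pack in a single streaming pass instead of downloading it to a temporary file first (the removed `repository.DownloadAndHash` path). The backend reader is wrapped in a hashing reader, the blob section is consumed by `repository.StreamPack`, and whatever remains of the stream is kept as the pack header so it can be compared against the index. The following standalone sketch illustrates that single-pass technique using only the standard library (`crypto/sha256` in place of `minio/sha256-simd`); `verifyStream` and its parameters are illustrative names, not restic API.

```go
package main

import (
	"bufio"
	"bytes"
	"crypto/sha256"
	"fmt"
	"io"
	"strings"
)

// verifyStream reads the stream exactly once: fn consumes the first dataLen
// bytes (the blob section), the remainder is captured as the trailing header,
// and the SHA-256 of everything that passed through is returned.
func verifyStream(rd io.Reader, dataLen int64, fn func(io.Reader) error) (hash [sha256.Size]byte, header []byte, err error) {
	hasher := sha256.New()
	// every byte read below also passes through the hasher
	bufRd := bufio.NewReader(io.TeeReader(rd, hasher))

	// hand the blob section to the caller
	blobSection := io.LimitReader(bufRd, dataLen)
	if err = fn(blobSection); err != nil {
		return hash, nil, err
	}
	// skip whatever part of the blob section fn did not consume
	// (checkPack similarly discards up to the possible header start)
	if _, err = io.Copy(io.Discard, blobSection); err != nil {
		return hash, nil, err
	}

	// the rest of the stream is the pack header
	var hdrBuf bytes.Buffer
	if _, err = io.Copy(&hdrBuf, bufRd); err != nil {
		return hash, nil, err
	}

	hasher.Sum(hash[:0])
	return hash, hdrBuf.Bytes(), nil
}

func main() {
	pack := "blob-data" + "HEADER"
	hash, hdr, err := verifyStream(strings.NewReader(pack), int64(len("blob-data")),
		func(rd io.Reader) error {
			_, err := io.Copy(io.Discard, rd) // a real checker verifies each blob here
			return err
		})
	if err != nil {
		panic(err)
	}
	fmt.Printf("header=%q hash=%x\n", hdr, hash)
}
```

This mirrors the design choice in the diff: the pack ID (SHA-256 of the whole file), the blob contents, and the header are all verified from one download, which removes the temporary file and the extra read pass of the old implementation.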