From 666a0b0bdbc66129b5832de34cbaa6b1d0c3b2bb Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Mon, 22 Apr 2024 20:53:31 +0200 Subject: [PATCH] repository: streamPack: replace streaming with chunked download Due to the interface of streamPack, we cannot guarantee that operations progress fast enough that the underlying connections remains open. This introduces partial failures which massively complicate the error handling. Switch to a simpler approach that retrieves the pack in chunks of 32MB. If a blob is larger than this limit, then it is downloaded separately. To avoid multiple copies in memory, an auxiliary interface `discardReader` is introduced that allows directly accessing the downloaded byte slices, while still supporting the streaming used by the `check` command. --- internal/checker/checker.go | 36 ++++++- internal/repository/repository.go | 168 ++++++++++++++++++------------ 2 files changed, 134 insertions(+), 70 deletions(-) diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 28f55ce3a..d2fc42ca6 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -561,7 +561,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r hrd := hashing.NewReader(rd, sha256.New()) bufRd.Reset(hrd) - it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec) + it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec) for { val, err := it.Next() if err == repository.ErrPackEOF { @@ -647,11 +647,41 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r return nil } +type bufReader struct { + rd *bufio.Reader + buf []byte +} + +func newBufReader(rd *bufio.Reader) *bufReader { + return &bufReader{ + rd: rd, + } +} + +func (b *bufReader) Discard(n int) (discarded int, err error) { + return b.rd.Discard(n) +} + +func (b *bufReader) ReadFull(n int) (buf []byte, err error) { + if cap(b.buf) < n { + b.buf = make([]byte, n) + } + b.buf = b.buf[:n] + + _, err = io.ReadFull(b.rd, b.buf) + if err != nil { + return nil, err + } + return b.buf, nil +} + // ReadData loads all data from the repository and checks the integrity. func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) { c.ReadPacks(ctx, c.packs, nil, errChan) } +const maxStreamBufferSize = 4 * 1024 * 1024 + // ReadPacks loads data from specified packs and checks the integrity. func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) { defer close(errChan) @@ -669,9 +699,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p // run workers for i := 0; i < workerCount; i++ { g.Go(func() error { - // create a buffer that is large enough to be reused by repository.StreamPack - // this ensures that we can read the pack header later on - bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize) + bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize) dec, err := zstd.NewReader(nil) if err != nil { panic(dec) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index f2cde014a..41f22f307 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -1,7 +1,6 @@ package repository import ( - "bufio" "bytes" "context" "fmt" @@ -11,7 +10,6 @@ import ( "sort" "sync" - "github.com/cenkalti/backoff/v4" "github.com/klauspost/compress/zstd" "github.com/restic/chunker" "github.com/restic/restic/internal/backend" @@ -28,8 +26,6 @@ import ( "golang.org/x/sync/errgroup" ) -const MaxStreamBufferSize = 4 * 1024 * 1024 - const MinPackSize = 4 * 1024 * 1024 const DefaultPackSize = 16 * 1024 * 1024 const MaxPackSize = 128 * 1024 * 1024 @@ -951,7 +947,8 @@ const maxUnusedRange = 4 * 1024 * 1024 // LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to // the handleBlobFn callback or an error if decryption failed or the blob hash does not match. // handleBlobFn is called at most once for each blob. If the callback returns an error, -// then LoadBlobsFromPack will abort and not retry it. +// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within +// this specific call. The callback must not keep a reference to buf. func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.key, packID, blobs, handleBlobFn) } @@ -968,12 +965,27 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn lowerIdx := 0 lastPos := blobs[0].Offset + const maxChunkSize = 2 * DefaultPackSize + for i := 0; i < len(blobs); i++ { if blobs[i].Offset < lastPos { // don't wait for streamPackPart to fail return errors.Errorf("overlapping blobs in pack %v", packID) } + + chunkSizeAfter := (blobs[i].Offset + blobs[i].Length) - blobs[lowerIdx].Offset + split := false + // split if the chunk would become larger than maxChunkSize. Oversized chunks are + // handled by the requirement that the chunk contains at least one blob (i > lowerIdx) + if i > lowerIdx && chunkSizeAfter >= maxChunkSize { + split = true + } + // skip too large gaps as a new request is typically much cheaper than data transfers if blobs[i].Offset-lastPos > maxUnusedRange { + split = true + } + + if split { // load everything up to the skipped file section err := streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:i], handleBlobFn) if err != nil { @@ -1001,75 +1013,108 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBl } defer dec.Close() - ctx, cancel := context.WithCancel(ctx) - // stream blobs in pack + data := make([]byte, int(dataEnd-dataStart)) err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error { - // prevent callbacks after cancellation - if ctx.Err() != nil { - return ctx.Err() - } - bufferSize := int(dataEnd - dataStart) - if bufferSize > MaxStreamBufferSize { - bufferSize = MaxStreamBufferSize - } - bufRd := bufio.NewReaderSize(rd, bufferSize) - it := NewPackBlobIterator(packID, bufRd, dataStart, blobs, key, dec) - - for { - val, err := it.Next() - if err == ErrPackEOF { - break - } else if err != nil { - return err - } - - if val.Err != nil && loadBlobFn != nil { - var ierr error - // check whether we can get a valid copy somewhere else - buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil) - if ierr == nil { - // success - val.Plaintext = buf - val.Err = nil + _, cerr := io.ReadFull(rd, data) + return cerr + }) + // prevent callbacks after cancellation + if ctx.Err() != nil { + return ctx.Err() + } + if err != nil { + // the context is only still valid if handleBlobFn never returned an error + if loadBlobFn != nil { + // check whether we can get the remaining blobs somewhere else + for _, entry := range blobs { + buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil) + err = handleBlobFn(entry.BlobHandle, buf, ierr) + if err != nil { + break } } - - err = handleBlobFn(val.Handle, val.Plaintext, val.Err) - if err != nil { - cancel() - return backoff.Permanent(err) - } - // ensure that each blob is only passed once to handleBlobFn - blobs = blobs[1:] } - return nil - }) + return errors.Wrap(err, "StreamPack") + } - // the context is only still valid if handleBlobFn never returned an error - if ctx.Err() == nil && loadBlobFn != nil { - // check whether we can get the remaining blobs somewhere else - for _, entry := range blobs { - buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil) - err = handleBlobFn(entry.BlobHandle, buf, ierr) - if err != nil { - break + it := NewPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec) + + for { + val, err := it.Next() + if err == ErrPackEOF { + break + } else if err != nil { + return err + } + + if val.Err != nil && loadBlobFn != nil { + var ierr error + // check whether we can get a valid copy somewhere else + buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil) + if ierr == nil { + // success + val.Plaintext = buf + val.Err = nil } } + + err = handleBlobFn(val.Handle, val.Plaintext, val.Err) + if err != nil { + return err + } + // ensure that each blob is only passed once to handleBlobFn + blobs = blobs[1:] } return errors.Wrap(err, "StreamPack") } +// discardReader allows the PackBlobIterator to perform zero copy +// reads if the underlying data source is a byte slice. +type discardReader interface { + Discard(n int) (discarded int, err error) + // ReadFull reads the next n bytes into a byte slice. The caller must not + // retain a reference to the byte. Modifications are only allowed within + // the boundaries of the returned slice. + ReadFull(n int) (buf []byte, err error) +} + +type byteReader struct { + buf []byte +} + +func newByteReader(buf []byte) *byteReader { + return &byteReader{ + buf: buf, + } +} + +func (b *byteReader) Discard(n int) (discarded int, err error) { + if len(b.buf) < n { + return 0, io.ErrUnexpectedEOF + } + b.buf = b.buf[n:] + return n, nil +} + +func (b *byteReader) ReadFull(n int) (buf []byte, err error) { + if len(b.buf) < n { + return nil, io.ErrUnexpectedEOF + } + buf = b.buf[:n] + b.buf = b.buf[n:] + return buf, nil +} + type PackBlobIterator struct { packID restic.ID - rd *bufio.Reader + rd discardReader currentOffset uint blobs []restic.Blob key *crypto.Key dec *zstd.Decoder - buf []byte decode []byte } @@ -1081,7 +1126,7 @@ type PackBlobValue struct { var ErrPackEOF = errors.New("reached EOF of pack file") -func NewPackBlobIterator(packID restic.ID, rd *bufio.Reader, currentOffset uint, +func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint, blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator { return &PackBlobIterator{ packID: packID, @@ -1116,21 +1161,12 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) { h := restic.BlobHandle{ID: entry.ID, Type: entry.Type} debug.Log(" process blob %v, skipped %d, %v", h, skipBytes, entry) - if uint(cap(b.buf)) < entry.Length { - b.buf = make([]byte, entry.Length) - } - b.buf = b.buf[:entry.Length] - - n, err := io.ReadFull(b.rd, b.buf) + buf, err := b.rd.ReadFull(int(entry.Length)) if err != nil { debug.Log(" read error %v", err) return PackBlobValue{}, fmt.Errorf("readFull: %w", err) } - if n != len(b.buf) { - return PackBlobValue{}, fmt.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v", - h, b.packID.Str(), len(b.buf), n) - } b.currentOffset = entry.Offset + entry.Length if int(entry.Length) <= b.key.NonceSize() { @@ -1139,7 +1175,7 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) { } // decryption errors are likely permanent, give the caller a chance to skip them - nonce, ciphertext := b.buf[:b.key.NonceSize()], b.buf[b.key.NonceSize():] + nonce, ciphertext := buf[:b.key.NonceSize()], buf[b.key.NonceSize():] plaintext, err := b.key.Open(ciphertext[:0], nonce, ciphertext, nil) if err != nil { err = fmt.Errorf("decrypting blob %v from %v failed: %w", h, b.packID.Str(), err)