diff --git a/changelog/unreleased/pull-2195 b/changelog/unreleased/pull-2195 new file mode 100644 index 000000000..c2dac8bdf --- /dev/null +++ b/changelog/unreleased/pull-2195 @@ -0,0 +1,17 @@ +Enhancement: Simplify and improve restore performance + +Significantly improves restore performance of large files (i.e. 50M+): +https://github.com/restic/restic/issues/2074 +https://forum.restic.net/t/restore-using-rclone-gdrive-backend-is-slow/1112/8 +https://forum.restic.net/t/degraded-restore-performance-s3-backend/1400 + +Fixes "not enough cache capacity" error during restore: +https://github.com/restic/restic/issues/2244 + +NOTE: This new implementation does not guarantee order in which blobs +are written to the target files and, for example, the last blob of a +file can be written to the file before any of the preceding file blobs. +It is therefore possible to have gaps in the data written to the target +files if restore fails or is interrupted by the user. + +https://github.com/restic/restic/pull/2195 diff --git a/go.mod b/go.mod index f5f78d294..d404a5f4f 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Azure/azure-sdk-for-go v27.3.0+incompatible github.com/Azure/go-autorest/autorest v0.9.2 // indirect github.com/cenkalti/backoff v2.1.1+incompatible + github.com/cespare/xxhash v1.1.0 github.com/cpuguy83/go-md2man v1.0.10 // indirect github.com/dnaeon/go-vcr v1.0.1 // indirect github.com/elithrar/simple-scrypt v1.3.0 diff --git a/go.sum b/go.sum index 811ba0dc5..7d18ef20c 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6L github.com/Azure/go-autorest/tracing v0.5.0 h1:TRn4WjSnkcSy5AEG3pnbtFSwNtwzjr4VYyQflFE619k= github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/OneOfOne/xxhash v1.2.2 
h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -28,6 +30,8 @@ github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/cenkalti/backoff v2.1.1+incompatible h1:tKJnvO2kl0zmb/jA5UKAt4VoEVw1qxKWjE/Bpp46npY= github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= @@ -145,6 +149,8 @@ github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3 h1:hBSHah github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod 
h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= diff --git a/internal/restorer/doc.go b/internal/restorer/doc.go index b3583c728..5a4622ea6 100644 --- a/internal/restorer/doc.go +++ b/internal/restorer/doc.go @@ -5,29 +5,20 @@ // request and avoiding repeated downloads of the same pack. In addition, // several pack files are fetched concurrently. // -// Here is high-level pseudo-code of the how the Restorer attempts to achieve +// Here is high-level pseudo-code of how the Restorer attempts to achieve // these goals: // // while there are packs to process // choose a pack to process [1] -// get the pack from the backend or cache [2] +// retrieve the pack from the backend [2] // write pack blobs to the files that need them [3] -// if not all pack blobs were used -// cache the pack for future use [4] // -// Pack download and processing (steps [2] - [4]) runs on multiple concurrent -// Goroutines. The Restorer runs all steps [2]-[4] sequentially on the same -// Goroutine. +// Retrieval of repository packs (step [2]) and writing target files (step [3]) +// are performed concurrently on multiple goroutines. // -// Before a pack is downloaded (step [2]), the required space is "reserved" in -// the pack cache. Actual download uses single backend request to get all -// required pack blobs. This may download blobs that are not needed, but we -// assume it'll still be faster than getting individual blobs. -// -// Target files are written (step [3]) in the "right" order, first file blob -// first, then second, then third and so on. Blob write order implies that some -// pack blobs may not be immediately used, i.e. they are "out of order" for -// their respective target files. Packs with unused blobs are cached (step -// [4]). 
The cache has capacity limit and may purge packs before they are fully -// used, in which case the purged packs will need to be re-downloaded. +// Implementation does not guarantee order in which blobs are written to the +// target files and, for example, the last blob of a file can be written to the +// file before any of the preceding file blobs. It is therefore possible to +// have gaps in the data written to the target files if restore fails or +// is interrupted by the user. package restorer diff --git a/internal/restorer/filepacktraverser.go b/internal/restorer/filepacktraverser.go deleted file mode 100644 index bba61e0f9..000000000 --- a/internal/restorer/filepacktraverser.go +++ /dev/null @@ -1,52 +0,0 @@ -package restorer - -import ( - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/restic" -) - -type filePackTraverser struct { - lookup func(restic.ID, restic.BlobType) ([]restic.PackedBlob, bool) -} - -// iterates over all remaining packs of the file -func (t *filePackTraverser) forEachFilePack(file *fileInfo, fn func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool) error { - if len(file.blobs) == 0 { - return nil - } - - getBlobPack := func(blobID restic.ID) (restic.PackedBlob, error) { - packs, found := t.lookup(blobID, restic.DataBlob) - if !found { - return restic.PackedBlob{}, errors.Errorf("Unknown blob %s", blobID.String()) - } - // TODO which pack to use if multiple packs have the blob? 
- // MUST return the same pack for the same blob during the same execution - return packs[0], nil - } - - var prevPackID restic.ID - var prevPackBlobs []restic.Blob - packIdx := 0 - for _, blobID := range file.blobs { - packedBlob, err := getBlobPack(blobID) - if err != nil { - return err - } - if !prevPackID.IsNull() && prevPackID != packedBlob.PackID { - if !fn(packIdx, prevPackID, prevPackBlobs) { - return nil - } - packIdx++ - } - if prevPackID != packedBlob.PackID { - prevPackID = packedBlob.PackID - prevPackBlobs = make([]restic.Blob, 0) - } - prevPackBlobs = append(prevPackBlobs, packedBlob.Blob) - } - if len(prevPackBlobs) > 0 { - fn(packIdx, prevPackID, prevPackBlobs) - } - return nil -} diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index c59b061b0..f4826e586 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -1,9 +1,12 @@ package restorer import ( + "bytes" "context" "io" + "math" "path/filepath" + "sync" "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" @@ -15,66 +18,58 @@ import ( // TODO evaluate if it makes sense to split download and processing workers // pro: can (slowly) read network and decrypt/write files concurrently // con: each worker needs to keep one pack in memory -// TODO evaluate memory footprint for larger repositories, say 10M packs/10M files -// TODO consider replacing pack file cache with blob cache -// TODO avoid decrypting the same blob multiple times -// TODO evaluate disabled debug logging overhead for large repositories const ( workerCount = 8 - // max number of cached open output file handles - filesWriterCacheCap = 32 + // fileInfo flags + fileProgress = 1 + fileError = 2 - // estimated average pack size used to calculate pack cache capacity - averagePackSize = 5 * 1024 * 1024 - - // pack cache capacity should support at least one cached pack per worker - // allow space for extra 5 packs for actual caching - 
packCacheCapacity = (workerCount + 5) * averagePackSize + largeFileBlobCount = 25 ) // information about regular file being restored type fileInfo struct { + lock sync.Mutex + flags int location string // file on local filesystem relative to restorer basedir - blobs []restic.ID // remaining blobs of the file + blobs interface{} // blobs of the file +} + +type fileBlobInfo struct { + id restic.ID // the blob id + offset int64 // blob offset in the file } // information about a data pack required to restore one or more files type packInfo struct { - // the pack id - id restic.ID - - // set of files that use blobs from this pack - files map[*fileInfo]struct{} - - // number of other packs that must be downloaded before all blobs in this pack can be used - cost int - - // used by packHeap - index int + id restic.ID // the pack id + files map[*fileInfo]struct{} // set of files that use blobs from this pack } // fileRestorer restores set of files type fileRestorer struct { key *crypto.Key - idx filePackTraverser + idx func(restic.ID, restic.BlobType) ([]restic.PackedBlob, bool) packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error - packCache *packCache // pack cache - filesWriter *filesWriter // file write + filesWriter *filesWriter dst string files []*fileInfo } -func newFileRestorer(dst string, packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, key *crypto.Key, idx filePackTraverser) *fileRestorer { +func newFileRestorer(dst string, + packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, + key *crypto.Key, + idx func(restic.ID, restic.BlobType) ([]restic.PackedBlob, bool)) *fileRestorer { + return &fileRestorer{ - packLoader: packLoader, key: key, idx: idx, - filesWriter: newFilesWriter(filesWriterCacheCap), - packCache: newPackCache(packCacheCapacity), + packLoader: packLoader, + 
filesWriter: newFilesWriter(workerCount), dst: dst, } } @@ -87,237 +82,237 @@ func (r *fileRestorer) targetPath(location string) string { return filepath.Join(r.dst, location) } -// used to pass information among workers (wish golang channels allowed multivalues) -type processingInfo struct { - pack *packInfo - files map[*fileInfo]error -} - -func (r *fileRestorer) restoreFiles(ctx context.Context, onError func(path string, err error)) error { - // TODO conditionally enable when debug log is on - // for _, file := range r.files { - // dbgmsg := file.location + ": " - // r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - // if packIdx > 0 { - // dbgmsg += ", " - // } - // dbgmsg += "pack{id=" + packID.Str() + ", blobs: " - // for blobIdx, blob := range packBlobs { - // if blobIdx > 0 { - // dbgmsg += ", " - // } - // dbgmsg += blob.ID.Str() - // } - // dbgmsg += "}" - // return true // keep going - // }) - // debug.Log(dbgmsg) - // } - - inprogress := make(map[*fileInfo]struct{}) - queue, err := newPackQueue(r.idx, r.files, func(files map[*fileInfo]struct{}) bool { - for file := range files { - if _, found := inprogress[file]; found { - return true - } - } - return false - }) - if err != nil { - return err +func (r *fileRestorer) forEachBlob(blobIDs []restic.ID, fn func(packID restic.ID, packBlob restic.Blob)) error { + if len(blobIDs) == 0 { + return nil } - // workers - downloadCh := make(chan processingInfo) - feedbackCh := make(chan processingInfo) - - defer close(downloadCh) - defer close(feedbackCh) - - worker := func() { - for { - select { - case <-ctx.Done(): - return - case request, ok := <-downloadCh: - if !ok { - return // channel closed - } - rd, err := r.downloadPack(ctx, request.pack) - if err == nil { - r.processPack(ctx, request, rd) - } else { - // mark all files as failed - for file := range request.files { - request.files[file] = err - } - } - feedbackCh <- request - } - } - } - for i := 0; i < 
workerCount; i++ { - go worker() - } - - processFeedback := func(pack *packInfo, ferrors map[*fileInfo]error) { - // update files blobIdx - // must do it here to avoid race among worker and processing feedback threads - var success []*fileInfo - var failure []*fileInfo - for file, ferr := range ferrors { - target := r.targetPath(file.location) - if ferr != nil { - onError(file.location, ferr) - r.filesWriter.close(target) - delete(inprogress, file) - failure = append(failure, file) - } else { - r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - file.blobs = file.blobs[len(packBlobs):] - return false // only interesed in the first pack - }) - if len(file.blobs) == 0 { - r.filesWriter.close(target) - delete(inprogress, file) - } - success = append(success, file) - } - } - // update the queue and requeueu the pack as necessary - if !queue.requeuePack(pack, success, failure) { - r.packCache.remove(pack.id) - debug.Log("Purged used up pack %s from pack cache", pack.id.Str()) - } - } - - // the main restore loop - for !queue.isEmpty() { - debug.Log("-----------------------------------") - pack, files := queue.nextPack() - if pack != nil { - ferrors := make(map[*fileInfo]error) - for _, file := range files { - ferrors[file] = nil - inprogress[file] = struct{}{} - } - select { - case <-ctx.Done(): - return ctx.Err() - case downloadCh <- processingInfo{pack: pack, files: ferrors}: - debug.Log("Scheduled download pack %s (%d files)", pack.id.Str(), len(files)) - case feedback := <-feedbackCh: - queue.requeuePack(pack, []*fileInfo{}, []*fileInfo{}) // didn't use the pack during this iteration - processFeedback(feedback.pack, feedback.files) - } - } else { - select { - case <-ctx.Done(): - return ctx.Err() - case feedback := <-feedbackCh: - processFeedback(feedback.pack, feedback.files) - } + for _, blobID := range blobIDs { + packs, found := r.idx(blobID, restic.DataBlob) + if !found { + return errors.Errorf("Unknown blob %s", 
blobID.String()) } + fn(packs[0].PackID, packs[0].Blob) } return nil } -func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) (readerAtCloser, error) { - const MaxInt64 = 1<<63 - 1 // odd Go does not have this predefined somewhere +func (r *fileRestorer) restoreFiles(ctx context.Context) error { - // calculate pack byte range - start, end := int64(MaxInt64), int64(0) - for file := range pack.files { - r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - if packID.Equal(pack.id) { - for _, blob := range packBlobs { - if start > int64(blob.Offset) { - start = int64(blob.Offset) - } - if end < int64(blob.Offset+blob.Length) { - end = int64(blob.Offset + blob.Length) - } - } + packs := make(map[restic.ID]*packInfo) // all packs + + // create packInfo from fileInfo + for _, file := range r.files { + fileBlobs := file.blobs.(restic.IDs) + largeFile := len(fileBlobs) > largeFileBlobCount + var packsMap map[restic.ID][]fileBlobInfo + if largeFile { + packsMap = make(map[restic.ID][]fileBlobInfo) + } + fileOffset := int64(0) + err := r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob) { + if largeFile { + packsMap[packID] = append(packsMap[packID], fileBlobInfo{id: blob.ID, offset: fileOffset}) + fileOffset += int64(blob.Length) - crypto.Extension } - - return true // keep going + pack, ok := packs[packID] + if !ok { + pack = &packInfo{ + id: packID, + files: make(map[*fileInfo]struct{}), + } + packs[packID] = pack + } + pack.files[file] = struct{}{} }) + if err != nil { + // repository index is messed up, can't do anything + return err + } + if largeFile { + file.blobs = packsMap + } } - packReader, err := r.packCache.get(pack.id, start, int(end-start), func(offset int64, length int, wr io.WriteSeeker) error { - h := restic.Handle{Type: restic.DataFile, Name: pack.id.String()} - return r.packLoader(ctx, h, length, offset, func(rd io.Reader) error { - // reset the file in case of a download retry 
- _, err := wr.Seek(0, io.SeekStart) - if err != nil { - return err + var wg sync.WaitGroup + downloadCh := make(chan *packInfo) + worker := func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return // context cancelled + case pack, ok := <-downloadCh: + if !ok { + return // channel closed + } + r.downloadPack(ctx, pack) } + } + } + for i := 0; i < workerCount; i++ { + go worker() + wg.Add(1) + } - len, err := io.Copy(wr, rd) - if err != nil { - return err - } - if len != int64(length) { - return errors.Errorf("unexpected pack size: expected %d but got %d", length, len) - } + // the main restore loop + for _, pack := range packs { + select { + case <-ctx.Done(): + return ctx.Err() + case downloadCh <- pack: + debug.Log("Scheduled download pack %s", pack.id.Str()) + } + } - return nil - }) + close(downloadCh) + wg.Wait() + + return nil +} + +func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) { + + // calculate pack byte range and blob->[]files->[]offsets mappings + start, end := int64(math.MaxInt64), int64(0) + blobs := make(map[restic.ID]struct { + offset int64 // offset of the blob in the pack + length int // length of the blob + files map[*fileInfo][]int64 // file -> offsets (plural!) 
of the blob in the file }) - if err != nil { - return nil, err - } - - return packReader, nil -} - -func (r *fileRestorer) processPack(ctx context.Context, request processingInfo, rd readerAtCloser) { - defer rd.Close() - - for file := range request.files { - target := r.targetPath(file.location) - r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - for _, blob := range packBlobs { - debug.Log("Writing blob %s (%d bytes) from pack %s to %s", blob.ID.Str(), blob.Length, packID.Str(), file.location) - buf, err := r.loadBlob(rd, blob) - if err == nil { - err = r.filesWriter.writeToFile(target, buf) + for file := range pack.files { + addBlob := func(blob restic.Blob, fileOffset int64) { + if start > int64(blob.Offset) { + start = int64(blob.Offset) + } + if end < int64(blob.Offset+blob.Length) { + end = int64(blob.Offset + blob.Length) + } + blobInfo, ok := blobs[blob.ID] + if !ok { + blobInfo.offset = int64(blob.Offset) + blobInfo.length = int(blob.Length) + blobInfo.files = make(map[*fileInfo][]int64) + blobs[blob.ID] = blobInfo + } + blobInfo.files[file] = append(blobInfo.files[file], fileOffset) + } + if fileBlobs, ok := file.blobs.(restic.IDs); ok { + fileOffset := int64(0) + r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob) { + if packID.Equal(pack.id) { + addBlob(blob, fileOffset) } - if err != nil { - request.files[file] = err - break // could not restore the file + fileOffset += int64(blob.Length) - crypto.Extension + }) + } else if packsMap, ok := file.blobs.(map[restic.ID][]fileBlobInfo); ok { + for _, blob := range packsMap[pack.id] { + idxPacks, found := r.idx(blob.id, restic.DataBlob) + if found { + for _, idxPack := range idxPacks { + if idxPack.PackID.Equal(pack.id) { + addBlob(idxPack.Blob, blob.offset) + break + } + } } } - return false - }) + } + } + + packData := make([]byte, int(end-start)) + + h := restic.Handle{Type: restic.DataFile, Name: pack.id.String()} + err := r.packLoader(ctx, 
h, int(end-start), start, func(rd io.Reader) error { + l, err := io.ReadFull(rd, packData) + if err != nil { + return err + } + if l != len(packData) { + return errors.Errorf("unexpected pack size: expected %d but got %d", len(packData), l) + } + return nil + }) + + markFileError := func(file *fileInfo, err error) { + file.lock.Lock() + defer file.lock.Unlock() + if file.flags&fileError == 0 { + file.flags |= fileError + } + } + + if err != nil { + for file := range pack.files { + markFileError(file, err) + } + return + } + + rd := bytes.NewReader(packData) + + for blobID, blob := range blobs { + blobData, err := r.loadBlob(rd, blobID, blob.offset-start, blob.length) + if err != nil { + for file := range blob.files { + markFileError(file, err) + } + continue + } + for file, offsets := range blob.files { + for _, offset := range offsets { + writeToFile := func() error { + // this looks overly complicated and needs explanation + // two competing requirements: + // - must create the file once and only once + // - should allow concurrent writes to the file + // so write the first blob while holding file lock + // write other blobs after releasing the lock + file.lock.Lock() + create := file.flags&fileProgress == 0 + if create { + defer file.lock.Unlock() + file.flags |= fileProgress + } else { + file.lock.Unlock() + } + return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, create) + } + err := writeToFile() + if err != nil { + markFileError(file, err) + break + } + } + } } } -func (r *fileRestorer) loadBlob(rd io.ReaderAt, blob restic.Blob) ([]byte, error) { +func (r *fileRestorer) loadBlob(rd io.ReaderAt, blobID restic.ID, offset int64, length int) ([]byte, error) { // TODO reconcile with Repository#loadBlob implementation - buf := make([]byte, blob.Length) + buf := make([]byte, length) - n, err := rd.ReadAt(buf, int64(blob.Offset)) + n, err := rd.ReadAt(buf, offset) if err != nil { return nil, err } - if n != int(blob.Length) { - return nil, 
errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blob.ID.Str(), blob.Length, n) + if n != length { + return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blobID.Str(), length, n) } // decrypt nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():] plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil) if err != nil { - return nil, errors.Errorf("decrypting blob %v failed: %v", blob.ID, err) + return nil, errors.Errorf("decrypting blob %v failed: %v", blobID, err) } // check hash - if !restic.Hash(plaintext).Equal(blob.ID) { - return nil, errors.Errorf("blob %v returned invalid hash", blob.ID) + if !restic.Hash(plaintext).Equal(blobID) { + return nil, errors.Errorf("blob %v returned invalid hash", blobID) } return plaintext, nil diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index db683b10e..9d362b13c 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -8,7 +8,6 @@ import ( "testing" "github.com/restic/restic/internal/crypto" - "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" ) @@ -38,9 +37,6 @@ type TestRepo struct { // loader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error - - // - idx filePackTraverser } func (i *TestRepo) Lookup(blobID restic.ID, _ restic.BlobType) ([]restic.PackedBlob, bool) { @@ -56,11 +52,6 @@ func (i *TestRepo) packID(name string) restic.ID { return i.packsNameToID[name] } -func (i *TestRepo) pack(queue *packQueue, name string) *packInfo { - id := i.packsNameToID[name] - return queue.packs[id] -} - func (i *TestRepo) fileContent(file *fileInfo) string { return i.filesPathToContent[file.location] } @@ -147,7 +138,6 @@ func newTestRepo(content []TestFile) *TestRepo { files: files, filesPathToContent: filesPathToContent, 
} - repo.idx = filePackTraverser{lookup: repo.Lookup} repo.loader = func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { packID, err := restic.ParseID(h.Name) if err != nil { @@ -163,12 +153,11 @@ func newTestRepo(content []TestFile) *TestRepo { func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) { repo := newTestRepo(content) - r := newFileRestorer(tempdir, repo.loader, repo.key, repo.idx) + r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup) r.files = repo.files - r.restoreFiles(context.TODO(), func(path string, err error) { - rtest.OK(t, errors.Wrapf(err, "unexpected error")) - }) + err := r.restoreFiles(context.TODO()) + rtest.OK(t, err) for _, file := range repo.files { target := r.targetPath(file.location) @@ -178,16 +167,11 @@ func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) { continue } - _, contains := r.filesWriter.cache[target] - rtest.Equals(t, false, contains) - content := repo.fileContent(file) if !bytes.Equal(data, []byte(content)) { t.Errorf("file %v has wrong content: want %q, got %q", file.location, content, data) } } - - rtest.OK(t, nil) } func TestFileRestorerBasic(t *testing.T) { @@ -209,5 +193,13 @@ func TestFileRestorerBasic(t *testing.T) { TestBlob{"data2-2", "pack2-2"}, }, }, + TestFile{ + name: "file3", + blobs: []TestBlob{ + // same blob multiple times + TestBlob{"data3-1", "pack3-1"}, + TestBlob{"data3-1", "pack3-1"}, + }, + }, }) } diff --git a/internal/restorer/fileswriter.go b/internal/restorer/fileswriter.go index b3beb8161..8d632cd09 100644 --- a/internal/restorer/fileswriter.go +++ b/internal/restorer/fileswriter.go @@ -4,98 +4,89 @@ import ( "os" "sync" - "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/errors" + "github.com/cespare/xxhash" ) -// Writes blobs to output files. Each file is written sequentially, -// start to finish, but multiple files can be written to concurrently. 
-// Implementation allows virtually unlimited number of logically open -// files, but number of phisically open files will never exceed number -// of concurrent writeToFile invocations plus cacheCap. +// writes blobs to target files. +// multiple files can be written to concurrently. +// multiple blobs can be concurrently written to the same file. +// TODO I am not 100% convinced this is necessary, i.e. it may be okay +// to use multiple os.File to write to the same target file type filesWriter struct { - lock sync.Mutex // guards concurrent access to open files cache - inprogress map[string]struct{} // (logically) opened file writers - cache map[string]*os.File // cache of open files - cacheCap int // max number of cached open files + buckets []filesWriterBucket } -func newFilesWriter(cacheCap int) *filesWriter { +type filesWriterBucket struct { + lock sync.Mutex + files map[string]*os.File + users map[string]int +} + +func newFilesWriter(count int) *filesWriter { + buckets := make([]filesWriterBucket, count) + for b := 0; b < count; b++ { + buckets[b].files = make(map[string]*os.File) + buckets[b].users = make(map[string]int) + } return &filesWriter{ - inprogress: make(map[string]struct{}), - cache: make(map[string]*os.File), - cacheCap: cacheCap, + buckets: buckets, } } -func (w *filesWriter) writeToFile(path string, blob []byte) error { - // First writeToFile invocation for any given path will: - // - create and open the file - // - write the blob to the file - // - cache the open file if there is space, close the file otherwise - // Subsequent invocations will: - // - remove the open file from the cache _or_ open the file for append - // - write the blob to the file - // - cache the open file if there is space, close the file otherwise - // The idea is to cap maximum number of open files with minimal - // coordination among concurrent writeToFile invocations (note that - // writeToFile never touches somebody else's open file). 
+func (w *filesWriter) writeToFile(path string, blob []byte, offset int64, create bool) error { + bucket := &w.buckets[uint(xxhash.Sum64String(path))%uint(len(w.buckets))] - // TODO measure if caching is useful (likely depends on operating system - // and hardware configuration) acquireWriter := func() (*os.File, error) { - w.lock.Lock() - defer w.lock.Unlock() - if wr, ok := w.cache[path]; ok { - debug.Log("Used cached writer for %s", path) - delete(w.cache, path) + bucket.lock.Lock() + defer bucket.lock.Unlock() + + if wr, ok := bucket.files[path]; ok { + bucket.users[path]++ return wr, nil } + var flags int - if _, append := w.inprogress[path]; append { - flags = os.O_APPEND | os.O_WRONLY - } else { - w.inprogress[path] = struct{}{} + if create { flags = os.O_CREATE | os.O_TRUNC | os.O_WRONLY + } else { + flags = os.O_WRONLY } + wr, err := os.OpenFile(path, flags, 0600) if err != nil { return nil, err } - debug.Log("Opened writer for %s", path) + + bucket.files[path] = wr + bucket.users[path] = 1 + return wr, nil } - cacheOrCloseWriter := func(wr *os.File) { - w.lock.Lock() - defer w.lock.Unlock() - if len(w.cache) < w.cacheCap { - w.cache[path] = wr - } else { - wr.Close() + + releaseWriter := func(wr *os.File) error { + bucket.lock.Lock() + defer bucket.lock.Unlock() + + if bucket.users[path] == 1 { + delete(bucket.files, path) + delete(bucket.users, path) + return wr.Close() } + bucket.users[path]-- + return nil } wr, err := acquireWriter() if err != nil { return err } - n, err := wr.Write(blob) - cacheOrCloseWriter(wr) + + _, err = wr.WriteAt(blob, offset) + if err != nil { + releaseWriter(wr) return err } - if n != len(blob) { - return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", path, len(blob), n) - } - return nil -} -func (w *filesWriter) close(path string) { - w.lock.Lock() - defer w.lock.Unlock() - if wr, ok := w.cache[path]; ok { - wr.Close() - delete(w.cache, path) - } - delete(w.inprogress, path) + return 
releaseWriter(wr) } diff --git a/internal/restorer/fileswriter_test.go b/internal/restorer/fileswriter_test.go index ada7f2107..690826534 100644 --- a/internal/restorer/fileswriter_test.go +++ b/internal/restorer/fileswriter_test.go @@ -16,23 +16,21 @@ func TestFilesWriterBasic(t *testing.T) { f1 := dir + "/f1" f2 := dir + "/f2" - rtest.OK(t, w.writeToFile(f1, []byte{1})) - rtest.Equals(t, 1, len(w.cache)) - rtest.Equals(t, 1, len(w.inprogress)) + rtest.OK(t, w.writeToFile(f1, []byte{1}, 0, true)) + rtest.Equals(t, 0, len(w.buckets[0].files)) + rtest.Equals(t, 0, len(w.buckets[0].users)) - rtest.OK(t, w.writeToFile(f2, []byte{2})) - rtest.Equals(t, 1, len(w.cache)) - rtest.Equals(t, 2, len(w.inprogress)) + rtest.OK(t, w.writeToFile(f2, []byte{2}, 0, true)) + rtest.Equals(t, 0, len(w.buckets[0].files)) + rtest.Equals(t, 0, len(w.buckets[0].users)) - rtest.OK(t, w.writeToFile(f1, []byte{1})) - w.close(f1) - rtest.Equals(t, 0, len(w.cache)) - rtest.Equals(t, 1, len(w.inprogress)) + rtest.OK(t, w.writeToFile(f1, []byte{1}, 1, false)) + rtest.Equals(t, 0, len(w.buckets[0].files)) + rtest.Equals(t, 0, len(w.buckets[0].users)) - rtest.OK(t, w.writeToFile(f2, []byte{2})) - w.close(f2) - rtest.Equals(t, 0, len(w.cache)) - rtest.Equals(t, 0, len(w.inprogress)) + rtest.OK(t, w.writeToFile(f2, []byte{2}, 1, false)) + rtest.Equals(t, 0, len(w.buckets[0].files)) + rtest.Equals(t, 0, len(w.buckets[0].users)) buf, err := ioutil.ReadFile(f1) rtest.OK(t, err) diff --git a/internal/restorer/packcache.go b/internal/restorer/packcache.go deleted file mode 100644 index 1eaad63bf..000000000 --- a/internal/restorer/packcache.go +++ /dev/null @@ -1,243 +0,0 @@ -package restorer - -import ( - "io" - "sync" - - "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/restic" -) - -// packCache is thread safe in-memory cache of pack files required to restore -// one or more files. 
The cache is meant to hold pack files that cannot be -// fully used right away. This happens when pack files contains blobs from -// "head" of some files and "middle" of other files. "Middle" blobs cannot be -// written to their files until after blobs from some other packs are written -// to the files first. -// -// While the cache is thread safe, implementation assumes (and enforces) -// that individual entries are used by one client at a time. Clients must -// #Close() entry's reader to make the entry available for use by other -// clients. This limitation can be relaxed in the future if necessary. -type packCache struct { - // guards access to cache internal data structures - lock sync.Mutex - - // cache capacity - capacity int - reservedCapacity int - allocatedCapacity int - - // pack records currently being used by active restore worker - reservedPacks map[restic.ID]*packCacheRecord - - // unused allocated packs, can be deleted if necessary - cachedPacks map[restic.ID]*packCacheRecord -} - -type packCacheRecord struct { - master *packCacheRecord - cache *packCache - - id restic.ID // cached pack id - offset int64 // cached pack byte range - - data []byte -} - -type readerAtCloser interface { - io.Closer - io.ReaderAt -} - -type bytesWriteSeeker struct { - pos int - data []byte -} - -func (wr *bytesWriteSeeker) Write(p []byte) (n int, err error) { - if wr.pos+len(p) > len(wr.data) { - return -1, errors.Errorf("not enough space") - } - n = copy(wr.data[wr.pos:], p) - wr.pos += n - return n, nil -} - -func (wr *bytesWriteSeeker) Seek(offset int64, whence int) (int64, error) { - if offset != 0 || whence != io.SeekStart { - return -1, errors.Errorf("unsupported seek request") - } - wr.pos = 0 - return 0, nil -} - -func newPackCache(capacity int) *packCache { - return &packCache{ - capacity: capacity, - reservedPacks: make(map[restic.ID]*packCacheRecord), - cachedPacks: make(map[restic.ID]*packCacheRecord), - } -} - -func (c *packCache) reserve(packID restic.ID, 
offset int64, length int) (record *packCacheRecord, err error) { - c.lock.Lock() - defer c.lock.Unlock() - - if offset < 0 || length <= 0 { - return nil, errors.Errorf("illegal pack cache allocation range %s {offset: %d, length: %d}", packID.Str(), offset, length) - } - - if c.reservedCapacity+length > c.capacity { - return nil, errors.Errorf("not enough cache capacity: requested %d, available %d", length, c.capacity-c.reservedCapacity) - } - - if _, ok := c.reservedPacks[packID]; ok { - return nil, errors.Errorf("pack is already reserved %s", packID.Str()) - } - - // the pack is available in the cache and currently unused - if pack, ok := c.cachedPacks[packID]; ok { - // check if cached pack includes requested byte range - // the range can shrink, but it never grows bigger unless there is a bug elsewhere - if pack.offset > offset || (pack.offset+int64(len(pack.data))) < (offset+int64(length)) { - return nil, errors.Errorf("cached range %d-%d is smaller than requested range %d-%d for pack %s", pack.offset, pack.offset+int64(len(pack.data)), length, offset+int64(length), packID.Str()) - } - - // move the pack to the used map - delete(c.cachedPacks, packID) - c.reservedPacks[packID] = pack - c.reservedCapacity += len(pack.data) - - debug.Log("Using cached pack %s (%d bytes)", pack.id.Str(), len(pack.data)) - - if pack.offset != offset || len(pack.data) != length { - // restrict returned record to requested range - return &packCacheRecord{ - cache: c, - master: pack, - offset: offset, - data: pack.data[int(offset-pack.offset) : int(offset-pack.offset)+length], - }, nil - } - - return pack, nil - } - - for c.allocatedCapacity+length > c.capacity { - // all cached packs will be needed at some point - // so it does not matter which one to purge - for _, cached := range c.cachedPacks { - delete(c.cachedPacks, cached.id) - c.allocatedCapacity -= len(cached.data) - debug.Log("dropped cached pack %s (%d bytes)", cached.id.Str(), len(cached.data)) - break - } - } - - pack := 
&packCacheRecord{ - cache: c, - id: packID, - offset: offset, - } - c.reservedPacks[pack.id] = pack - c.allocatedCapacity += length - c.reservedCapacity += length - - return pack, nil -} - -// get returns reader of the specified cached pack. Uses provided load func -// to download pack content if necessary. -// The returned reader is only able to read pack within byte range specified -// by offset and length parameters, attempts to read outside that range will -// result in an error. -// The returned reader must be closed before the same packID can be requested -// from the cache again. -func (c *packCache) get(packID restic.ID, offset int64, length int, load func(offset int64, length int, wr io.WriteSeeker) error) (readerAtCloser, error) { - pack, err := c.reserve(packID, offset, length) - if err != nil { - return nil, err - } - - if pack.data == nil { - releasePack := func() { - delete(c.reservedPacks, pack.id) - c.reservedCapacity -= length - c.allocatedCapacity -= length - } - wr := &bytesWriteSeeker{data: make([]byte, length)} - err = load(offset, length, wr) - if err != nil { - releasePack() - return nil, err - } - if wr.pos != length { - releasePack() - return nil, errors.Errorf("invalid read size") - } - pack.data = wr.data - debug.Log("Downloaded and cached pack %s (%d bytes)", pack.id.Str(), len(pack.data)) - } - - return pack, nil -} - -// releases the pack record back to the cache -func (c *packCache) release(pack *packCacheRecord) error { - c.lock.Lock() - defer c.lock.Unlock() - - if _, ok := c.reservedPacks[pack.id]; !ok { - return errors.Errorf("invalid pack release request") - } - - delete(c.reservedPacks, pack.id) - c.cachedPacks[pack.id] = pack - c.reservedCapacity -= len(pack.data) - - return nil -} - -// remove removes specified pack from the cache and frees -// corresponding cache space. should be called after the pack -// was fully used up by the restorer. 
-func (c *packCache) remove(packID restic.ID) error { - c.lock.Lock() - defer c.lock.Unlock() - - if _, ok := c.reservedPacks[packID]; ok { - return errors.Errorf("invalid pack remove request, pack %s is reserved", packID.Str()) - } - - pack, ok := c.cachedPacks[packID] - if !ok { - return errors.Errorf("invalid pack remove request, pack %s is not cached", packID.Str()) - } - - delete(c.cachedPacks, pack.id) - c.allocatedCapacity -= len(pack.data) - - return nil -} - -// ReadAt reads len(b) bytes from the pack starting at byte offset off. -// It returns the number of bytes read and the error, if any. -func (r *packCacheRecord) ReadAt(b []byte, off int64) (n int, err error) { - if off < r.offset || off+int64(len(b)) > r.offset+int64(len(r.data)) { - return -1, errors.Errorf("read outside available range") - } - return copy(b, r.data[off-r.offset:]), nil -} - -// Close closes the pack reader and releases corresponding cache record -// to the cache. Once closed, the record can be reused by subsequent -// requests for the same packID or it can be purged from the cache to make -// room for other packs -func (r *packCacheRecord) Close() (err error) { - if r.master != nil { - return r.cache.release(r.master) - } - return r.cache.release(r) -} diff --git a/internal/restorer/packcache_test.go b/internal/restorer/packcache_test.go deleted file mode 100644 index 3a5f18cf5..000000000 --- a/internal/restorer/packcache_test.go +++ /dev/null @@ -1,305 +0,0 @@ -package restorer - -import ( - "io" - "testing" - - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/restic" - rtest "github.com/restic/restic/internal/test" -) - -func assertNotOK(t *testing.T, msg string, err error) { - rtest.Assert(t, err != nil, msg+" did not fail") -} - -func TestBytesWriterSeeker(t *testing.T) { - wr := &bytesWriteSeeker{data: make([]byte, 10)} - - n, err := wr.Write([]byte{1, 2}) - rtest.OK(t, err) - rtest.Equals(t, 2, n) - rtest.Equals(t, []byte{1, 2}, wr.data[0:2]) - 
- n64, err := wr.Seek(0, io.SeekStart) - rtest.OK(t, err) - rtest.Equals(t, int64(0), n64) - - n, err = wr.Write([]byte{0, 1, 2, 3, 4}) - rtest.OK(t, err) - rtest.Equals(t, 5, n) - n, err = wr.Write([]byte{5, 6, 7, 8, 9}) - rtest.OK(t, err) - rtest.Equals(t, 5, n) - rtest.Equals(t, []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, wr.data) - - // negative tests - _, err = wr.Write([]byte{1}) - assertNotOK(t, "write overflow", err) - _, err = wr.Seek(1, io.SeekStart) - assertNotOK(t, "unsupported seek", err) -} - -func TestPackCacheBasic(t *testing.T) { - assertReader := func(expected []byte, offset int64, rd io.ReaderAt) { - actual := make([]byte, len(expected)) - rd.ReadAt(actual, offset) - rtest.Equals(t, expected, actual) - } - - c := newPackCache(10) - - id := restic.NewRandomID() - - // load pack to the cache - rd, err := c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { - rtest.Equals(t, int64(10), offset) - rtest.Equals(t, 5, length) - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - assertReader([]byte{1, 2, 3, 4, 5}, 10, rd) - - // must close pack reader before can request it again - _, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - assertNotOK(t, "double-reservation", err) - - // close the pack reader and get it from cache - rd.Close() - rd, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - rtest.OK(t, err) - assertReader([]byte{1, 2, 3, 4, 5}, 10, rd) - - // close the pack reader and remove the pack from cache, assert the pack is loaded on request - rd.Close() - c.remove(id) - rd, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { - rtest.Equals(t, int64(10), offset) - rtest.Equals(t, 5, length) - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - assertReader([]byte{1, 2, 3, 4, 5}, 10, 
rd) -} - -func TestPackCacheInvalidRange(t *testing.T) { - c := newPackCache(10) - - id := restic.NewRandomID() - - _, err := c.get(id, -1, 1, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - assertNotOK(t, "negative offset request", err) - - _, err = c.get(id, 0, 0, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - assertNotOK(t, "zero length request", err) - - _, err = c.get(id, 0, -1, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - assertNotOK(t, "negative length", err) -} - -func TestPackCacheCapacity(t *testing.T) { - c := newPackCache(10) - - id1, id2, id3 := restic.NewRandomID(), restic.NewRandomID(), restic.NewRandomID() - - // load and reserve pack1 - rd1, err := c.get(id1, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - - // load and reserve pack2 - _, err = c.get(id2, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - - // can't load pack3 because not enough space in the cache - _, err = c.get(id3, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected cache load call") - return nil - }) - assertNotOK(t, "request over capacity", err) - - // release pack1 and try again - rd1.Close() - rd3, err := c.get(id3, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - - // release pack3 and load pack1 (should not come from cache) - rd3.Close() - loaded := false - rd1, err = c.get(id1, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5}) - loaded = true - return nil - }) - rtest.OK(t, err) - rtest.Equals(t, true, loaded) -} - -func 
TestPackCacheDownsizeRecord(t *testing.T) { - c := newPackCache(10) - - id := restic.NewRandomID() - - // get bigger range first - rd, err := c.get(id, 5, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5}) - return nil - }) - rtest.OK(t, err) - rd.Close() - - // invalid "resize" requests - _, err = c.get(id, 5, 10, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - assertNotOK(t, "resize cached record", err) - - // invalid before cached range request - _, err = c.get(id, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - assertNotOK(t, "before cached range request", err) - - // invalid after cached range request - _, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - assertNotOK(t, "after cached range request", err) - - // now get smaller "nested" range - rd, err = c.get(id, 7, 1, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - rtest.OK(t, err) - - // assert expected data - buf := make([]byte, 1) - rd.ReadAt(buf, 7) - rtest.Equals(t, byte(3), buf[0]) - _, err = rd.ReadAt(buf, 0) - assertNotOK(t, "read before downsized pack range", err) - _, err = rd.ReadAt(buf, 9) - assertNotOK(t, "read after downsized pack range", err) - - // can't request downsized record again - _, err = c.get(id, 7, 1, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - assertNotOK(t, "double-allocation of cache record subrange", err) - - // can't request another subrange of the original record - _, err = c.get(id, 6, 1, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - assertNotOK(t, "allocation of another subrange of cache record", err) - - // release 
downsized record and assert the original is back in the cache - rd.Close() - rd, err = c.get(id, 5, 5, func(offset int64, length int, wr io.WriteSeeker) error { - t.Error("unexpected pack load") - return nil - }) - rtest.OK(t, err) - rd.Close() -} - -func TestPackCacheFailedDownload(t *testing.T) { - c := newPackCache(10) - assertEmpty := func() { - rtest.Equals(t, 0, len(c.cachedPacks)) - rtest.Equals(t, 10, c.capacity) - rtest.Equals(t, 0, c.reservedCapacity) - rtest.Equals(t, 0, c.allocatedCapacity) - } - - _, err := c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - return errors.Errorf("expected induced test error") - }) - assertNotOK(t, "not enough bytes read", err) - assertEmpty() - - _, err = c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1}) - return nil - }) - assertNotOK(t, "not enough bytes read", err) - assertEmpty() - - _, err = c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1, 2, 3, 4, 5, 6}) - return nil - }) - assertNotOK(t, "too many bytes read", err) - assertEmpty() -} - -func TestPackCacheInvalidRequests(t *testing.T) { - c := newPackCache(10) - - id := restic.NewRandomID() - - // - rd, _ := c.get(id, 0, 1, func(offset int64, length int, wr io.WriteSeeker) error { - wr.Write([]byte{1}) - return nil - }) - assertNotOK(t, "remove() reserved pack", c.remove(id)) - rtest.OK(t, rd.Close()) - assertNotOK(t, "multiple reader Close() calls)", rd.Close()) - - // - rtest.OK(t, c.remove(id)) - assertNotOK(t, "double remove() the same pack", c.remove(id)) -} - -func TestPackCacheRecord(t *testing.T) { - rd := &packCacheRecord{ - offset: 10, - data: []byte{1}, - } - buf := make([]byte, 1) - n, err := rd.ReadAt(buf, 10) - rtest.OK(t, err) - rtest.Equals(t, 1, n) - rtest.Equals(t, byte(1), buf[0]) - - _, err = rd.ReadAt(buf, 0) - assertNotOK(t, "read before loaded range", err) - - _, err = 
rd.ReadAt(buf, 11) - assertNotOK(t, "read after loaded range", err) - - _, err = rd.ReadAt(make([]byte, 2), 10) - assertNotOK(t, "read more than available data", err) -} diff --git a/internal/restorer/packheap.go b/internal/restorer/packheap.go deleted file mode 100644 index 9f8443d46..000000000 --- a/internal/restorer/packheap.go +++ /dev/null @@ -1,51 +0,0 @@ -package restorer - -// packHeap is a heap of packInfo references -// @see https://golang.org/pkg/container/heap/ -// @see https://en.wikipedia.org/wiki/Heap_(data_structure) -type packHeap struct { - elements []*packInfo - - // returns true if download of any of the files is in progress - inprogress func(files map[*fileInfo]struct{}) bool -} - -func (pq *packHeap) Len() int { return len(pq.elements) } - -func (pq *packHeap) Less(a, b int) bool { - packA, packB := pq.elements[a], pq.elements[b] - - ap := pq.inprogress(packA.files) - bp := pq.inprogress(packB.files) - if ap && !bp { - return true - } - - if packA.cost < packB.cost { - return true - } - - return false -} - -func (pq *packHeap) Swap(i, j int) { - pq.elements[i], pq.elements[j] = pq.elements[j], pq.elements[i] - pq.elements[i].index = i - pq.elements[j].index = j -} - -func (pq *packHeap) Push(x interface{}) { - n := len(pq.elements) - item := x.(*packInfo) - item.index = n - pq.elements = append(pq.elements, item) -} - -func (pq *packHeap) Pop() interface{} { - old := pq.elements - n := len(old) - item := old[n-1] - item.index = -1 // for safety - pq.elements = old[0 : n-1] - return item -} diff --git a/internal/restorer/packqueue.go b/internal/restorer/packqueue.go deleted file mode 100644 index fe8259846..000000000 --- a/internal/restorer/packqueue.go +++ /dev/null @@ -1,224 +0,0 @@ -package restorer - -import ( - "container/heap" - - "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/restic" -) - -// packQueue tracks remaining file contents restore work and decides what pack -// to download and files to write 
next. -// -// The packs in the queue can be in one of three states: waiting, ready and -// in-progress. -// Waiting packs are the packs that only have blobs from the "middle" of their -// corresponding files and therefore cannot be used until blobs from some other -// packs are written to the files first. -// In-progress packs are the packs that were removed from the queue by #nextPack -// and must be first returned to the queue before they are considered again. -// Ready packs are the packs can be immediately used to restore at least one -// file. Internally ready packs are kept in a heap and are ordered according -// to these criteria: -// - Packs with "head" blobs of in-progress files are considered first. The -// idea is to complete restore of in-progress files before starting restore -// of other files. This is both more intuitive and also reduces number of -// open file handles needed during restore. -// - Packs with smallest cost are considered next. Pack cost is measured in -// number of other packs required before all blobs in the pack can be used -// and the pack can be removed from the pack cache. -// For example, consisder a file that requires two blobs, blob1 from pack1 -// and blob2 from pack2. The cost of pack2 is 1, because blob2 cannot be -// used before blob1 is available. The higher the cost, the longer the pack -// must be cached locally to avoid redownload. -// -// Pack queue implementation is NOT thread safe. All pack queue methods must -// be called from single gorouting AND packInfo and fileInfo instances must -// be updated synchronously from the same gorouting. 
-type packQueue struct { - idx filePackTraverser - - packs map[restic.ID]*packInfo // waiting and ready packs - inprogress map[*packInfo]struct{} // inprogress packs - - heap *packHeap // heap of ready packs -} - -func newPackQueue(idx filePackTraverser, files []*fileInfo, inprogress func(files map[*fileInfo]struct{}) bool) (*packQueue, error) { - packs := make(map[restic.ID]*packInfo) // all packs - - // create packInfo from fileInfo - for _, file := range files { - err := idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool { - pack, ok := packs[packID] - if !ok { - pack = &packInfo{ - id: packID, - index: -1, - files: make(map[*fileInfo]struct{}), - } - packs[packID] = pack - } - pack.files[file] = struct{}{} - pack.cost += packIdx - - return true // keep going - }) - if err != nil { - // repository index is messed up, can't do anything - return nil, err - } - } - - // create packInfo heap - pheap := &packHeap{inprogress: inprogress} - headPacks := restic.NewIDSet() - for _, file := range files { - idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool { - if !headPacks.Has(packID) { - headPacks.Insert(packID) - pack := packs[packID] - pack.index = len(pheap.elements) - pheap.elements = append(pheap.elements, pack) - } - return false // only first pack - }) - } - heap.Init(pheap) - - return &packQueue{idx: idx, packs: packs, heap: pheap, inprogress: make(map[*packInfo]struct{})}, nil -} - -// isEmpty returns true if the queue is empty, i.e. there are no more packs to -// download and files to write to. -func (h *packQueue) isEmpty() bool { - return len(h.packs) == 0 && len(h.inprogress) == 0 -} - -// nextPack returns next ready pack and corresponding files ready for download -// and processing. The returned pack and the files are marked as "in progress" -// internally and must be first returned to the queue before they are -// considered by #nextPack again. 
-func (h *packQueue) nextPack() (*packInfo, []*fileInfo) { - debug.Log("Ready packs %d, outstanding packs %d, inprogress packs %d", h.heap.Len(), len(h.packs), len(h.inprogress)) - - if h.heap.Len() == 0 { - return nil, nil - } - - pack := heap.Pop(h.heap).(*packInfo) - h.inprogress[pack] = struct{}{} - debug.Log("Popped pack %s (%d files), heap size=%d", pack.id.Str(), len(pack.files), len(h.heap.elements)) - var files []*fileInfo - for file := range pack.files { - h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - debug.Log("Pack #%d %s (%d blobs) used by %s", packIdx, packID.Str(), len(packBlobs), file.location) - if pack.id == packID { - files = append(files, file) - } - return false // only interested in the fist pack here - }) - } - - return pack, files -} - -// requeuePack conditionally adds back to the queue pack previously returned by -// #nextPack. -// If the pack is needed to restore any incomplete files, adds the pack to the -// queue and adjusts order of all affected packs in the queue. Has no effect -// if the pack is not required to restore any files. -// Returns true if the pack was added to the queue, false otherwise. 
-func (h *packQueue) requeuePack(pack *packInfo, success []*fileInfo, failure []*fileInfo) bool { - debug.Log("Requeue pack %s (%d/%d/%d files/success/failure)", pack.id.Str(), len(pack.files), len(success), len(failure)) - - // maintain inprogress pack set - delete(h.inprogress, pack) - - affectedPacks := make(map[*packInfo]struct{}) - affectedPacks[pack] = struct{}{} // this pack is alwats affected - - // apply download success/failure to the packs - onFailure := func(file *fileInfo) { - h.idx.forEachFilePack(file, func(packInx int, packID restic.ID, _ []restic.Blob) bool { - pack := h.packs[packID] - delete(pack.files, file) - pack.cost -= packInx - affectedPacks[pack] = struct{}{} - return true // keep going - }) - } - for _, file := range failure { - onFailure(file) - } - onSuccess := func(pack *packInfo, file *fileInfo) { - remove := true - h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool { - if packID.Equal(pack.id) { - // the pack has more blobs required by the file - remove = false - } - otherPack := h.packs[packID] - otherPack.cost-- - affectedPacks[otherPack] = struct{}{} - return true // keep going - }) - if remove { - delete(pack.files, file) - } - } - for _, file := range success { - onSuccess(pack, file) - } - - // drop/update affected packs - isReady := func(affectedPack *packInfo) (ready bool) { - for file := range affectedPack.files { - h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool { - if packID.Equal(affectedPack.id) { - ready = true - } - return false // only file's first pack matters - }) - if ready { - break - } - } - return ready - } - for affectedPack := range affectedPacks { - if _, inprogress := h.inprogress[affectedPack]; !inprogress { - if len(affectedPack.files) == 0 { - // drop the pack if it isn't inprogress and has no files that need it - if affectedPack.index >= 0 { - // This can't happen unless there is a bug elsewhere: - // - "current" pack isn't in the 
heap, hence its index must be < 0 - // - "other" packs can't be ready (i.e. in heap) unless they have other files - // in which case len(affectedPack.files) must be > 0 - debug.Log("corrupted ready heap: removed unexpected ready pack %s", affectedPack.id.Str()) - heap.Remove(h.heap, affectedPack.index) - } - delete(h.packs, affectedPack.id) - } else { - ready := isReady(affectedPack) - switch { - case ready && affectedPack.index < 0: - heap.Push(h.heap, affectedPack) - case ready && affectedPack.index >= 0: - heap.Fix(h.heap, affectedPack.index) - case !ready && affectedPack.index >= 0: - // This can't happen unless there is a bug elsewhere: - // - "current" pack isn't in the heap, hence its index must be < 0 - // - "other" packs can't have same head blobs as the "current" pack, - // hence "other" packs can't change their readiness - debug.Log("corrupted ready heap: removed unexpected waiting pack %s", affectedPack.id.Str()) - heap.Remove(h.heap, affectedPack.index) - case !ready && affectedPack.index < 0: - // do nothing - } - } - } - } - - return len(pack.files) > 0 -} diff --git a/internal/restorer/packqueue_test.go b/internal/restorer/packqueue_test.go deleted file mode 100644 index 880f7037a..000000000 --- a/internal/restorer/packqueue_test.go +++ /dev/null @@ -1,236 +0,0 @@ -package restorer - -import ( - "testing" - - "github.com/restic/restic/internal/restic" - rtest "github.com/restic/restic/internal/test" -) - -func processPack(t *testing.T, data *TestRepo, pack *packInfo, files []*fileInfo) { - for _, file := range files { - data.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool { - // assert file's head pack - rtest.Equals(t, pack.id, packID) - file.blobs = file.blobs[len(packBlobs):] - return false // only interested in the head pack - }) - } -} - -func TestPackQueueBasic(t *testing.T) { - data := newTestRepo([]TestFile{ - TestFile{ - name: "file", - blobs: []TestBlob{ - TestBlob{"data1", "pack1"}, - 
TestBlob{"data2", "pack2"}, - }, - }, - }) - - queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false }) - rtest.OK(t, err) - - // assert initial queue state - rtest.Equals(t, false, queue.isEmpty()) - rtest.Equals(t, 0, queue.packs[data.packID("pack1")].cost) - rtest.Equals(t, 1, queue.packs[data.packID("pack2")].cost) - - // get first pack - pack, files := queue.nextPack() - rtest.Equals(t, "pack1", data.packName(pack)) - rtest.Equals(t, 1, len(files)) - rtest.Equals(t, false, queue.isEmpty()) - // TODO assert pack is inprogress - - // can't process the second pack until the first one is processed - { - pack, files := queue.nextPack() - rtest.Equals(t, true, pack == nil) - rtest.Equals(t, true, files == nil) - rtest.Equals(t, false, queue.isEmpty()) - } - - // requeue the pack without processing - rtest.Equals(t, true, queue.requeuePack(pack, []*fileInfo{}, []*fileInfo{})) - rtest.Equals(t, false, queue.isEmpty()) - rtest.Equals(t, 0, queue.packs[data.packID("pack1")].cost) - rtest.Equals(t, 1, queue.packs[data.packID("pack2")].cost) - - // get the first pack again - pack, files = queue.nextPack() - rtest.Equals(t, "pack1", data.packName(pack)) - rtest.Equals(t, 1, len(files)) - rtest.Equals(t, false, queue.isEmpty()) - - // process the first pack and return it to the queue - processPack(t, data, pack, files) - rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) - rtest.Equals(t, 0, queue.packs[data.packID("pack2")].cost) - - // get the second pack - pack, files = queue.nextPack() - rtest.Equals(t, "pack2", data.packName(pack)) - rtest.Equals(t, 1, len(files)) - rtest.Equals(t, false, queue.isEmpty()) - - // process the second pack and return it to the queue - processPack(t, data, pack, files) - rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) - - // all packs processed - rtest.Equals(t, true, queue.isEmpty()) -} - -func TestPackQueueFailedFile(t *testing.T) { - // point of this 
test is to assert that enqueuePack removes - // all references to files that failed restore - - data := newTestRepo([]TestFile{ - TestFile{ - name: "file", - blobs: []TestBlob{ - TestBlob{"data1", "pack1"}, - TestBlob{"data2", "pack2"}, - }, - }, - }) - - queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false }) - rtest.OK(t, err) - - pack, files := queue.nextPack() - rtest.Equals(t, false, queue.requeuePack(pack, []*fileInfo{}, files /*failed*/)) - rtest.Equals(t, true, queue.isEmpty()) -} - -func TestPackQueueOrderingCost(t *testing.T) { - // assert pack1 is selected before pack2: - // pack1 is ready to restore file1, pack2 is ready to restore file2 - // but pack2 cannot be immediately used to restore file1 - - data := newTestRepo([]TestFile{ - TestFile{ - name: "file1", - blobs: []TestBlob{ - TestBlob{"data1", "pack1"}, - TestBlob{"data2", "pack2"}, - }, - }, - TestFile{ - name: "file2", - blobs: []TestBlob{ - TestBlob{"data2", "pack2"}, - }, - }, - }) - - queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false }) - rtest.OK(t, err) - - // assert initial pack costs - rtest.Equals(t, 0, data.pack(queue, "pack1").cost) - rtest.Equals(t, 0, data.pack(queue, "pack1").index) // head of the heap - rtest.Equals(t, 1, data.pack(queue, "pack2").cost) - rtest.Equals(t, 1, data.pack(queue, "pack2").index) - - pack, files := queue.nextPack() - // assert selected pack and queue state - rtest.Equals(t, "pack1", data.packName(pack)) - // process the pack - processPack(t, data, pack, files) - rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) -} - -func TestPackQueueOrderingInprogress(t *testing.T) { - // finish restoring one file before starting another - - data := newTestRepo([]TestFile{ - TestFile{ - name: "file1", - blobs: []TestBlob{ - TestBlob{"data1-1", "pack1-1"}, - TestBlob{"data1-2", "pack1-2"}, - }, - }, - TestFile{ - name: "file2", - blobs: []TestBlob{ - 
TestBlob{"data2-1", "pack2-1"}, - TestBlob{"data2-2", "pack2-2"}, - }, - }, - }) - - var inprogress *fileInfo - queue, err := newPackQueue(data.idx, data.files, func(files map[*fileInfo]struct{}) bool { - _, found := files[inprogress] - return found - }) - rtest.OK(t, err) - - // first pack of a file - pack, files := queue.nextPack() - rtest.Equals(t, 1, len(files)) - file := files[0] - processPack(t, data, pack, files) - inprogress = files[0] - queue.requeuePack(pack, files, []*fileInfo{}) - - // second pack of the same file - pack, files = queue.nextPack() - rtest.Equals(t, 1, len(files)) - rtest.Equals(t, true, file == files[0]) // same file as before - processPack(t, data, pack, files) - inprogress = nil - queue.requeuePack(pack, files, []*fileInfo{}) - - // first pack of the second file - pack, files = queue.nextPack() - rtest.Equals(t, 1, len(files)) - rtest.Equals(t, false, file == files[0]) // different file as before -} - -func TestPackQueuePackMultiuse(t *testing.T) { - // the same pack is required multiple times to restore the same file - - data := newTestRepo([]TestFile{ - TestFile{ - name: "file", - blobs: []TestBlob{ - TestBlob{"data1", "pack1"}, - TestBlob{"data2", "pack2"}, - TestBlob{"data3", "pack1"}, // pack1 reuse, new blob - TestBlob{"data2", "pack2"}, // pack2 reuse, same blob - }, - }, - }) - - queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false }) - rtest.OK(t, err) - - pack, files := queue.nextPack() - rtest.Equals(t, "pack1", data.packName(pack)) - rtest.Equals(t, 1, len(pack.files)) - processPack(t, data, pack, files) - rtest.Equals(t, true, queue.requeuePack(pack, files, []*fileInfo{})) - - pack, files = queue.nextPack() - rtest.Equals(t, "pack2", data.packName(pack)) - rtest.Equals(t, 1, len(pack.files)) - processPack(t, data, pack, files) - rtest.Equals(t, true, queue.requeuePack(pack, files, []*fileInfo{})) - - pack, files = queue.nextPack() - rtest.Equals(t, "pack1", 
data.packName(pack)) - processPack(t, data, pack, files) - rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) - - pack, files = queue.nextPack() - rtest.Equals(t, "pack2", data.packName(pack)) - processPack(t, data, pack, files) - rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{})) - - rtest.Equals(t, true, queue.isEmpty()) -} diff --git a/internal/restorer/restorer.go b/internal/restorer/restorer.go index bbef5083b..7f497c5d6 100644 --- a/internal/restorer/restorer.go +++ b/internal/restorer/restorer.go @@ -206,7 +206,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error { idx := restic.NewHardlinkIndex() - filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), filePackTraverser{lookup: res.repo.Index().Lookup}) + filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), res.repo.Index().Lookup) // first tree pass: create directories and collect all files to restore err = res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{ @@ -249,7 +249,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error { return err } - err = filerestorer.restoreFiles(ctx, func(location string, err error) { res.Error(location, err) }) + err = filerestorer.restoreFiles(ctx) if err != nil { return err } diff --git a/vendor/github.com/cespare/xxhash/LICENSE.txt b/vendor/github.com/cespare/xxhash/LICENSE.txt new file mode 100644 index 000000000..24b53065f --- /dev/null +++ b/vendor/github.com/cespare/xxhash/LICENSE.txt @@ -0,0 +1,22 @@ +Copyright (c) 2016 Caleb Spare + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the 
Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/cespare/xxhash/README.md b/vendor/github.com/cespare/xxhash/README.md new file mode 100644 index 000000000..0982fd25e --- /dev/null +++ b/vendor/github.com/cespare/xxhash/README.md @@ -0,0 +1,50 @@ +# xxhash + +[![GoDoc](https://godoc.org/github.com/cespare/xxhash?status.svg)](https://godoc.org/github.com/cespare/xxhash) + +xxhash is a Go implementation of the 64-bit +[xxHash](http://cyan4973.github.io/xxHash/) algorithm, XXH64. This is a +high-quality hashing algorithm that is much faster than anything in the Go +standard library. + +The API is very small, taking its cue from the other hashing packages in the +standard library: + + $ go doc github.com/cespare/xxhash ! + package xxhash // import "github.com/cespare/xxhash" + + Package xxhash implements the 64-bit variant of xxHash (XXH64) as described + at http://cyan4973.github.io/xxHash/. + + func New() hash.Hash64 + func Sum64(b []byte) uint64 + func Sum64String(s string) uint64 + +This implementation provides a fast pure-Go implementation and an even faster +assembly implementation for amd64. 
+ +## Benchmarks + +Here are some quick benchmarks comparing the pure-Go and assembly +implementations of Sum64 against another popular Go XXH64 implementation, +[github.com/OneOfOne/xxhash](https://github.com/OneOfOne/xxhash): + +| input size | OneOfOne | cespare (purego) | cespare | +| --- | --- | --- | --- | +| 5 B | 416 MB/s | 720 MB/s | 872 MB/s | +| 100 B | 3980 MB/s | 5013 MB/s | 5252 MB/s | +| 4 KB | 12727 MB/s | 12999 MB/s | 13026 MB/s | +| 10 MB | 9879 MB/s | 10775 MB/s | 10913 MB/s | + +These numbers were generated with: + +``` +$ go test -benchtime 10s -bench '/OneOfOne,' +$ go test -tags purego -benchtime 10s -bench '/xxhash,' +$ go test -benchtime 10s -bench '/xxhash,' +``` + +## Projects using this package + +- [InfluxDB](https://github.com/influxdata/influxdb) +- [Prometheus](https://github.com/prometheus/prometheus) diff --git a/vendor/github.com/cespare/xxhash/go.mod b/vendor/github.com/cespare/xxhash/go.mod new file mode 100644 index 000000000..10605a6a5 --- /dev/null +++ b/vendor/github.com/cespare/xxhash/go.mod @@ -0,0 +1,6 @@ +module github.com/cespare/xxhash + +require ( + github.com/OneOfOne/xxhash v1.2.2 + github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 +) diff --git a/vendor/github.com/cespare/xxhash/go.sum b/vendor/github.com/cespare/xxhash/go.sum new file mode 100644 index 000000000..f6b554261 --- /dev/null +++ b/vendor/github.com/cespare/xxhash/go.sum @@ -0,0 +1,4 @@ +github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= diff --git a/vendor/github.com/cespare/xxhash/rotate.go b/vendor/github.com/cespare/xxhash/rotate.go new file mode 100644 index 000000000..f3eac5ebc --- 
/dev/null +++ b/vendor/github.com/cespare/xxhash/rotate.go @@ -0,0 +1,14 @@ +// +build !go1.9 + +package xxhash + +// TODO(caleb): After Go 1.10 comes out, remove this fallback code. + +func rol1(x uint64) uint64 { return (x << 1) | (x >> (64 - 1)) } +func rol7(x uint64) uint64 { return (x << 7) | (x >> (64 - 7)) } +func rol11(x uint64) uint64 { return (x << 11) | (x >> (64 - 11)) } +func rol12(x uint64) uint64 { return (x << 12) | (x >> (64 - 12)) } +func rol18(x uint64) uint64 { return (x << 18) | (x >> (64 - 18)) } +func rol23(x uint64) uint64 { return (x << 23) | (x >> (64 - 23)) } +func rol27(x uint64) uint64 { return (x << 27) | (x >> (64 - 27)) } +func rol31(x uint64) uint64 { return (x << 31) | (x >> (64 - 31)) } diff --git a/vendor/github.com/cespare/xxhash/rotate19.go b/vendor/github.com/cespare/xxhash/rotate19.go new file mode 100644 index 000000000..b99612bab --- /dev/null +++ b/vendor/github.com/cespare/xxhash/rotate19.go @@ -0,0 +1,14 @@ +// +build go1.9 + +package xxhash + +import "math/bits" + +func rol1(x uint64) uint64 { return bits.RotateLeft64(x, 1) } +func rol7(x uint64) uint64 { return bits.RotateLeft64(x, 7) } +func rol11(x uint64) uint64 { return bits.RotateLeft64(x, 11) } +func rol12(x uint64) uint64 { return bits.RotateLeft64(x, 12) } +func rol18(x uint64) uint64 { return bits.RotateLeft64(x, 18) } +func rol23(x uint64) uint64 { return bits.RotateLeft64(x, 23) } +func rol27(x uint64) uint64 { return bits.RotateLeft64(x, 27) } +func rol31(x uint64) uint64 { return bits.RotateLeft64(x, 31) } diff --git a/vendor/github.com/cespare/xxhash/xxhash.go b/vendor/github.com/cespare/xxhash/xxhash.go new file mode 100644 index 000000000..f896bd28f --- /dev/null +++ b/vendor/github.com/cespare/xxhash/xxhash.go @@ -0,0 +1,168 @@ +// Package xxhash implements the 64-bit variant of xxHash (XXH64) as described +// at http://cyan4973.github.io/xxHash/. 
+package xxhash + +import ( + "encoding/binary" + "hash" +) + +const ( + prime1 uint64 = 11400714785074694791 + prime2 uint64 = 14029467366897019727 + prime3 uint64 = 1609587929392839161 + prime4 uint64 = 9650029242287828579 + prime5 uint64 = 2870177450012600261 +) + +// NOTE(caleb): I'm using both consts and vars of the primes. Using consts where +// possible in the Go code is worth a small (but measurable) performance boost +// by avoiding some MOVQs. Vars are needed for the asm and also are useful for +// convenience in the Go code in a few places where we need to intentionally +// avoid constant arithmetic (e.g., v1 := prime1 + prime2 fails because the +// result overflows a uint64). +var ( + prime1v = prime1 + prime2v = prime2 + prime3v = prime3 + prime4v = prime4 + prime5v = prime5 +) + +type xxh struct { + v1 uint64 + v2 uint64 + v3 uint64 + v4 uint64 + total int + mem [32]byte + n int // how much of mem is used +} + +// New creates a new hash.Hash64 that implements the 64-bit xxHash algorithm. +func New() hash.Hash64 { + var x xxh + x.Reset() + return &x +} + +func (x *xxh) Reset() { + x.n = 0 + x.total = 0 + x.v1 = prime1v + prime2 + x.v2 = prime2 + x.v3 = 0 + x.v4 = -prime1v +} + +func (x *xxh) Size() int { return 8 } +func (x *xxh) BlockSize() int { return 32 } + +// Write adds more data to x. It always returns len(b), nil. +func (x *xxh) Write(b []byte) (n int, err error) { + n = len(b) + x.total += len(b) + + if x.n+len(b) < 32 { + // This new data doesn't even fill the current block. + copy(x.mem[x.n:], b) + x.n += len(b) + return + } + + if x.n > 0 { + // Finish off the partial block. + copy(x.mem[x.n:], b) + x.v1 = round(x.v1, u64(x.mem[0:8])) + x.v2 = round(x.v2, u64(x.mem[8:16])) + x.v3 = round(x.v3, u64(x.mem[16:24])) + x.v4 = round(x.v4, u64(x.mem[24:32])) + b = b[32-x.n:] + x.n = 0 + } + + if len(b) >= 32 { + // One or more full blocks left. + b = writeBlocks(x, b) + } + + // Store any remaining partial block. 
+ copy(x.mem[:], b) + x.n = len(b) + + return +} + +func (x *xxh) Sum(b []byte) []byte { + s := x.Sum64() + return append( + b, + byte(s>>56), + byte(s>>48), + byte(s>>40), + byte(s>>32), + byte(s>>24), + byte(s>>16), + byte(s>>8), + byte(s), + ) +} + +func (x *xxh) Sum64() uint64 { + var h uint64 + + if x.total >= 32 { + v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4 + h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4) + h = mergeRound(h, v1) + h = mergeRound(h, v2) + h = mergeRound(h, v3) + h = mergeRound(h, v4) + } else { + h = x.v3 + prime5 + } + + h += uint64(x.total) + + i, end := 0, x.n + for ; i+8 <= end; i += 8 { + k1 := round(0, u64(x.mem[i:i+8])) + h ^= k1 + h = rol27(h)*prime1 + prime4 + } + if i+4 <= end { + h ^= uint64(u32(x.mem[i:i+4])) * prime1 + h = rol23(h)*prime2 + prime3 + i += 4 + } + for i < end { + h ^= uint64(x.mem[i]) * prime5 + h = rol11(h) * prime1 + i++ + } + + h ^= h >> 33 + h *= prime2 + h ^= h >> 29 + h *= prime3 + h ^= h >> 32 + + return h +} + +func u64(b []byte) uint64 { return binary.LittleEndian.Uint64(b) } +func u32(b []byte) uint32 { return binary.LittleEndian.Uint32(b) } + +func round(acc, input uint64) uint64 { + acc += input * prime2 + acc = rol31(acc) + acc *= prime1 + return acc +} + +func mergeRound(acc, val uint64) uint64 { + val = round(0, val) + acc ^= val + acc = acc*prime1 + prime4 + return acc +} diff --git a/vendor/github.com/cespare/xxhash/xxhash_amd64.go b/vendor/github.com/cespare/xxhash/xxhash_amd64.go new file mode 100644 index 000000000..d61765268 --- /dev/null +++ b/vendor/github.com/cespare/xxhash/xxhash_amd64.go @@ -0,0 +1,12 @@ +// +build !appengine +// +build gc +// +build !purego + +package xxhash + +// Sum64 computes the 64-bit xxHash digest of b. 
+// +//go:noescape +func Sum64(b []byte) uint64 + +func writeBlocks(x *xxh, b []byte) []byte diff --git a/vendor/github.com/cespare/xxhash/xxhash_amd64.s b/vendor/github.com/cespare/xxhash/xxhash_amd64.s new file mode 100644 index 000000000..757f2011f --- /dev/null +++ b/vendor/github.com/cespare/xxhash/xxhash_amd64.s @@ -0,0 +1,233 @@ +// +build !appengine +// +build gc +// +build !purego + +#include "textflag.h" + +// Register allocation: +// AX h +// CX pointer to advance through b +// DX n +// BX loop end +// R8 v1, k1 +// R9 v2 +// R10 v3 +// R11 v4 +// R12 tmp +// R13 prime1v +// R14 prime2v +// R15 prime4v + +// round reads from and advances the buffer pointer in CX. +// It assumes that R13 has prime1v and R14 has prime2v. +#define round(r) \ + MOVQ (CX), R12 \ + ADDQ $8, CX \ + IMULQ R14, R12 \ + ADDQ R12, r \ + ROLQ $31, r \ + IMULQ R13, r + +// mergeRound applies a merge round on the two registers acc and val. +// It assumes that R13 has prime1v, R14 has prime2v, and R15 has prime4v. +#define mergeRound(acc, val) \ + IMULQ R14, val \ + ROLQ $31, val \ + IMULQ R13, val \ + XORQ val, acc \ + IMULQ R13, acc \ + ADDQ R15, acc + +// func Sum64(b []byte) uint64 +TEXT ·Sum64(SB), NOSPLIT, $0-32 + // Load fixed primes. + MOVQ ·prime1v(SB), R13 + MOVQ ·prime2v(SB), R14 + MOVQ ·prime4v(SB), R15 + + // Load slice. + MOVQ b_base+0(FP), CX + MOVQ b_len+8(FP), DX + LEAQ (CX)(DX*1), BX + + // The first loop limit will be len(b)-32. + SUBQ $32, BX + + // Check whether we have at least one block. + CMPQ DX, $32 + JLT noBlocks + + // Set up initial state (v1, v2, v3, v4). + MOVQ R13, R8 + ADDQ R14, R8 + MOVQ R14, R9 + XORQ R10, R10 + XORQ R11, R11 + SUBQ R13, R11 + + // Loop until CX > BX. 
+blockLoop: + round(R8) + round(R9) + round(R10) + round(R11) + + CMPQ CX, BX + JLE blockLoop + + MOVQ R8, AX + ROLQ $1, AX + MOVQ R9, R12 + ROLQ $7, R12 + ADDQ R12, AX + MOVQ R10, R12 + ROLQ $12, R12 + ADDQ R12, AX + MOVQ R11, R12 + ROLQ $18, R12 + ADDQ R12, AX + + mergeRound(AX, R8) + mergeRound(AX, R9) + mergeRound(AX, R10) + mergeRound(AX, R11) + + JMP afterBlocks + +noBlocks: + MOVQ ·prime5v(SB), AX + +afterBlocks: + ADDQ DX, AX + + // Right now BX has len(b)-32, and we want to loop until CX > len(b)-8. + ADDQ $24, BX + + CMPQ CX, BX + JG fourByte + +wordLoop: + // Calculate k1. + MOVQ (CX), R8 + ADDQ $8, CX + IMULQ R14, R8 + ROLQ $31, R8 + IMULQ R13, R8 + + XORQ R8, AX + ROLQ $27, AX + IMULQ R13, AX + ADDQ R15, AX + + CMPQ CX, BX + JLE wordLoop + +fourByte: + ADDQ $4, BX + CMPQ CX, BX + JG singles + + MOVL (CX), R8 + ADDQ $4, CX + IMULQ R13, R8 + XORQ R8, AX + + ROLQ $23, AX + IMULQ R14, AX + ADDQ ·prime3v(SB), AX + +singles: + ADDQ $4, BX + CMPQ CX, BX + JGE finalize + +singlesLoop: + MOVBQZX (CX), R12 + ADDQ $1, CX + IMULQ ·prime5v(SB), R12 + XORQ R12, AX + + ROLQ $11, AX + IMULQ R13, AX + + CMPQ CX, BX + JL singlesLoop + +finalize: + MOVQ AX, R12 + SHRQ $33, R12 + XORQ R12, AX + IMULQ R14, AX + MOVQ AX, R12 + SHRQ $29, R12 + XORQ R12, AX + IMULQ ·prime3v(SB), AX + MOVQ AX, R12 + SHRQ $32, R12 + XORQ R12, AX + + MOVQ AX, ret+24(FP) + RET + +// writeBlocks uses the same registers as above except that it uses AX to store +// the x pointer. + +// func writeBlocks(x *xxh, b []byte) []byte +TEXT ·writeBlocks(SB), NOSPLIT, $0-56 + // Load fixed primes needed for round. + MOVQ ·prime1v(SB), R13 + MOVQ ·prime2v(SB), R14 + + // Load slice. + MOVQ b_base+8(FP), CX + MOVQ CX, ret_base+32(FP) // initialize return base pointer; see NOTE below + MOVQ b_len+16(FP), DX + LEAQ (CX)(DX*1), BX + SUBQ $32, BX + + // Load vN from x. 
+ MOVQ x+0(FP), AX + MOVQ 0(AX), R8 // v1 + MOVQ 8(AX), R9 // v2 + MOVQ 16(AX), R10 // v3 + MOVQ 24(AX), R11 // v4 + + // We don't need to check the loop condition here; this function is + // always called with at least one block of data to process. +blockLoop: + round(R8) + round(R9) + round(R10) + round(R11) + + CMPQ CX, BX + JLE blockLoop + + // Copy vN back to x. + MOVQ R8, 0(AX) + MOVQ R9, 8(AX) + MOVQ R10, 16(AX) + MOVQ R11, 24(AX) + + // Construct return slice. + // NOTE: It's important that we don't construct a slice that has a base + // pointer off the end of the original slice, as in Go 1.7+ this will + // cause runtime crashes. (See discussion in, for example, + // https://github.com/golang/go/issues/16772.) + // Therefore, we calculate the length/cap first, and if they're zero, we + // keep the old base. This is what the compiler does as well if you + // write code like + // b = b[len(b):] + + // New length is 32 - (CX - BX) -> BX+32 - CX. + ADDQ $32, BX + SUBQ CX, BX + JZ afterSetBase + + MOVQ CX, ret_base+32(FP) + +afterSetBase: + MOVQ BX, ret_len+40(FP) + MOVQ BX, ret_cap+48(FP) // set cap == len + + RET diff --git a/vendor/github.com/cespare/xxhash/xxhash_other.go b/vendor/github.com/cespare/xxhash/xxhash_other.go new file mode 100644 index 000000000..c68d13f89 --- /dev/null +++ b/vendor/github.com/cespare/xxhash/xxhash_other.go @@ -0,0 +1,75 @@ +// +build !amd64 appengine !gc purego + +package xxhash + +// Sum64 computes the 64-bit xxHash digest of b. +func Sum64(b []byte) uint64 { + // A simpler version would be + // x := New() + // x.Write(b) + // return x.Sum64() + // but this is faster, particularly for small inputs. 
+ + n := len(b) + var h uint64 + + if n >= 32 { + v1 := prime1v + prime2 + v2 := prime2 + v3 := uint64(0) + v4 := -prime1v + for len(b) >= 32 { + v1 = round(v1, u64(b[0:8:len(b)])) + v2 = round(v2, u64(b[8:16:len(b)])) + v3 = round(v3, u64(b[16:24:len(b)])) + v4 = round(v4, u64(b[24:32:len(b)])) + b = b[32:len(b):len(b)] + } + h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4) + h = mergeRound(h, v1) + h = mergeRound(h, v2) + h = mergeRound(h, v3) + h = mergeRound(h, v4) + } else { + h = prime5 + } + + h += uint64(n) + + i, end := 0, len(b) + for ; i+8 <= end; i += 8 { + k1 := round(0, u64(b[i:i+8:len(b)])) + h ^= k1 + h = rol27(h)*prime1 + prime4 + } + if i+4 <= end { + h ^= uint64(u32(b[i:i+4:len(b)])) * prime1 + h = rol23(h)*prime2 + prime3 + i += 4 + } + for ; i < end; i++ { + h ^= uint64(b[i]) * prime5 + h = rol11(h) * prime1 + } + + h ^= h >> 33 + h *= prime2 + h ^= h >> 29 + h *= prime3 + h ^= h >> 32 + + return h +} + +func writeBlocks(x *xxh, b []byte) []byte { + v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4 + for len(b) >= 32 { + v1 = round(v1, u64(b[0:8:len(b)])) + v2 = round(v2, u64(b[8:16:len(b)])) + v3 = round(v3, u64(b[16:24:len(b)])) + v4 = round(v4, u64(b[24:32:len(b)])) + b = b[32:len(b):len(b)] + } + x.v1, x.v2, x.v3, x.v4 = v1, v2, v3, v4 + return b +} diff --git a/vendor/github.com/cespare/xxhash/xxhash_safe.go b/vendor/github.com/cespare/xxhash/xxhash_safe.go new file mode 100644 index 000000000..dfa15ab7e --- /dev/null +++ b/vendor/github.com/cespare/xxhash/xxhash_safe.go @@ -0,0 +1,10 @@ +// +build appengine + +// This file contains the safe implementations of otherwise unsafe-using code. + +package xxhash + +// Sum64String computes the 64-bit xxHash digest of s. 
+func Sum64String(s string) uint64 { + return Sum64([]byte(s)) +} diff --git a/vendor/github.com/cespare/xxhash/xxhash_unsafe.go b/vendor/github.com/cespare/xxhash/xxhash_unsafe.go new file mode 100644 index 000000000..d2b64e8bb --- /dev/null +++ b/vendor/github.com/cespare/xxhash/xxhash_unsafe.go @@ -0,0 +1,30 @@ +// +build !appengine + +// This file encapsulates usage of unsafe. +// xxhash_safe.go contains the safe implementations. + +package xxhash + +import ( + "reflect" + "unsafe" +) + +// Sum64String computes the 64-bit xxHash digest of s. +// It may be faster than Sum64([]byte(s)) by avoiding a copy. +// +// TODO(caleb): Consider removing this if an optimization is ever added to make +// it unnecessary: https://golang.org/issue/2205. +// +// TODO(caleb): We still have a function call; we could instead write Go/asm +// copies of Sum64 for strings to squeeze out a bit more speed. +func Sum64String(s string) uint64 { + // See https://groups.google.com/d/msg/golang-nuts/dcjzJy-bSpw/tcZYBzQqAQAJ + // for some discussion about this unsafe conversion. + var b []byte + bh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + bh.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data + bh.Len = len(s) + bh.Cap = len(s) + return Sum64(b) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 62085d51a..e663bbe7a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -20,6 +20,8 @@ github.com/Azure/go-autorest/logger github.com/Azure/go-autorest/tracing # github.com/cenkalti/backoff v2.1.1+incompatible github.com/cenkalti/backoff +# github.com/cespare/xxhash v1.1.0 +github.com/cespare/xxhash # github.com/cpuguy83/go-md2man v1.0.10 github.com/cpuguy83/go-md2man/md2man # github.com/dgrijalva/jwt-go v3.2.0+incompatible