package restorer import ( "io" "sync" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" ) // packCache is thread safe in-memory cache of pack files required to restore // one or more files. The cache is meant to hold pack files that cannot be // fully used right away. This happens when pack files contains blobs from // "head" of some files and "middle" of other files. "Middle" blobs cannot be // written to their files until after blobs from some other packs are written // to the files first. // // While the cache is thread safe, implementation assumes (and enforces) // that individual entries are used by one client at a time. Clients must // #Close() entry's reader to make the entry available for use by other // clients. This limitation can be relaxed in the future if necessary. type packCache struct { // guards access to cache internal data structures lock sync.Mutex // cache capacity capacity int reservedCapacity int allocatedCapacity int // pack records currently being used by active restore worker reservedPacks map[restic.ID]*packCacheRecord // unused allocated packs, can be deleted if necessary cachedPacks map[restic.ID]*packCacheRecord } type packCacheRecord struct { master *packCacheRecord cache *packCache id restic.ID // cached pack id offset int64 // cached pack byte range data []byte } type readerAtCloser interface { io.Closer io.ReaderAt } type bytesWriteSeeker struct { pos int data []byte } func (wr *bytesWriteSeeker) Write(p []byte) (n int, err error) { if wr.pos+len(p) > len(wr.data) { return -1, errors.Errorf("not enough space") } n = copy(wr.data[wr.pos:], p) wr.pos += n return n, nil } func (wr *bytesWriteSeeker) Seek(offset int64, whence int) (int64, error) { if offset != 0 || whence != io.SeekStart { return -1, errors.Errorf("unsupported seek request") } wr.pos = 0 return 0, nil } func newPackCache(capacity int) *packCache { return &packCache{ capacity: capacity, reservedPacks: make(map[restic.ID]*packCacheRecord), cachedPacks: make(map[restic.ID]*packCacheRecord), } } func (c *packCache) reserve(packID restic.ID, offset int64, length int) (record *packCacheRecord, err error) { c.lock.Lock() defer c.lock.Unlock() if offset < 0 || length <= 0 { return nil, errors.Errorf("illegal pack cache allocation range %s {offset: %d, length: %d}", packID.Str(), offset, length) } if c.reservedCapacity+length > c.capacity { return nil, errors.Errorf("not enough cache capacity: requested %d, available %d", length, c.capacity-c.reservedCapacity) } if _, ok := c.reservedPacks[packID]; ok { return nil, errors.Errorf("pack is already reserved %s", packID.Str()) } // the pack is available in the cache and currently unused if pack, ok := c.cachedPacks[packID]; ok { // check if cached pack includes requested byte range // the range can shrink, but it never grows bigger unless there is a bug elsewhere if pack.offset > offset || (pack.offset+int64(len(pack.data))) < (offset+int64(length)) { return nil, errors.Errorf("cached range %d-%d is smaller than requested range %d-%d for pack %s", pack.offset, pack.offset+int64(len(pack.data)), length, offset+int64(length), packID.Str()) } // move the pack to the used map delete(c.cachedPacks, packID) c.reservedPacks[packID] = pack c.reservedCapacity += len(pack.data) debug.Log("Using cached pack %s (%d bytes)", pack.id.Str(), len(pack.data)) if pack.offset != offset || len(pack.data) != length { // restrict returned record to requested range return &packCacheRecord{ cache: c, master: pack, offset: offset, data: pack.data[int(offset-pack.offset) : int(offset-pack.offset)+length], }, nil } return pack, nil } for c.allocatedCapacity+length > c.capacity { // all cached packs will be needed at some point // so it does not matter which one to purge for _, cached := range c.cachedPacks { delete(c.cachedPacks, cached.id) c.allocatedCapacity -= len(cached.data) debug.Log("dropped cached pack %s (%d bytes)", cached.id.Str(), len(cached.data)) break } } pack := &packCacheRecord{ cache: c, id: packID, offset: offset, } c.reservedPacks[pack.id] = pack c.allocatedCapacity += length c.reservedCapacity += length return pack, nil } // get returns reader of the specified cached pack. Uses provided load func // to download pack content if necessary. // The returned reader is only able to read pack within byte range specified // by offset and length parameters, attempts to read outside that range will // result in an error. // The returned reader must be closed before the same packID can be requested // from the cache again. func (c *packCache) get(packID restic.ID, offset int64, length int, load func(offset int64, length int, wr io.WriteSeeker) error) (readerAtCloser, error) { pack, err := c.reserve(packID, offset, length) if err != nil { return nil, err } if pack.data == nil { releasePack := func() { delete(c.reservedPacks, pack.id) c.reservedCapacity -= length c.allocatedCapacity -= length } wr := &bytesWriteSeeker{data: make([]byte, length)} err = load(offset, length, wr) if err != nil { releasePack() return nil, err } if wr.pos != length { releasePack() return nil, errors.Errorf("invalid read size") } pack.data = wr.data debug.Log("Downloaded and cached pack %s (%d bytes)", pack.id.Str(), len(pack.data)) } return pack, nil } // releases the pack record back to the cache func (c *packCache) release(pack *packCacheRecord) error { c.lock.Lock() defer c.lock.Unlock() if _, ok := c.reservedPacks[pack.id]; !ok { return errors.Errorf("invalid pack release request") } delete(c.reservedPacks, pack.id) c.cachedPacks[pack.id] = pack c.reservedCapacity -= len(pack.data) return nil } // remove removes specified pack from the cache and frees // corresponding cache space. should be called after the pack // was fully used up by the restorer. func (c *packCache) remove(packID restic.ID) error { c.lock.Lock() defer c.lock.Unlock() if _, ok := c.reservedPacks[packID]; ok { return errors.Errorf("invalid pack remove request, pack %s is reserved", packID.Str()) } pack, ok := c.cachedPacks[packID] if !ok { return errors.Errorf("invalid pack remove request, pack %s is not cached", packID.Str()) } delete(c.cachedPacks, pack.id) c.allocatedCapacity -= len(pack.data) return nil } // ReadAt reads len(b) bytes from the pack starting at byte offset off. // It returns the number of bytes read and the error, if any. func (r *packCacheRecord) ReadAt(b []byte, off int64) (n int, err error) { if off < r.offset || off+int64(len(b)) > r.offset+int64(len(r.data)) { return -1, errors.Errorf("read outside available range") } return copy(b, r.data[off-r.offset:]), nil } // Close closes the pack reader and releases corresponding cache record // to the cache. Once closed, the record can be reused by subsequent // requests for the same packID or it can be purged from the cache to make // room for other packs func (r *packCacheRecord) Close() (err error) { if r.master != nil { return r.cache.release(r.master) } return r.cache.release(r) }