diff --git a/changelog/unreleased/issue-4274 b/changelog/unreleased/issue-4274
new file mode 100644
index 000000000..96cb0709d
--- /dev/null
+++ b/changelog/unreleased/issue-4274
@@ -0,0 +1,10 @@
+Bugfix: Improve lock refresh handling when using standby
+
+If the restic process was stopped or the host running restic entered standby
+during a long running operation such as a backup, this resulted in the
+operation failing with `Fatal: failed to refresh lock in time`. We've reworked
+the lock refresh such that restic first checks whether it is safe to continue
+the current operation and only throws an error if not.
+
+https://github.com/restic/restic/issues/4274
+https://github.com/restic/restic/pull/4374
diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go
index d7e899eaf..0e678eb2f 100644
--- a/cmd/restic/cmd_backup.go
+++ b/cmd/restic/cmd_backup.go
@@ -506,10 +506,13 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
 	if !gopts.JSON {
 		progressPrinter.V("lock repository")
 	}
-	lock, ctx, err := lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
-	defer unlockRepo(lock)
-	if err != nil {
-		return err
+	if !opts.DryRun {
+		var lock *restic.Lock
+		lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
+		defer unlockRepo(lock)
+		if err != nil {
+			return err
+		}
 	}
 
 	// rejectByNameFuncs collect functions that can reject items from the backup based on path only
diff --git a/cmd/restic/lock.go b/cmd/restic/lock.go
index 336b56ad1..11c1ed8f5 100644
--- a/cmd/restic/lock.go
+++ b/cmd/restic/lock.go
@@ -12,6 +12,7 @@ import (
 )
 
 type lockContext struct {
+	lock      *restic.Lock
 	cancel    context.CancelFunc
 	refreshWG sync.WaitGroup
 }
@@ -104,15 +105,17 @@ retryLoop:
 	ctx, cancel := context.WithCancel(ctx)
 	lockInfo := &lockContext{
+		lock:   lock,
 		cancel: cancel,
 	}
 	lockInfo.refreshWG.Add(2)
 	refreshChan := make(chan struct{})
+	forceRefreshChan := make(chan refreshLockRequest)
 
 	globalLocks.Lock()
 	globalLocks.locks[lock] = lockInfo
-	go refreshLocks(ctx, lock, lockInfo, refreshChan)
-	go monitorLockRefresh(ctx, lockInfo, refreshChan)
+	go refreshLocks(ctx, repo.Backend(), lockInfo, refreshChan, forceRefreshChan)
+	go monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan)
 	globalLocks.Unlock()
 
 	return lock, ctx, err
@@ -124,8 +127,13 @@ var refreshInterval = 5 * time.Minute
 // the difference allows to compensate for a small time drift between clients.
 var refreshabilityTimeout = restic.StaleLockTimeout - refreshInterval*3/2
 
-func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext, refreshed chan<- struct{}) {
+type refreshLockRequest struct {
+	result chan bool
+}
+
+func refreshLocks(ctx context.Context, backend restic.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest) {
 	debug.Log("start")
+	lock := lockInfo.lock
 	ticker := time.NewTicker(refreshInterval)
 	lastRefresh := lock.Time
@@ -149,6 +157,22 @@ func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext,
 		case <-ctx.Done():
 			debug.Log("terminate")
 			return
+
+		case req := <-forceRefresh:
+			debug.Log("trying to refresh stale lock")
+			// keep on going if our current lock still exists
+			success := tryRefreshStaleLock(ctx, backend, lock, lockInfo.cancel)
+			// inform refresh goroutine about forced refresh
+			select {
+			case <-ctx.Done():
+			case req.result <- success:
+			}
+
+			if success {
+				// update lock refresh time
+				lastRefresh = lock.Time
+			}
+
 		case <-ticker.C:
 			if time.Since(lastRefresh) > refreshabilityTimeout {
 				// the lock is too old, wait until the expiry monitor cancels the context
@@ -161,7 +185,7 @@ func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext,
 				Warnf("unable to refresh lock: %v\n", err)
 			} else {
 				lastRefresh = lock.Time
-				// inform monitor gorountine about successful refresh
+				// inform monitor goroutine about successful refresh
 				select {
 				case <-ctx.Done():
 				case refreshed <- struct{}{}:
@@ -171,7 +195,7 @@ func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext,
 	}
 }
 
-func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}) {
+func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest) {
 	// time.Now() might use a monotonic timer which is paused during standby
 	// convert to unix time to ensure we compare real time values
 	lastRefresh := time.Now().UnixNano()
@@ -183,24 +207,47 @@ func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-
 	// timers are paused during standby, which is a problem as the refresh timeout
 	// _must_ expire if the host was too long in standby. Thus fall back to periodic checks
 	// https://github.com/golang/go/issues/35012
-	timer := time.NewTimer(pollDuration)
+	ticker := time.NewTicker(pollDuration)
 	defer func() {
-		timer.Stop()
+		ticker.Stop()
 		lockInfo.cancel()
 		lockInfo.refreshWG.Done()
 	}()
 
+	var refreshStaleLockResult chan bool
+
 	for {
 		select {
 		case <-ctx.Done():
 			debug.Log("terminate expiry monitoring")
 			return
 		case <-refreshed:
+			if refreshStaleLockResult != nil {
+				// ignore delayed refresh notifications while the stale lock is refreshed
+				continue
+			}
 			lastRefresh = time.Now().UnixNano()
-		case <-timer.C:
-			if time.Now().UnixNano()-lastRefresh < refreshabilityTimeout.Nanoseconds() {
-				// restart timer
-				timer.Reset(pollDuration)
+		case <-ticker.C:
+			if time.Now().UnixNano()-lastRefresh < refreshabilityTimeout.Nanoseconds() || refreshStaleLockResult != nil {
+				continue
+			}
+
+			debug.Log("trying to refreshStaleLock")
+			// keep on going if our current lock still exists
+			refreshReq := refreshLockRequest{
+				result: make(chan bool),
+			}
+			refreshStaleLockResult = refreshReq.result
+
+			// inform refresh goroutine about forced refresh
+			select {
+			case <-ctx.Done():
+			case forceRefresh <- refreshReq:
+			}
+		case success := <-refreshStaleLockResult:
+			if success {
+				lastRefresh = time.Now().UnixNano()
+				refreshStaleLockResult = nil
 				continue
 			}
 
@@ -210,6 +257,25 @@ func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-
 	}
 }
 
+func tryRefreshStaleLock(ctx context.Context, backend restic.Backend, lock *restic.Lock, cancel context.CancelFunc) bool {
+	freeze := restic.AsBackend[restic.FreezeBackend](backend)
+	if freeze != nil {
+		debug.Log("freezing backend")
+		freeze.Freeze()
+		defer freeze.Unfreeze()
+	}
+
+	err := lock.RefreshStaleLock(ctx)
+	if err != nil {
+		Warnf("failed to refresh stale lock: %v\n", err)
+		// cancel context while the backend is still frozen to prevent accidental modifications
+		cancel()
+		return false
+	}
+
+	return true
+}
+
 func unlockRepo(lock *restic.Lock) {
 	if lock == nil {
 		return
diff --git a/cmd/restic/lock_test.go b/cmd/restic/lock_test.go
index 150bd8730..2f8420853 100644
--- a/cmd/restic/lock_test.go
+++ b/cmd/restic/lock_test.go
@@ -5,16 +5,26 @@ import (
 	"fmt"
 	"runtime"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
+	"github.com/restic/restic/internal/backend/location"
+	"github.com/restic/restic/internal/backend/mem"
+	"github.com/restic/restic/internal/debug"
 	"github.com/restic/restic/internal/repository"
 	"github.com/restic/restic/internal/restic"
 	"github.com/restic/restic/internal/test"
 )
 
-func openTestRepo(t *testing.T, wrapper backendWrapper) (*repository.Repository, func(), *testEnvironment) {
+func openLockTestRepo(t *testing.T, wrapper backendWrapper) (*repository.Repository, func(), *testEnvironment) {
 	env, cleanup := withTestEnvironment(t)
+
+	reg := location.NewRegistry()
+	reg.Register(mem.NewFactory())
+	env.gopts.backends = reg
+	env.gopts.Repo = "mem:"
+
 	if wrapper != nil {
 		env.gopts.backendTestHook = wrapper
 	}
@@ -36,7 +46,7 @@ func checkedLockRepo(ctx context.Context, t *testing.T, repo restic.Repository,
 }
 
 func TestLock(t *testing.T) {
-	repo, cleanup, env := openTestRepo(t, nil)
+	repo, cleanup, env := openLockTestRepo(t, nil)
 	defer cleanup()
 
 	lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, env)
@@ -47,7 +57,7 @@ func TestLock(t *testing.T) {
 }
 
 func TestLockCancel(t *testing.T) {
-	repo, cleanup, env := openTestRepo(t, nil)
+	repo, cleanup, env := openLockTestRepo(t, nil)
 	defer cleanup()
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -63,7 +73,7 @@ func TestLockCancel(t *testing.T) {
 }
 
 func TestLockUnlockAll(t *testing.T) {
-	repo, cleanup, env := openTestRepo(t, nil)
+	repo, cleanup, env := openLockTestRepo(t, nil)
 	defer cleanup()
 
 	lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, env)
@@ -78,7 +88,7 @@ func TestLockUnlockAll(t *testing.T) {
 }
 
 func TestLockConflict(t *testing.T) {
-	repo, cleanup, env := openTestRepo(t, nil)
+	repo, cleanup, env := openLockTestRepo(t, nil)
 	defer cleanup()
 	repo2, err := OpenRepository(context.TODO(), env.gopts)
 	test.OK(t, err)
@@ -107,7 +117,7 @@ func (b *writeOnceBackend) Save(ctx context.Context, h restic.Handle, rd restic.
 }
 
 func TestLockFailedRefresh(t *testing.T) {
-	repo, cleanup, env := openTestRepo(t, func(r restic.Backend) (restic.Backend, error) {
+	repo, cleanup, env := openLockTestRepo(t, func(r restic.Backend) (restic.Backend, error) {
 		return &writeOnceBackend{Backend: r}, nil
 	})
 	defer cleanup()
@@ -145,7 +155,7 @@ func (b *loggingBackend) Save(ctx context.Context, h restic.Handle, rd restic.Re
 }
 
 func TestLockSuccessfulRefresh(t *testing.T) {
-	repo, cleanup, env := openTestRepo(t, func(r restic.Backend) (restic.Backend, error) {
+	repo, cleanup, env := openLockTestRepo(t, func(r restic.Backend) (restic.Backend, error) {
 		return &loggingBackend{
 			Backend: r,
 			t:       t,
@@ -182,8 +192,71 @@ func TestLockSuccessfulRefresh(t *testing.T) {
 	unlockRepo(lock)
 }
 
+type slowBackend struct {
+	restic.Backend
+	m     sync.Mutex
+	sleep time.Duration
+}
+
+func (b *slowBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
+	b.m.Lock()
+	sleep := b.sleep
+	b.m.Unlock()
+	time.Sleep(sleep)
+	return b.Backend.Save(ctx, h, rd)
+}
+
+func TestLockSuccessfulStaleRefresh(t *testing.T) {
+	var sb *slowBackend
+	repo, cleanup, env := openLockTestRepo(t, func(r restic.Backend) (restic.Backend, error) {
+		sb = &slowBackend{Backend: r}
+		return sb, nil
+	})
+	defer cleanup()
+
+	t.Logf("test for successful lock refresh %v", time.Now())
+	// reduce locking intervals to be suitable for testing
+	ri, rt := refreshInterval, refreshabilityTimeout
+	refreshInterval = 10 * time.Millisecond
+	refreshabilityTimeout = 50 * time.Millisecond
+	defer func() {
+		refreshInterval, refreshabilityTimeout = ri, rt
+	}()
+
+	lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, env)
+	// delay lock refreshing long enough that the lock would expire
+	sb.m.Lock()
+	sb.sleep = refreshabilityTimeout + refreshInterval
+	sb.m.Unlock()
+
+	select {
+	case <-wrappedCtx.Done():
+		// don't call t.Fatal to allow the lock to be properly cleaned up
+		t.Error("lock refresh failed", time.Now())
+
+	case <-time.After(refreshabilityTimeout):
+	}
+	// reset slow backend
+	sb.m.Lock()
+	sb.sleep = 0
+	sb.m.Unlock()
+	debug.Log("normal lock period has expired")
+
+	select {
+	case <-wrappedCtx.Done():
+		// don't call t.Fatal to allow the lock to be properly cleaned up
+		t.Error("lock refresh failed", time.Now())
+
+	case <-time.After(3 * refreshabilityTimeout):
+		// expected lock refresh to work
+	}
+
+	// unlockRepo should not crash
+	unlockRepo(lock)
+}
+
 func TestLockWaitTimeout(t *testing.T) {
-	repo, cleanup, env := openTestRepo(t, nil)
+	repo, cleanup, env := openLockTestRepo(t, nil)
 	defer cleanup()
 
 	elock, _, err := lockRepoExclusive(context.TODO(), repo, env.gopts.RetryLock, env.gopts.JSON)
@@ -205,8 +278,9 @@ func TestLockWaitTimeout(t *testing.T) {
 	test.OK(t, lock.Unlock())
 	test.OK(t, elock.Unlock())
 }
+
 func TestLockWaitCancel(t *testing.T) {
-	repo, cleanup, env := openTestRepo(t, nil)
+	repo, cleanup, env := openLockTestRepo(t, nil)
 	defer cleanup()
 
 	elock, _, err := lockRepoExclusive(context.TODO(), repo, env.gopts.RetryLock, env.gopts.JSON)
@@ -215,10 +289,10 @@ func TestLockWaitCancel(t *testing.T) {
 	retryLock := 200 * time.Millisecond
 	cancelAfter := 40 * time.Millisecond
+	start := time.Now()
 
 	ctx, cancel := context.WithCancel(context.TODO())
 	time.AfterFunc(cancelAfter, cancel)
 
-	start := time.Now()
 	lock, _, err := lockRepo(ctx, repo, retryLock, env.gopts.JSON)
 	duration := time.Since(start)
@@ -227,14 +301,14 @@ func TestLockWaitCancel(t *testing.T) {
 	test.Assert(t, strings.Contains(err.Error(), "context canceled"),
 		"create normal lock with exclusively locked repo didn't return the correct error")
 	test.Assert(t, cancelAfter <= duration && duration < retryLock-10*time.Millisecond,
-		"create normal lock with exclusively locked repo didn't return in time")
+		"create normal lock with exclusively locked repo didn't return in time, duration %v", duration)
 
 	test.OK(t, lock.Unlock())
 	test.OK(t, elock.Unlock())
 }
 
 func TestLockWaitSuccess(t *testing.T) {
-	repo, cleanup, env := openTestRepo(t, nil)
+	repo, cleanup, env := openLockTestRepo(t, nil)
 	defer cleanup()
 
 	elock, _, err := lockRepoExclusive(context.TODO(), repo, env.gopts.RetryLock, env.gopts.JSON)
diff --git a/internal/backend/sema/backend.go b/internal/backend/sema/backend.go
index dd4859ed1..d60788f26 100644
--- a/internal/backend/sema/backend.go
+++ b/internal/backend/sema/backend.go
@@ -3,6 +3,7 @@ package sema
 import (
 	"context"
 	"io"
+	"sync"
 
 	"github.com/cenkalti/backoff/v4"
 	"github.com/restic/restic/internal/errors"
@@ -15,7 +16,8 @@ var _ restic.Backend = &connectionLimitedBackend{}
 // connectionLimitedBackend limits the number of concurrent operations.
 type connectionLimitedBackend struct {
 	restic.Backend
-	sem semaphore
+	sem        semaphore
+	freezeLock sync.Mutex
 }
 
 // NewBackend creates a backend that limits the concurrent operations on the underlying backend
@@ -39,9 +41,23 @@ func (be *connectionLimitedBackend) typeDependentLimit(t restic.FileType) func()
 		return func() {}
 	}
 	be.sem.GetToken()
+	// prevent token usage while the backend is frozen
+	be.freezeLock.Lock()
+	defer be.freezeLock.Unlock()
+
 	return be.sem.ReleaseToken
 }
 
+// Freeze blocks all backend operations except those on lock files
+func (be *connectionLimitedBackend) Freeze() {
+	be.freezeLock.Lock()
+}
+
+// Unfreeze allows all backend operations to continue
+func (be *connectionLimitedBackend) Unfreeze() {
+	be.freezeLock.Unlock()
+}
+
 // Save adds new Data to the backend.
 func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
 	if err := h.Valid(); err != nil {
@@ -50,6 +66,10 @@ func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, r
 
 	defer be.typeDependentLimit(h.Type)()
 
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+
 	return be.Backend.Save(ctx, h, rd)
 }
@@ -68,6 +88,10 @@ func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, l
 
 	defer be.typeDependentLimit(h.Type)()
 
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+
 	return be.Backend.Load(ctx, h, length, offset, fn)
 }
@@ -79,6 +103,10 @@ func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) (
 
 	defer be.typeDependentLimit(h.Type)()
 
+	if ctx.Err() != nil {
+		return restic.FileInfo{}, ctx.Err()
+	}
+
 	return be.Backend.Stat(ctx, h)
 }
@@ -90,6 +118,10 @@ func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle)
 
 	defer be.typeDependentLimit(h.Type)()
 
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+
 	return be.Backend.Remove(ctx, h)
 }
diff --git a/internal/backend/sema/backend_test.go b/internal/backend/sema/backend_test.go
index dc599b7f8..a1dd16187 100644
--- a/internal/backend/sema/backend_test.go
+++ b/internal/backend/sema/backend_test.go
@@ -3,6 +3,7 @@ package sema_test
 import (
 	"context"
 	"io"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -197,3 +198,38 @@ func TestConcurrencyUnlimitedLockSave(t *testing.T) {
 		}
 	}, unblock, true)
 }
+
+func TestFreeze(t *testing.T) {
+	var counter int64
+	m := mock.NewBackend()
+	m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
+		atomic.AddInt64(&counter, 1)
+		return nil
+	}
+	m.ConnectionsFn = func() uint { return 2 }
+	be := sema.NewBackend(m)
+	fb := be.(restic.FreezeBackend)
+
+	// Freeze backend
+	fb.Freeze()
+
+	// Start Save call that should block
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
+		test.OK(t, be.Save(context.TODO(), h, nil))
+	}()
+
+	// check
+	time.Sleep(1 * time.Millisecond)
+	val := atomic.LoadInt64(&counter)
+	test.Assert(t, val == 0, "save call worked despite frozen backend")
+
+	// unfreeze and check that save did complete
+	fb.Unfreeze()
+	wg.Wait()
+	val = atomic.LoadInt64(&counter)
+	test.Assert(t, val == 1, "save call should have completed")
+}
diff --git a/internal/migrations/s3_layout.go b/internal/migrations/s3_layout.go
index 78d2492d8..9effaee70 100644
--- a/internal/migrations/s3_layout.go
+++ b/internal/migrations/s3_layout.go
@@ -21,26 +21,9 @@ func init() {
 // "default" layout.
 type S3Layout struct{}
 
-func toS3Backend(b restic.Backend) *s3.Backend {
-	for b != nil {
-		if be, ok := b.(*s3.Backend); ok {
-			return be
-		}
-
-		if be, ok := b.(restic.BackendUnwrapper); ok {
-			b = be.Unwrap()
-		} else {
-			// not the backend we're looking for
-			break
-		}
-	}
-	debug.Log("backend is not s3")
-	return nil
-}
-
 // Check tests whether the migration can be applied.
 func (m *S3Layout) Check(_ context.Context, repo restic.Repository) (bool, string, error) {
-	be := toS3Backend(repo.Backend())
+	be := restic.AsBackend[*s3.Backend](repo.Backend())
 	if be == nil {
 		debug.Log("backend is not s3")
 		return false, "backend is not s3", nil
@@ -92,7 +75,7 @@ func (m *S3Layout) moveFiles(ctx context.Context, be *s3.Backend, l layout.Layou
 
 // Apply runs the migration.
 func (m *S3Layout) Apply(ctx context.Context, repo restic.Repository) error {
-	be := toS3Backend(repo.Backend())
+	be := restic.AsBackend[*s3.Backend](repo.Backend())
 	if be == nil {
 		debug.Log("backend is not s3")
 		return errors.New("backend is not s3")
diff --git a/internal/migrations/s3_layout_test.go b/internal/migrations/s3_layout_test.go
deleted file mode 100644
index ad0eedea6..000000000
--- a/internal/migrations/s3_layout_test.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package migrations
-
-import (
-	"testing"
-
-	"github.com/restic/restic/internal/backend/mock"
-	"github.com/restic/restic/internal/backend/s3"
-	"github.com/restic/restic/internal/cache"
-	"github.com/restic/restic/internal/test"
-)
-
-func TestS3UnwrapBackend(t *testing.T) {
-	// toS3Backend(b restic.Backend) *s3.Backend
-
-	m := mock.NewBackend()
-	test.Assert(t, toS3Backend(m) == nil, "mock backend is not an s3 backend")
-
-	// uninitialized fake backend for testing
-	s3 := &s3.Backend{}
-	test.Assert(t, toS3Backend(s3) == s3, "s3 was not returned")
-
-	c := &cache.Backend{Backend: s3}
-	test.Assert(t, toS3Backend(c) == s3, "failed to unwrap s3 backend")
-
-	c.Backend = m
-	test.Assert(t, toS3Backend(c) == nil, "a wrapped mock backend is not an s3 backend")
-}
diff --git a/internal/restic/backend.go b/internal/restic/backend.go
index 555b9d96e..df3281641 100644
--- a/internal/restic/backend.go
+++ b/internal/restic/backend.go
@@ -75,6 +75,31 @@ type BackendUnwrapper interface {
 	Unwrap() Backend
 }
 
+func AsBackend[B Backend](b Backend) B {
+	for b != nil {
+		if be, ok := b.(B); ok {
+			return be
+		}
+
+		if be, ok := b.(BackendUnwrapper); ok {
+			b = be.Unwrap()
+		} else {
+			// not the backend we're looking for
+			break
+		}
+	}
+	var be B
+	return be
+}
+
+type FreezeBackend interface {
+	Backend
+	// Freeze blocks all backend operations except those on lock files
+	Freeze()
+	// Unfreeze allows all backend operations to continue
+	Unfreeze()
+}
+
 // FileInfo is contains information about a file in the backend.
 type FileInfo struct {
 	Size int64
diff --git a/internal/restic/backend_test.go b/internal/restic/backend_test.go
new file mode 100644
index 000000000..a970eb5b3
--- /dev/null
+++ b/internal/restic/backend_test.go
@@ -0,0 +1,38 @@
+package restic_test
+
+import (
+	"testing"
+
+	"github.com/restic/restic/internal/restic"
+	"github.com/restic/restic/internal/test"
+)
+
+type testBackend struct {
+	restic.Backend
+}
+
+func (t *testBackend) Unwrap() restic.Backend {
+	return nil
+}
+
+type otherTestBackend struct {
+	restic.Backend
+}
+
+func (t *otherTestBackend) Unwrap() restic.Backend {
+	return t.Backend
+}
+
+func TestAsBackend(t *testing.T) {
+	other := otherTestBackend{}
+	test.Assert(t, restic.AsBackend[*testBackend](other) == nil, "otherTestBackend is not a testBackend backend")
+
+	testBe := &testBackend{}
+	test.Assert(t, restic.AsBackend[*testBackend](testBe) == testBe, "testBackend was not returned")
+
+	wrapper := &otherTestBackend{Backend: testBe}
+	test.Assert(t, restic.AsBackend[*testBackend](wrapper) == testBe, "failed to unwrap testBackend backend")
+
+	wrapper.Backend = other
+	test.Assert(t, restic.AsBackend[*testBackend](wrapper) == nil, "a wrapped otherTestBackend is not a testBackend")
+}
diff --git a/internal/restic/lock.go b/internal/restic/lock.go
index d500c019a..a65ed6b5c 100644
--- a/internal/restic/lock.go
+++ b/internal/restic/lock.go
@@ -81,6 +81,8 @@ func IsInvalidLock(err error) bool {
 	return errors.As(err, &e)
 }
 
+var ErrRemovedLock = errors.New("lock file was removed in the meantime")
+
 // NewLock returns a new, non-exclusive lock for the repository. If an
 // exclusive lock is already held by another process, it returns an error
 // that satisfies IsAlreadyLocked.
@@ -274,6 +276,68 @@ func (l *Lock) Refresh(ctx context.Context) error {
 	return l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: oldLockID.String()})
 }
 
+// RefreshStaleLock is an extended variant of Refresh that can also refresh stale lock files.
+func (l *Lock) RefreshStaleLock(ctx context.Context) error {
+	debug.Log("refreshing stale lock %v", l.lockID)
+	// refreshing a stale lock is possible if it still exists and continues to do
+	// so until after creating a new lock. The initial check avoids creating a new
+	// lock file if this lock was already removed in the meantime.
+	exists, err := l.checkExistence(ctx)
+	if err != nil {
+		return err
+	} else if !exists {
+		return ErrRemovedLock
+	}
+
+	l.lock.Lock()
+	l.Time = time.Now()
+	l.lock.Unlock()
+	id, err := l.createLock(ctx)
+	if err != nil {
+		return err
+	}
+
+	time.Sleep(waitBeforeLockCheck)
+
+	exists, err = l.checkExistence(ctx)
+	if err != nil {
+		// cleanup replacement lock
+		_ = l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: id.String()})
+		return err
+	}
+
+	if !exists {
+		// cleanup replacement lock
+		_ = l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: id.String()})
+		return ErrRemovedLock
+	}
+
+	l.lock.Lock()
+	defer l.lock.Unlock()
+
+	debug.Log("new lock ID %v", id)
+	oldLockID := l.lockID
+	l.lockID = &id
+
+	return l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: oldLockID.String()})
+}
+
+func (l *Lock) checkExistence(ctx context.Context) (bool, error) {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+
+	exists := false
+
+	err := l.repo.Backend().List(ctx, LockFile, func(fi FileInfo) error {
+		if fi.Name == l.lockID.String() {
+			exists = true
+		}
+		return nil
+	})
+
+	return exists, err
+}
+
 func (l *Lock) String() string {
 	l.lock.Lock()
 	defer l.lock.Unlock()
diff --git a/internal/restic/lock_test.go b/internal/restic/lock_test.go
index 2d14499bd..f3c405c9c 100644
--- a/internal/restic/lock_test.go
+++ b/internal/restic/lock_test.go
@@ -16,6 +16,7 @@ import (
 
 func TestLock(t *testing.T) {
 	repo := repository.TestRepository(t)
+	restic.TestSetLockTimeout(t, 5*time.Millisecond)
 
 	lock, err := restic.NewLock(context.TODO(), repo)
 	rtest.OK(t, err)
@@ -25,6 +26,7 @@ func TestLock(t *testing.T) {
 
 func TestDoubleUnlock(t *testing.T) {
 	repo := repository.TestRepository(t)
+	restic.TestSetLockTimeout(t, 5*time.Millisecond)
 
 	lock, err := restic.NewLock(context.TODO(), repo)
 	rtest.OK(t, err)
@@ -38,6 +40,7 @@ func TestDoubleUnlock(t *testing.T) {
 
 func TestMultipleLock(t *testing.T) {
 	repo := repository.TestRepository(t)
+	restic.TestSetLockTimeout(t, 5*time.Millisecond)
 
 	lock1, err := restic.NewLock(context.TODO(), repo)
 	rtest.OK(t, err)
@@ -63,6 +66,7 @@ func (be *failLockLoadingBackend) Load(ctx context.Context, h restic.Handle, len
 func TestMultipleLockFailure(t *testing.T) {
 	be := &failLockLoadingBackend{Backend: mem.New()}
 	repo := repository.TestRepositoryWithBackend(t, be, 0)
+	restic.TestSetLockTimeout(t, 5*time.Millisecond)
 
 	lock1, err := restic.NewLock(context.TODO(), repo)
 	rtest.OK(t, err)
@@ -83,6 +87,7 @@ func TestLockExclusive(t *testing.T) {
 
 func TestLockOnExclusiveLockedRepo(t *testing.T) {
 	repo := repository.TestRepository(t)
+	restic.TestSetLockTimeout(t, 5*time.Millisecond)
 
 	elock, err := restic.NewExclusiveLock(context.TODO(), repo)
 	rtest.OK(t, err)
@@ -99,6 +104,7 @@ func TestLockOnExclusiveLockedRepo(t *testing.T) {
 
 func TestExclusiveLockOnLockedRepo(t *testing.T) {
 	repo := repository.TestRepository(t)
+	restic.TestSetLockTimeout(t, 5*time.Millisecond)
 
 	elock, err := restic.NewLock(context.TODO(), repo)
 	rtest.OK(t, err)
@@ -247,15 +253,10 @@ func TestRemoveAllLocks(t *testing.T) {
 		3, processed)
 }
 
-func TestLockRefresh(t *testing.T) {
-	repo := repository.TestRepository(t)
-
-	lock, err := restic.NewLock(context.TODO(), repo)
-	rtest.OK(t, err)
-	time0 := lock.Time
-
+func checkSingleLock(t *testing.T, repo restic.Repository) restic.ID {
+	t.Helper()
 	var lockID *restic.ID
-	err = repo.List(context.TODO(), restic.LockFile, func(id restic.ID, size int64) error {
+	err := repo.List(context.TODO(), restic.LockFile, func(id restic.ID, size int64) error {
 		if lockID != nil {
 			t.Error("more than one lock found")
 		}
@@ -265,27 +266,59 @@ func TestLockRefresh(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	if lockID == nil {
+		t.Fatal("no lock found")
+	}
+	return *lockID
+}
+
+func testLockRefresh(t *testing.T, refresh func(lock *restic.Lock) error) {
+	repo := repository.TestRepository(t)
+	restic.TestSetLockTimeout(t, 5*time.Millisecond)
+
+	lock, err := restic.NewLock(context.TODO(), repo)
+	rtest.OK(t, err)
+	time0 := lock.Time
+
+	lockID := checkSingleLock(t, repo)
 
 	time.Sleep(time.Millisecond)
 
-	rtest.OK(t, lock.Refresh(context.TODO()))
+	rtest.OK(t, refresh(lock))
 
-	var lockID2 *restic.ID
-	err = repo.List(context.TODO(), restic.LockFile, func(id restic.ID, size int64) error {
-		if lockID2 != nil {
-			t.Error("more than one lock found")
-		}
-		lockID2 = &id
-		return nil
-	})
-	if err != nil {
-		t.Fatal(err)
-	}
+	lockID2 := checkSingleLock(t, repo)
 
-	rtest.Assert(t, !lockID.Equal(*lockID2),
+	rtest.Assert(t, !lockID.Equal(lockID2),
 		"expected a new ID after lock refresh, got the same")
 
-	lock2, err := restic.LoadLock(context.TODO(), repo, *lockID2)
+	lock2, err := restic.LoadLock(context.TODO(), repo, lockID2)
 	rtest.OK(t, err)
 	rtest.Assert(t, lock2.Time.After(time0),
 		"expected a later timestamp after lock refresh")
 	rtest.OK(t, lock.Unlock())
 }
+
+func TestLockRefresh(t *testing.T) {
+	testLockRefresh(t, func(lock *restic.Lock) error {
+		return lock.Refresh(context.TODO())
+	})
+}
+
+func TestLockRefreshStale(t *testing.T) {
+	testLockRefresh(t, func(lock *restic.Lock) error {
+		return lock.RefreshStaleLock(context.TODO())
+	})
+}
+
+func TestLockRefreshStaleMissing(t *testing.T) {
+	repo := repository.TestRepository(t)
+	restic.TestSetLockTimeout(t, 5*time.Millisecond)
+
+	lock, err := restic.NewLock(context.TODO(), repo)
+	rtest.OK(t, err)
+	lockID := checkSingleLock(t, repo)
+
+	// refresh must fail if lock was removed
+	rtest.OK(t, repo.Backend().Remove(context.TODO(), restic.Handle{Type: restic.LockFile, Name: lockID.String()}))
+	time.Sleep(time.Millisecond)
+	err = lock.RefreshStaleLock(context.TODO())
+	rtest.Assert(t, err == restic.ErrRemovedLock, "unexpected error, expected %v, got %v", restic.ErrRemovedLock, err)
+}
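
Reviewer note: the patch generalizes the old S3-only `toS3Backend` helper into `restic.AsBackend[B]`, which walks the `BackendUnwrapper` chain until it finds a backend of the requested type. The standalone sketch below mirrors that unwrapping pattern outside the restic codebase; `Backend`, `Unwrapper`, `memBackend`, and `cachingBackend` are hypothetical stand-ins for illustration, not restic APIs.

```go
package main

import "fmt"

// Backend is a stand-in for restic.Backend, used only as a constraint.
type Backend interface {
	Name() string
}

// Unwrapper mirrors restic.BackendUnwrapper: a decorating backend exposes the backend it wraps.
type Unwrapper interface {
	Unwrap() Backend
}

// As walks the chain of wrapped backends until it finds one of type B,
// following the same loop as the AsBackend helper added in internal/restic/backend.go.
func As[B Backend](b Backend) B {
	for b != nil {
		if be, ok := b.(B); ok {
			return be
		}
		if u, ok := b.(Unwrapper); ok {
			b = u.Unwrap()
		} else {
			break // not wrapped any further, give up
		}
	}
	var zero B // zero value (typed nil) signals "not found"
	return zero
}

// memBackend and cachingBackend are illustrative wrappers only.
type memBackend struct{}

func (*memBackend) Name() string { return "mem" }

type cachingBackend struct{ inner Backend }

func (c *cachingBackend) Name() string    { return "cache(" + c.inner.Name() + ")" }
func (c *cachingBackend) Unwrap() Backend { return c.inner }

func main() {
	wrapped := &cachingBackend{inner: &memBackend{}}
	if mem := As[*memBackend](wrapped); mem != nil {
		fmt.Println("found the in-memory backend behind", wrapped.Name())
	}
	if other := As[*cachingBackend](&memBackend{}); other == nil {
		fmt.Println("no cachingBackend in this chain")
	}
}
```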
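The coordination added between `monitorLockRefresh` and `refreshLocks` is a request/response handshake over channels: the monitor sends a `refreshLockRequest` carrying a result channel, the refresher replies on it, and both sides also select on `ctx.Done()` so neither goroutine can block forever during shutdown. A minimal, self-contained sketch of that coordination pattern, with all names illustrative rather than taken from restic:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// request carries a reply channel, analogous to refreshLockRequest in the patch.
type request struct {
	result chan bool
}

// worker answers forced-refresh style requests; a real implementation would
// attempt the refresh here and report whether it succeeded.
func worker(ctx context.Context, reqs <-chan request) {
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-reqs:
			ok := true // placeholder for "refresh succeeded"
			select {
			case <-ctx.Done(): // never block forever on the reply
			case req.result <- ok:
			}
		}
	}
}

// monitor issues a request and waits for the answer, again guarded by ctx.
func monitor(ctx context.Context, reqs chan<- request) bool {
	req := request{result: make(chan bool)}
	select {
	case <-ctx.Done():
		return false
	case reqs <- req:
	}
	select {
	case <-ctx.Done():
		return false
	case ok := <-req.result:
		return ok
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	reqs := make(chan request)
	go worker(ctx, reqs)
	fmt.Println("forced refresh succeeded:", monitor(ctx, reqs))
}
```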