From feeab84204fab8964793cc13c9264cad255ded80 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 20 Jan 2024 18:40:22 +0100 Subject: [PATCH] repair pack: extract the repair logic into the repository package Currently, the cmd/restic package contains a significant amount of code that modifies repository internals. This code should in the mid-term move into the repository package. --- cmd/restic/cmd_repair_packs.go | 75 ++++--------------------- cmd/restic/progress.go | 18 ++++++ internal/repository/repair_pack.go | 88 ++++++++++++++++++++++++++++++ internal/ui/progress/printer.go | 30 ++++++++++ 4 files changed, 148 insertions(+), 63 deletions(-) create mode 100644 internal/repository/repair_pack.go create mode 100644 internal/ui/progress/printer.go diff --git a/cmd/restic/cmd_repair_packs.go b/cmd/restic/cmd_repair_packs.go index c572e02c5..04b06c33b 100644 --- a/cmd/restic/cmd_repair_packs.go +++ b/cmd/restic/cmd_repair_packs.go @@ -9,8 +9,8 @@ import ( "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/ui/termstatus" "github.com/spf13/cobra" - "golang.org/x/sync/errgroup" ) var cmdRepairPacks = &cobra.Command{ @@ -29,7 +29,9 @@ Exit status is 0 if the command was successful, and non-zero if there was any er `, DisableAutoGenTag: true, RunE: func(cmd *cobra.Command, args []string) error { - return runRepairPacks(cmd.Context(), globalOptions, args) + term, cancel := setupTermstatus() + defer cancel() + return runRepairPacks(cmd.Context(), globalOptions, term, args) }, } @@ -37,7 +39,7 @@ func init() { cmdRepair.AddCommand(cmdRepairPacks) } -func runRepairPacks(ctx context.Context, gopts GlobalOptions, args []string) error { +func runRepairPacks(ctx context.Context, gopts GlobalOptions, term *termstatus.Terminal, args []string) error { // FIXME discuss and add proper feature flag mechanism flag, _ := os.LookupEnv("RESTIC_FEATURES") if flag != "repair-packs-v1" { @@ -68,21 +70,19 @@ func runRepairPacks(ctx context.Context, gopts GlobalOptions, args []string) err return err } - return repairPacks(ctx, gopts, repo, ids) -} - -func repairPacks(ctx context.Context, gopts GlobalOptions, repo *repository.Repository, ids restic.IDSet) error { bar := newIndexProgress(gopts.Quiet, gopts.JSON) - err := repo.LoadIndex(ctx, bar) + err = repo.LoadIndex(ctx, bar) if err != nil { return errors.Fatalf("%s", err) } - Warnf("saving backup copies of pack files in current folder\n") + printer := newTerminalProgressPrinter(gopts.verbosity, term) + + printer.P("saving backup copies of pack files to current folder") for id := range ids { f, err := os.OpenFile("pack-"+id.String(), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o666) if err != nil { - return errors.Fatalf("%s", err) + return err } err = repo.Backend().Load(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()}, 0, 0, func(rd io.Reader) error { @@ -94,66 +94,15 @@ func repairPacks(ctx context.Context, gopts GlobalOptions, repo *repository.Repo return err }) if err != nil { - return errors.Fatalf("%s", err) + return err } } - wg, wgCtx := errgroup.WithContext(ctx) - repo.StartPackUploader(wgCtx, wg) - repo.DisableAutoIndexUpdate() - - Warnf("salvaging intact data from specified pack files\n") - bar = newProgressMax(!gopts.Quiet, uint64(len(ids)), "pack files") - defer bar.Done() - - wg.Go(func() error { - // examine all data the indexes have for the pack file - for b := range repo.Index().ListPacks(wgCtx, ids) { - blobs := b.Blobs - if len(blobs) == 0 { - Warnf("no blobs found for pack %v\n", b.PackID) - bar.Add(1) - continue - } - - err = repo.LoadBlobsFromPack(wgCtx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { - if err != nil { - // Fallback path - buf, err = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil) - if err != nil { - Warnf("failed to load blob %v: %v\n", blob.ID, err) - return nil - } - } - id, _, _, err := repo.SaveBlob(wgCtx, blob.Type, buf, restic.ID{}, true) - if !id.Equal(blob.ID) { - panic("pack id mismatch during upload") - } - return err - }) - if err != nil { - return err - } - bar.Add(1) - } - return repo.Flush(wgCtx) - }) - - if err := wg.Wait(); err != nil { - return errors.Fatalf("%s", err) - } - bar.Done() - - // remove salvaged packs from index - err = rebuildIndexFiles(ctx, gopts, repo, ids, nil, false) + err = repository.RepairPacks(ctx, repo, ids, printer) if err != nil { return errors.Fatalf("%s", err) } - // cleanup - Warnf("removing salvaged pack files\n") - DeleteFiles(ctx, gopts, repo, ids, restic.PackFile) - Warnf("\nUse `restic repair snapshots --forget` to remove the corrupted data blobs from all snapshots\n") return nil } diff --git a/cmd/restic/progress.go b/cmd/restic/progress.go index 9d93863ad..48aa209a6 100644 --- a/cmd/restic/progress.go +++ b/cmd/restic/progress.go @@ -109,3 +109,21 @@ func newIndexProgress(quiet bool, json bool) *progress.Counter { func newIndexTerminalProgress(quiet bool, json bool, term *termstatus.Terminal) *progress.Counter { return newTerminalProgressMax(!quiet && !json && stdoutIsTerminal(), 0, "index files loaded", term) } + +type terminalProgressPrinter struct { + term *termstatus.Terminal + ui.Message + show bool +} + +func (t *terminalProgressPrinter) NewCounter(description string) *progress.Counter { + return newTerminalProgressMax(t.show, 0, description, t.term) +} + +func newTerminalProgressPrinter(verbosity uint, term *termstatus.Terminal) progress.Printer { + return &terminalProgressPrinter{ + term: term, + Message: *ui.NewMessage(term, verbosity), + show: verbosity > 0, + } +} diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go new file mode 100644 index 000000000..5f3d43dc3 --- /dev/null +++ b/internal/repository/repair_pack.go @@ -0,0 +1,88 @@ +package repository + +import ( + "context" + "errors" + "io" + + "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/ui/progress" + "golang.org/x/sync/errgroup" +) + +func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printer progress.Printer) error { + wg, wgCtx := errgroup.WithContext(ctx) + repo.StartPackUploader(wgCtx, wg) + repo.DisableAutoIndexUpdate() + + printer.P("salvaging intact data from specified pack files") + bar := printer.NewCounter("pack files") + bar.SetMax(uint64(len(ids))) + defer bar.Done() + + wg.Go(func() error { + // examine all data the indexes have for the pack file + for b := range repo.Index().ListPacks(wgCtx, ids) { + blobs := b.Blobs + if len(blobs) == 0 { + printer.E("no blobs found for pack %v", b.PackID) + bar.Add(1) + continue + } + + err := repo.LoadBlobsFromPack(wgCtx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { + if err != nil { + // Fallback path + buf, err = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil) + if err != nil { + printer.E("failed to load blob %v: %v", blob.ID, err) + return nil + } + } + id, _, _, err := repo.SaveBlob(wgCtx, blob.Type, buf, restic.ID{}, true) + if !id.Equal(blob.ID) { + panic("pack id mismatch during upload") + } + return err + }) + if err != nil { + return err + } + bar.Add(1) + } + return repo.Flush(wgCtx) + }) + + err := wg.Wait() + bar.Done() + if err != nil { + return err + } + + // remove salvaged packs from index + printer.P("rebuilding index") + + bar = printer.NewCounter("packs processed") + err = repo.Index().Save(ctx, repo, ids, nil, restic.MasterIndexSaveOpts{ + SaveProgress: bar, + DeleteProgress: func() *progress.Counter { + return printer.NewCounter("old indexes deleted") + }, + DeleteReport: func(id restic.ID, err error) { + printer.VV("removed index %v", id.String()) + }, + }) + + if err != nil { + return err + } + + // cleanup + printer.P("removing salvaged pack files") + // if we fail to delete the damaged pack files, then prune will remove them later on + bar = printer.NewCounter("files deleted") + _ = restic.ParallelRemove(ctx, repo, ids, restic.PackFile, nil, bar) + bar.Done() + + return nil +} diff --git a/internal/ui/progress/printer.go b/internal/ui/progress/printer.go new file mode 100644 index 000000000..c95383d3e --- /dev/null +++ b/internal/ui/progress/printer.go @@ -0,0 +1,30 @@ +package progress + +// A Printer can can return a new counter or print messages +// at different log levels. +// It must be safe to call its methods from concurrent goroutines. +type Printer interface { + NewCounter(description string) *Counter + + E(msg string, args ...interface{}) + P(msg string, args ...interface{}) + V(msg string, args ...interface{}) + VV(msg string, args ...interface{}) +} + +// NoopPrinter discards all messages +type NoopPrinter struct{} + +var _ Printer = (*NoopPrinter)(nil) + +func (*NoopPrinter) NewCounter(description string) *Counter { + return nil +} + +func (*NoopPrinter) E(msg string, args ...interface{}) {} + +func (*NoopPrinter) P(msg string, args ...interface{}) {} + +func (*NoopPrinter) V(msg string, args ...interface{}) {} + +func (*NoopPrinter) VV(msg string, args ...interface{}) {}