From c35518a865deb6fd6107e07f88b67c1363949a98 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 17 Sep 2017 11:05:30 +0200 Subject: [PATCH 1/9] Azure/GS: Remove ReadDir() --- internal/backend/azure/azure.go | 57 --------------------------------- internal/backend/gs/gs.go | 54 ------------------------------- 2 files changed, 111 deletions(-) diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index a59f1e1eb..b0aa3141f 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -7,7 +7,6 @@ import ( "os" "path" "strings" - "time" "github.com/Azure/azure-sdk-for-go/storage" "github.com/restic/restic/internal/backend" @@ -96,62 +95,6 @@ func (be *Backend) Join(p ...string) string { return path.Join(p...) } -type fileInfo struct { - name string - size int64 - mode os.FileMode - modTime time.Time - isDir bool -} - -func (fi fileInfo) Name() string { return fi.name } // base name of the file -func (fi fileInfo) Size() int64 { return fi.size } // length in bytes for regular files; system-dependent for others -func (fi fileInfo) Mode() os.FileMode { return fi.mode } // file mode bits -func (fi fileInfo) ModTime() time.Time { return fi.modTime } // modification time -func (fi fileInfo) IsDir() bool { return fi.isDir } // abbreviation for Mode().IsDir() -func (fi fileInfo) Sys() interface{} { return nil } // underlying data source (can return nil) - -// ReadDir returns the entries for a directory. -func (be *Backend) ReadDir(dir string) (list []os.FileInfo, err error) { - debug.Log("ReadDir(%v)", dir) - - // make sure dir ends with a slash - if dir[len(dir)-1] != '/' { - dir += "/" - } - - obj, err := be.container.ListBlobs(storage.ListBlobsParameters{Prefix: dir, Delimiter: "/"}) - if err != nil { - return nil, err - } - - for _, item := range obj.BlobPrefixes { - entry := fileInfo{ - name: strings.TrimPrefix(item, dir), - isDir: true, - mode: os.ModeDir | 0755, - } - if entry.name != "" { - list = append(list, entry) - } - } - - for _, item := range obj.Blobs { - entry := fileInfo{ - name: strings.TrimPrefix(item.Name, dir), - isDir: false, - mode: 0644, - size: item.Properties.ContentLength, - modTime: time.Time(item.Properties.LastModified), - } - if entry.name != "" { - list = append(list, entry) - } - } - - return list, nil -} - // Location returns this backend's location (the container name). func (be *Backend) Location() string { return be.Join(be.container.Name, be.prefix) diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index 3fc6cf1f7..743dd6d74 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -7,7 +7,6 @@ import ( "os" "path" "strings" - "time" "github.com/pkg/errors" "github.com/restic/restic/internal/backend" @@ -134,59 +133,6 @@ func (be *Backend) Join(p ...string) string { return path.Join(p...) } -type fileInfo struct { - name string - size int64 - mode os.FileMode - modTime time.Time - isDir bool -} - -func (fi fileInfo) Name() string { return fi.name } // base name of the file -func (fi fileInfo) Size() int64 { return fi.size } // length in bytes for regular files; system-dependent for others -func (fi fileInfo) Mode() os.FileMode { return fi.mode } // file mode bits -func (fi fileInfo) ModTime() time.Time { return fi.modTime } // modification time -func (fi fileInfo) IsDir() bool { return fi.isDir } // abbreviation for Mode().IsDir() -func (fi fileInfo) Sys() interface{} { return nil } // underlying data source (can return nil) - -// ReadDir returns the entries for a directory. -func (be *Backend) ReadDir(dir string) (list []os.FileInfo, err error) { - debug.Log("ReadDir(%v)", dir) - - // make sure dir ends with a slash - if dir[len(dir)-1] != '/' { - dir += "/" - } - - obj, err := be.service.Objects.List(be.bucketName).Prefix(dir).Delimiter("/").Do() - if err != nil { - return nil, err - } - - for _, item := range obj.Prefixes { - entry := fileInfo{ - name: strings.TrimPrefix(item, dir), - isDir: true, - mode: os.ModeDir | 0755, - } - list = append(list, entry) - } - for _, item := range obj.Items { - entry := fileInfo{ - name: strings.TrimPrefix(item.Name, dir), - isDir: false, - mode: 0644, - size: int64(item.Size), - //modTime: item.Updated, - } - if entry.name != "" { - list = append(list, entry) - } - } - - return list, nil -} - // Location returns this backend's location (the bucket name). func (be *Backend) Location() string { return be.Join(be.bucketName, be.prefix) From 40edf00182b187653d8f626b3af644ebce2492f2 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 17 Sep 2017 11:08:51 +0200 Subject: [PATCH 2/9] gs: implement pagination --- internal/backend/gs/gs.go | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index 743dd6d74..072f87ea4 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -298,22 +298,33 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string { go func() { defer close(ch) - obj, err := be.service.Objects.List(be.bucketName).Prefix(prefix).Do() - if err != nil { - return - } - - for _, item := range obj.Items { - m := strings.TrimPrefix(item.Name, prefix) - if m == "" { - continue - } - - select { - case ch <- path.Base(m): - case <-ctx.Done(): + listReq := be.service.Objects.List(be.bucketName).Prefix(prefix) + for { + obj, err := listReq.Do() + if err != nil { + fmt.Fprintf(os.Stderr, "error listing %v: %v\n", prefix, err) return } + + debug.Log("returned %v items", len(obj.Items)) + + for _, item := range obj.Items { + m := strings.TrimPrefix(item.Name, prefix) + if m == "" { + continue + } + + select { + case ch <- path.Base(m): + case <-ctx.Done(): + return + } + } + + if obj.NextPageToken == "" { + break + } + listReq.PageToken(obj.NextPageToken) } }() From f61dab1774a020c74ced93d5640fd7077957a1e0 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 17 Sep 2017 11:09:09 +0200 Subject: [PATCH 3/9] backend: Add test for List() --- internal/backend/test/tests.go | 47 ++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/internal/backend/test/tests.go b/internal/backend/test/tests.go index 64907f2f7..7390b8593 100644 --- a/internal/backend/test/tests.go +++ b/internal/backend/test/tests.go @@ -240,6 +240,53 @@ func (s *Suite) TestLoad(t *testing.T) { test.OK(t, b.Remove(context.TODO(), handle)) } +// TestList makes sure that the backend can list more than a thousand files. +func (s *Suite) TestList(t *testing.T) { + seedRand(t) + + b := s.open(t) + defer s.close(t, b) + + const numTestFiles = 1233 + list1 := restic.NewIDSet() + + for i := 0; i < numTestFiles; i++ { + data := []byte(fmt.Sprintf("random test blob %v", i)) + id := restic.Hash(data) + h := restic.Handle{Type: restic.DataFile, Name: id.String()} + err := b.Save(context.TODO(), h, bytes.NewReader(data)) + if err != nil { + t.Fatal(err) + } + list1.Insert(id) + } + + t.Logf("wrote %v files", len(list1)) + + list2 := restic.NewIDSet() + for name := range b.List(context.TODO(), restic.DataFile) { + id, err := restic.ParseID(name) + if err != nil { + t.Fatal(err) + } + list2.Insert(id) + } + + t.Logf("loaded %v IDs from backend", len(list2)) + + if !list1.Equals(list2) { + t.Errorf("lists are not equal, list1 %d entries, list2 %d entries", len(list1), len(list2)) + } + + for id := range list1 { + h := restic.Handle{Type: restic.DataFile, Name: id.String()} + err := s.delayedRemove(t, b, h) + if err != nil { + t.Fatal(err) + } + } +} + type errorCloser struct { io.Reader l int From dd49e2b12d6182bd387870b3639265763bc5330f Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 17 Sep 2017 11:32:05 +0200 Subject: [PATCH 4/9] Azure: Fix List(), use pagination marker --- internal/backend/azure/azure.go | 42 ++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index b0aa3141f..8c5200f89 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -264,25 +264,39 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string { prefix += "/" } + params := storage.ListBlobsParameters{ + MaxResults: 1000, + Prefix: prefix, + } + go func() { defer close(ch) - obj, err := be.container.ListBlobs(storage.ListBlobsParameters{Prefix: prefix}) - if err != nil { - return - } - - for _, item := range obj.Blobs { - m := strings.TrimPrefix(item.Name, prefix) - if m == "" { - continue - } - - select { - case ch <- path.Base(m): - case <-ctx.Done(): + for { + obj, err := be.container.ListBlobs(params) + if err != nil { return } + + debug.Log("got %v objects", len(obj.Blobs)) + + for _, item := range obj.Blobs { + m := strings.TrimPrefix(item.Name, prefix) + if m == "" { + continue + } + + select { + case ch <- path.Base(m): + case <-ctx.Done(): + return + } + } + + if obj.NextMarker == "" { + break + } + params.Marker = obj.NextMarker } }() From 649c5362501799ea9301a1dde5337a9aa99675f9 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 17 Sep 2017 11:36:45 +0200 Subject: [PATCH 5/9] backend: Improve test for pagination in list --- internal/backend/test/tests.go | 38 +++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/internal/backend/test/tests.go b/internal/backend/test/tests.go index 7390b8593..9dfb068d9 100644 --- a/internal/backend/test/tests.go +++ b/internal/backend/test/tests.go @@ -11,6 +11,7 @@ import ( "reflect" "sort" "strings" + "sync" "testing" "time" @@ -250,14 +251,37 @@ func (s *Suite) TestList(t *testing.T) { const numTestFiles = 1233 list1 := restic.NewIDSet() + var wg sync.WaitGroup + input := make(chan int, numTestFiles) for i := 0; i < numTestFiles; i++ { - data := []byte(fmt.Sprintf("random test blob %v", i)) - id := restic.Hash(data) - h := restic.Handle{Type: restic.DataFile, Name: id.String()} - err := b.Save(context.TODO(), h, bytes.NewReader(data)) - if err != nil { - t.Fatal(err) - } + input <- i + } + close(input) + + output := make(chan restic.ID, numTestFiles) + + for worker := 0; worker < 5; worker++ { + wg.Add(1) + go func() { + defer wg.Done() + + for i := range input { + data := []byte(fmt.Sprintf("random test blob %v", i)) + id := restic.Hash(data) + h := restic.Handle{Type: restic.DataFile, Name: id.String()} + err := b.Save(context.TODO(), h, bytes.NewReader(data)) + if err != nil { + t.Fatal(err) + } + output <- id + } + }() + } + + wg.Wait() + close(output) + + for id := range output { list1.Insert(id) } From 3b6a580b32b30d0e61789893803d34fa2a231dbf Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 18 Sep 2017 12:01:54 +0200 Subject: [PATCH 6/9] backend: Make pagination for List configurable --- internal/backend/azure/azure.go | 19 ++++++-- internal/backend/gs/gs.go | 21 +++++--- internal/backend/test/tests.go | 86 ++++++++++++++++----------------- 3 files changed, 72 insertions(+), 54 deletions(-) diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index 8c5200f89..6fad216bf 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -17,13 +17,16 @@ import ( // Backend stores data on an azure endpoint. type Backend struct { - accountName string - container *storage.Container - sem *backend.Semaphore - prefix string + accountName string + container *storage.Container + sem *backend.Semaphore + prefix string + listMaxItems int backend.Layout } +const defaultListMaxItems = 5000 + // make sure that *Backend implements backend.Backend var _ restic.Backend = &Backend{} @@ -53,6 +56,7 @@ func open(cfg Config) (*Backend, error) { Path: cfg.Prefix, Join: path.Join, }, + listMaxItems: defaultListMaxItems, } return be, nil @@ -84,6 +88,11 @@ func Create(cfg Config) (restic.Backend, error) { return be, nil } +// SetListMaxItems sets the number of list items to load per request. +func (be *Backend) SetListMaxItems(i int) { + be.listMaxItems = i +} + // IsNotExist returns true if the error is caused by a not existing file. func (be *Backend) IsNotExist(err error) bool { debug.Log("IsNotExist(%T, %#v)", err, err) @@ -265,7 +274,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string { } params := storage.ListBlobsParameters{ - MaxResults: 1000, + MaxResults: uint(be.listMaxItems), Prefix: prefix, } diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index 072f87ea4..902726d1b 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -22,11 +22,12 @@ import ( // Backend stores data on an gs endpoint. type Backend struct { - service *storage.Service - projectID string - sem *backend.Semaphore - bucketName string - prefix string + service *storage.Service + projectID string + sem *backend.Semaphore + bucketName string + prefix string + listMaxItems int backend.Layout } @@ -55,6 +56,8 @@ func getStorageService(jsonKeyPath string) (*storage.Service, error) { return service, nil } +const defaultListMaxItems = 1000 + func open(cfg Config) (*Backend, error) { debug.Log("open, config %#v", cfg) @@ -78,6 +81,7 @@ func open(cfg Config) (*Backend, error) { Path: cfg.Prefix, Join: path.Join, }, + listMaxItems: defaultListMaxItems, } return be, nil @@ -111,6 +115,11 @@ func Create(cfg Config) (restic.Backend, error) { return be, nil } +// SetListMaxItems sets the number of list items to load per request. +func (be *Backend) SetListMaxItems(i int) { + be.listMaxItems = i +} + // IsNotExist returns true if the error is caused by a not existing file. func (be *Backend) IsNotExist(err error) bool { debug.Log("IsNotExist(%T, %#v)", err, err) @@ -298,7 +307,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string { go func() { defer close(ch) - listReq := be.service.Objects.List(be.bucketName).Prefix(prefix) + listReq := be.service.Objects.List(be.bucketName).Prefix(prefix).MaxResults(int64(be.listMaxItems)) for { obj, err := listReq.Do() if err != nil { diff --git a/internal/backend/test/tests.go b/internal/backend/test/tests.go index 9dfb068d9..5d75cd80b 100644 --- a/internal/backend/test/tests.go +++ b/internal/backend/test/tests.go @@ -11,7 +11,6 @@ import ( "reflect" "sort" "strings" - "sync" "testing" "time" @@ -241,67 +240,68 @@ func (s *Suite) TestLoad(t *testing.T) { test.OK(t, b.Remove(context.TODO(), handle)) } -// TestList makes sure that the backend can list more than a thousand files. +// TestList makes sure that the backend implements List() pagination correctly. func (s *Suite) TestList(t *testing.T) { seedRand(t) + numTestFiles := rand.Intn(20) + 20 + b := s.open(t) defer s.close(t, b) - const numTestFiles = 1233 list1 := restic.NewIDSet() - var wg sync.WaitGroup - input := make(chan int, numTestFiles) for i := 0; i < numTestFiles; i++ { - input <- i - } - close(input) - - output := make(chan restic.ID, numTestFiles) - - for worker := 0; worker < 5; worker++ { - wg.Add(1) - go func() { - defer wg.Done() - - for i := range input { - data := []byte(fmt.Sprintf("random test blob %v", i)) - id := restic.Hash(data) - h := restic.Handle{Type: restic.DataFile, Name: id.String()} - err := b.Save(context.TODO(), h, bytes.NewReader(data)) - if err != nil { - t.Fatal(err) - } - output <- id - } - }() - } - - wg.Wait() - close(output) - - for id := range output { + data := []byte(fmt.Sprintf("random test blob %v", i)) + id := restic.Hash(data) + h := restic.Handle{Type: restic.DataFile, Name: id.String()} + err := b.Save(context.TODO(), h, bytes.NewReader(data)) + if err != nil { + t.Fatal(err) + } list1.Insert(id) } t.Logf("wrote %v files", len(list1)) - list2 := restic.NewIDSet() - for name := range b.List(context.TODO(), restic.DataFile) { - id, err := restic.ParseID(name) - if err != nil { - t.Fatal(err) - } - list2.Insert(id) + var tests = []struct { + maxItems int + }{ + {3}, {8}, {11}, {13}, {23}, + {numTestFiles}, {numTestFiles + 7}, {numTestFiles + 10}, {numTestFiles + 1123}, } - t.Logf("loaded %v IDs from backend", len(list2)) + for _, test := range tests { + t.Run(fmt.Sprintf("max-%v", test.maxItems), func(t *testing.T) { + list2 := restic.NewIDSet() - if !list1.Equals(list2) { - t.Errorf("lists are not equal, list1 %d entries, list2 %d entries", len(list1), len(list2)) + type setter interface { + SetListMaxItems(int) + } + + if s, ok := b.(setter); ok { + t.Logf("setting max list items to %d", test.maxItems) + s.SetListMaxItems(test.maxItems) + } + + for name := range b.List(context.TODO(), restic.DataFile) { + id, err := restic.ParseID(name) + if err != nil { + t.Fatal(err) + } + list2.Insert(id) + } + + t.Logf("loaded %v IDs from backend", len(list2)) + + if !list1.Equals(list2) { + t.Errorf("lists are not equal, list1 %d entries, list2 %d entries", + len(list1), len(list2)) + } + }) } + t.Logf("remove %d files", numTestFiles) for id := range list1 { h := restic.Handle{Type: restic.DataFile, Name: id.String()} err := s.delayedRemove(t, b, h) From 835ba16c275a983bd2b4b8c80eea3399cd6a3acd Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 18 Sep 2017 12:13:35 +0200 Subject: [PATCH 7/9] b2: Add pagination for List() --- internal/backend/b2/b2.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index 94a13222b..8fd148769 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -16,13 +16,16 @@ import ( // b2Backend is a backend which stores its data on Backblaze B2. type b2Backend struct { - client *b2.Client - bucket *b2.Bucket - cfg Config + client *b2.Client + bucket *b2.Bucket + cfg Config + listMaxItems int backend.Layout sem *backend.Semaphore } +const defaultListMaxItems = 1000 + // ensure statically that *b2Backend implements restic.Backend. var _ restic.Backend = &b2Backend{} @@ -121,6 +124,11 @@ func Create(cfg Config) (restic.Backend, error) { return be, nil } +// SetListMaxItems sets the number of list items to load per request. +func (be *b2Backend) SetListMaxItems(i int) { + be.listMaxItems = i +} + // Location returns the location for the backend. func (be *b2Backend) Location() string { return be.cfg.Bucket @@ -307,10 +315,11 @@ func (be *b2Backend) List(ctx context.Context, t restic.FileType) <-chan string cur := &b2.Cursor{Prefix: prefix} for { - objs, c, err := be.bucket.ListCurrentObjects(ctx, 1000, cur) + objs, c, err := be.bucket.ListCurrentObjects(ctx, be.listMaxItems, cur) if err != nil && err != io.EOF { return } + debug.Log("returned %v items", len(objs)) for _, obj := range objs { // Skip objects returned that do not have the specified prefix. if !strings.HasPrefix(obj.Name(), prefix) { From 4c6b626db6e7de19b8b580fac705b318838fb4c4 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 18 Sep 2017 13:18:42 +0200 Subject: [PATCH 8/9] backend: Improve TestList --- internal/backend/test/tests.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/backend/test/tests.go b/internal/backend/test/tests.go index 5d75cd80b..e32e2c421 100644 --- a/internal/backend/test/tests.go +++ b/internal/backend/test/tests.go @@ -267,8 +267,7 @@ func (s *Suite) TestList(t *testing.T) { var tests = []struct { maxItems int }{ - {3}, {8}, {11}, {13}, {23}, - {numTestFiles}, {numTestFiles + 7}, {numTestFiles + 10}, {numTestFiles + 1123}, + {11}, {23}, {numTestFiles}, {numTestFiles + 10}, {numTestFiles + 1123}, } for _, test := range tests { @@ -302,12 +301,14 @@ func (s *Suite) TestList(t *testing.T) { } t.Logf("remove %d files", numTestFiles) + handles := make([]restic.Handle, 0, len(list1)) for id := range list1 { - h := restic.Handle{Type: restic.DataFile, Name: id.String()} - err := s.delayedRemove(t, b, h) - if err != nil { - t.Fatal(err) - } + handles = append(handles, restic.Handle{Type: restic.DataFile, Name: id.String()}) + } + + err := s.delayedRemove(t, b, handles...) + if err != nil { + t.Fatal(err) } } From d7e644272f4787fb3a16e0057824637896f35689 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Tue, 19 Sep 2017 10:50:07 +0200 Subject: [PATCH 9/9] prune: Add plausibility check --- cmd/restic/cmd_prune.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index c50b3413e..8e3c5b53d 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -179,6 +179,12 @@ func pruneRepository(gopts GlobalOptions, repo restic.Repository) error { } bar.Done() + if len(usedBlobs) > stats.blobs { + return errors.Fatalf("number of used blobs is larger than number of available blobs!\n" + + "Please report this error (along with the output of the 'prune' run) at\n" + + "https://github.com/restic/restic/issues/new") + } + Verbosef("found %d of %d data blobs still in use, removing %d blobs\n", len(usedBlobs), stats.blobs, stats.blobs-len(usedBlobs))