diff --git a/repository/parallel.go b/repository/parallel.go index 0900b2213..44f75ccbe 100644 --- a/repository/parallel.go +++ b/repository/parallel.go @@ -16,10 +16,14 @@ func closeIfOpen(ch chan struct{}) { } } +// ParallelWorkFunc gets one file ID to work on. If an error is returned, +// processing stops. If done is closed, the function should return. +type ParallelWorkFunc func(id string, done <-chan struct{}) error + // FilesInParallel runs n workers of f in parallel, on the IDs that // repo.List(t) yield. If f returns an error, the process is aborted and the // first error is returned. -func FilesInParallel(repo backend.Lister, t backend.Type, n uint, f func(backend.ID) error) error { +func FilesInParallel(repo backend.Lister, t backend.Type, n uint, f ParallelWorkFunc) error { done := make(chan struct{}) defer closeIfOpen(done) @@ -36,17 +40,12 @@ func FilesInParallel(repo backend.Lister, t backend.Type, n uint, f func(backend for { select { - case item, ok := <-ch: + case id, ok := <-ch: if !ok { return } - id, err := backend.ParseID(item) - - if err == nil { - err = f(id) - } - + err := f(id, done) if err != nil { closeIfOpen(done) errors <- err diff --git a/repository/parallel_test.go b/repository/parallel_test.go index 5220c1cf3..fb33e8677 100644 --- a/repository/parallel_test.go +++ b/repository/parallel_test.go @@ -93,7 +93,7 @@ func (tests testIDs) List(t backend.Type, done <-chan struct{}) <-chan string { } func TestFilesInParallel(t *testing.T) { - f := func(id backend.ID) error { + f := func(id string, done <-chan struct{}) error { time.Sleep(1 * time.Millisecond) return nil } @@ -108,7 +108,7 @@ var errTest = errors.New("test error") func TestFilesInParallelWithError(t *testing.T) { - f := func(id backend.ID) error { + f := func(id string, done <-chan struct{}) error { time.Sleep(1 * time.Millisecond) if rand.Float32() < 0.01 {