From 6a548336ec799a6e080b63d406e402038c6db16a Mon Sep 17 00:00:00 2001 From: Philipp Serr Date: Sun, 6 Dec 2015 17:35:22 +0100 Subject: [PATCH 1/5] Add a test concurrently saving duplicated chunks This commit adds an integration test, that calls Archiver.Save from many goroutines processing several duplicated chunks concurrently. The test asserts, that after all chunks have been saved, there are no unreferenced packs in the repository. The test has been checked to give the expected results: 1) Running the test with maxParallel=1 (all chunks are processed sequentially) has been verified not to produce any unreferenced packs. Consequently the test passes. 2) Running the test with unbounded parallelism (maxParallel= math.MaxInt32) has been verified to produce unreferenced packs all the time (at least 25 test runs). Consequently the test fails due to #358. references: #358 --- archiver_test.go | 120 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/archiver_test.go b/archiver_test.go index 007ed4115..63531b0b4 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -4,13 +4,16 @@ import ( "bytes" "crypto/sha256" "io" + "math" "testing" "github.com/restic/chunker" "github.com/restic/restic" "github.com/restic/restic/backend" + "github.com/restic/restic/checker" "github.com/restic/restic/crypto" "github.com/restic/restic/pack" + "github.com/restic/restic/repository" . "github.com/restic/restic/test" ) @@ -21,6 +24,11 @@ type Rdr interface { io.ReaderAt } +type chunkedData struct { + buf []byte + chunks []*chunker.Chunk +} + func benchmarkChunkEncrypt(b testing.TB, buf, buf2 []byte, rd Rdr, key *crypto.Key) { rd.Seek(0, 0) ch := chunker.New(rd, testPol, sha256.New()) @@ -233,3 +241,115 @@ func BenchmarkLoadTree(t *testing.B) { } } } + +// Saves several identical chunks concurrently and later check that there are no +// unreferenced packs in the repository. See also #292 and #358. +// The combination of high duplication and high concurrency should provoke any +// issues leading to unreferenced packs. +func TestParallelSaveWithHighDuplication(t *testing.T) { + repo := SetupRepo() + defer TeardownRepo(repo) + + // For every seed a pseudo-random 32Mb blob is generated and split into + // chunks. During the test all chunks of all blobs are processed in parallel + // goroutines. To increase duplication, each chunk is processed + // times. Concurrency can be limited by changing . + // Note: seeds 5, 3, 66, 4, 12 produce the most chunks (descending) + seeds := []int{5, 3, 66, 4, 12} + maxParallel := math.MaxInt32 + duplication := 15 + + arch := restic.NewArchiver(repo) + data := getRandomData(seeds) + + barrier := make(chan struct{}, maxParallel) + errChannels := [](<-chan error){} + + for _, d := range data { + for _, c := range d.chunks { + for dupIdx := 0; dupIdx < duplication; dupIdx++ { + errChan := make(chan error) + errChannels = append(errChannels, errChan) + + go func(buf *[]byte, c *chunker.Chunk, errChan chan<- error) { + barrier <- struct{}{} + + hash := c.Digest + id := backend.ID{} + copy(id[:], hash) + + err := arch.Save(pack.Data, id, c.Length, c.Reader(bytes.NewReader(*buf))) + <-barrier + errChan <- err + }(&d.buf, c, errChan) + } + } + } + + for _, errChan := range errChannels { + OK(t, <-errChan) + } + + OK(t, repo.Flush()) + OK(t, repo.SaveIndex()) + + chkr := createAndInitChecker(t, repo) + assertNoUnreferencedPacks(t, chkr) +} + +func getRandomData(seeds []int) []*chunkedData { + chunks := []*chunkedData{} + sem := make(chan struct{}, len(seeds)) + + for seed := range seeds { + c := &chunkedData{} + chunks = append(chunks, c) + + go func(seed int, data *chunkedData) { + data.buf = Random(seed, 32*1024*1024) + chunker := chunker.New(bytes.NewReader(data.buf), testPol, sha256.New()) + + for { + c, err := chunker.Next() + if err == io.EOF { + break + } + data.chunks = append(data.chunks, c) + } + + sem <- struct{}{} + }(seed, c) + } + + for i := 0; i < len(seeds); i++ { + <-sem + } + return chunks +} + +func createAndInitChecker(t *testing.T, repo *repository.Repository) *checker.Checker { + chkr := checker.New(repo) + + hints, errs := chkr.LoadIndex() + if len(errs) > 0 { + t.Fatalf("expected no errors, got %v: %v", len(errs), errs) + } + + if len(hints) > 0 { + t.Errorf("expected no hints, got %v: %v", len(hints), hints) + } + + return chkr +} + +func assertNoUnreferencedPacks(t *testing.T, chkr *checker.Checker) { + done := make(chan struct{}) + defer close(done) + + errChan := make(chan error) + go chkr.Packs(errChan, done) + + for err := range errChan { + OK(t, err) + } +} From 3d7f72311a1d933f1d4b7c16097a3546c779a729 Mon Sep 17 00:00:00 2001 From: Philipp Serr Date: Wed, 9 Dec 2015 21:09:49 +0100 Subject: [PATCH 2/5] Provoke unreferenced packs using fewer goroutines TestParallelSaveWithDuplication has been reworked to provoke unreferenced packs using fewer goroutines than before and create only one bytes.Reader per blob. This reduces memory usage significantly. The following actions have been taken to keep the chance of provoking unreferenced packs due to #358 high: * Interweaved processing of subsequent chunks * Delaying each goroutine by a few pseudo-randomly chosen nanoseconds (depending on the platform this will most probably only make the os yield execution to another thread): together with the interweaved processing of subsequent chunks, this ensures a minimalistic delay between processing of (some) duplicated chunks * Repeating the test 5 times with different seeds On my test machine, the modified test provoked unreferenced packs 60 times in a row. --- archiver_test.go | 95 +++++++++++++++++++++--------------------------- 1 file changed, 41 insertions(+), 54 deletions(-) diff --git a/archiver_test.go b/archiver_test.go index 63531b0b4..b519db120 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -4,8 +4,8 @@ import ( "bytes" "crypto/sha256" "io" - "math" "testing" + "time" "github.com/restic/chunker" "github.com/restic/restic" @@ -242,47 +242,48 @@ func BenchmarkLoadTree(t *testing.B) { } } -// Saves several identical chunks concurrently and later check that there are no +// Saves several identical chunks concurrently and later checks that there are no // unreferenced packs in the repository. See also #292 and #358. -// The combination of high duplication and high concurrency should provoke any -// issues leading to unreferenced packs. -func TestParallelSaveWithHighDuplication(t *testing.T) { +func TestParallelSaveWithDuplication(t *testing.T) { + for seed := 0; seed < 5; seed++ { + testParallelSaveWithDuplication(t, seed) + } +} + +func testParallelSaveWithDuplication(t *testing.T, seed int) { repo := SetupRepo() defer TeardownRepo(repo) - // For every seed a pseudo-random 32Mb blob is generated and split into - // chunks. During the test all chunks of all blobs are processed in parallel - // goroutines. To increase duplication, each chunk is processed - // times. Concurrency can be limited by changing . - // Note: seeds 5, 3, 66, 4, 12 produce the most chunks (descending) - seeds := []int{5, 3, 66, 4, 12} - maxParallel := math.MaxInt32 - duplication := 15 + dataSizeMb := 92 + duplication := 7 arch := restic.NewArchiver(repo) - data := getRandomData(seeds) + data, chunks := getRandomData(seed, dataSizeMb*1024*1024) + reader := bytes.NewReader(data) - barrier := make(chan struct{}, maxParallel) errChannels := [](<-chan error){} - for _, d := range data { - for _, c := range d.chunks { - for dupIdx := 0; dupIdx < duplication; dupIdx++ { - errChan := make(chan error) - errChannels = append(errChannels, errChan) + // interweaved processing of subsequent chunks + maxParallel := 2*duplication - 1 + barrier := make(chan struct{}, maxParallel) - go func(buf *[]byte, c *chunker.Chunk, errChan chan<- error) { - barrier <- struct{}{} + for _, c := range chunks { + for dupIdx := 0; dupIdx < duplication; dupIdx++ { + errChan := make(chan error) + errChannels = append(errChannels, errChan) - hash := c.Digest - id := backend.ID{} - copy(id[:], hash) + go func(reader *bytes.Reader, c *chunker.Chunk, errChan chan<- error) { + barrier <- struct{}{} - err := arch.Save(pack.Data, id, c.Length, c.Reader(bytes.NewReader(*buf))) - <-barrier - errChan <- err - }(&d.buf, c, errChan) - } + hash := c.Digest + id := backend.ID{} + copy(id[:], hash) + + time.Sleep(time.Duration(hash[0])) + err := arch.Save(pack.Data, id, c.Length, c.Reader(reader)) + <-barrier + errChan <- err + }(reader, c, errChan) } } @@ -297,34 +298,20 @@ func TestParallelSaveWithHighDuplication(t *testing.T) { assertNoUnreferencedPacks(t, chkr) } -func getRandomData(seeds []int) []*chunkedData { - chunks := []*chunkedData{} - sem := make(chan struct{}, len(seeds)) +func getRandomData(seed int, size int) ([]byte, []*chunker.Chunk) { + buf := Random(seed, size) + chunks := []*chunker.Chunk{} + chunker := chunker.New(bytes.NewReader(buf), testPol, sha256.New()) - for seed := range seeds { - c := &chunkedData{} + for { + c, err := chunker.Next() + if err == io.EOF { + break + } chunks = append(chunks, c) - - go func(seed int, data *chunkedData) { - data.buf = Random(seed, 32*1024*1024) - chunker := chunker.New(bytes.NewReader(data.buf), testPol, sha256.New()) - - for { - c, err := chunker.Next() - if err == io.EOF { - break - } - data.chunks = append(data.chunks, c) - } - - sem <- struct{}{} - }(seed, c) } - for i := 0; i < len(seeds); i++ { - <-sem - } - return chunks + return buf, chunks } func createAndInitChecker(t *testing.T, repo *repository.Repository) *checker.Checker { From e7bf936d2b82418be2d91d67b7e4cb0c4225ac54 Mon Sep 17 00:00:00 2001 From: Philipp Serr Date: Wed, 9 Dec 2015 21:38:03 +0100 Subject: [PATCH 3/5] Increase number of chunks and test repetitions --- archiver_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/archiver_test.go b/archiver_test.go index b519db120..75b1176cb 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -245,7 +245,7 @@ func BenchmarkLoadTree(t *testing.B) { // Saves several identical chunks concurrently and later checks that there are no // unreferenced packs in the repository. See also #292 and #358. func TestParallelSaveWithDuplication(t *testing.T) { - for seed := 0; seed < 5; seed++ { + for seed := 0; seed < 10; seed++ { testParallelSaveWithDuplication(t, seed) } } @@ -254,7 +254,7 @@ func testParallelSaveWithDuplication(t *testing.T, seed int) { repo := SetupRepo() defer TeardownRepo(repo) - dataSizeMb := 92 + dataSizeMb := 128 duplication := 7 arch := restic.NewArchiver(repo) From 0fde09a86699d42b569bbc02708dc14895862c3b Mon Sep 17 00:00:00 2001 From: Philipp Serr Date: Mon, 28 Dec 2015 18:18:25 +0100 Subject: [PATCH 4/5] Lock MasterIndex and InFlight store together fixes: #358 --- repository/master_index.go | 46 +++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/repository/master_index.go b/repository/master_index.go index a747d06e4..2f63e82a7 100644 --- a/repository/master_index.go +++ b/repository/master_index.go @@ -157,27 +157,47 @@ func (mi *MasterIndex) Current() *Index { // AddInFlight add the given ID to the list of in-flight IDs. An error is // returned when the ID is already in the list. func (mi *MasterIndex) AddInFlight(id backend.ID) error { - mi.inFlight.Lock() - defer mi.inFlight.Unlock() + // The index + inFlight store must be searched for a matching id in one + // atomic operation. This requires locking the inFlight store and the + // index together! + mi.inFlight.Lock() + defer mi.inFlight.Unlock() - debug.Log("MasterIndex.AddInFlight", "adding %v", id) - if mi.inFlight.Has(id) { - return fmt.Errorf("%v is already in flight", id) - } + // Note: mi.Has read locks the index again. + mi.idxMutex.RLock() + defer mi.idxMutex.RUnlock() - mi.inFlight.Insert(id) - return nil + debug.Log("MasterIndex.AddInFlight", "adding %v", id) + if mi.inFlight.Has(id) { + return fmt.Errorf("%v is already in flight", id) + } + if mi.Has(id) { + return fmt.Errorf("%v is already indexed (fully processed)", id) + } + + mi.inFlight.Insert(id) + return nil } // IsInFlight returns true iff the id is contained in the list of in-flight IDs. func (mi *MasterIndex) IsInFlight(id backend.ID) bool { - mi.inFlight.RLock() - defer mi.inFlight.RUnlock() + // The index + inFlight store must be searched for a matching id in one + // atomic operation. This requires locking the inFlight store and the + // index together! + mi.inFlight.RLock() + defer mi.inFlight.RUnlock() - inFlight := mi.inFlight.Has(id) - debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight) + // Note: mi.Has read locks the index again. + mi.idxMutex.RLock() + defer mi.idxMutex.RUnlock() - return inFlight + inFlight := mi.inFlight.Has(id) + debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight) + + indexed := mi.Has(id) + debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed) + + return inFlight } // RemoveFromInFlight deletes the given ID from the liste of in-flight IDs. From e689d499e73148ddc3d016b9e373656986878390 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 17 Jan 2016 19:46:48 +0100 Subject: [PATCH 5/5] Improve RandomReader --- test/helpers.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/test/helpers.go b/test/helpers.go index 7abb536b3..69a75cd2c 100644 --- a/test/helpers.go +++ b/test/helpers.go @@ -1,7 +1,6 @@ package test_helper import ( - "bytes" "compress/bzip2" "compress/gzip" "crypto/rand" @@ -86,10 +85,24 @@ func Random(seed, count int) []byte { return buf } +type rndReader struct { + src *mrand.Rand +} + +func (r *rndReader) Read(p []byte) (int, error) { + fmt.Printf("Read(%v)\n", len(p)) + for i := range p { + p[i] = byte(r.src.Uint32()) + } + + return len(p), nil +} + // RandomReader returns a reader that returns size bytes of pseudo-random data // derived from the seed. -func RandomReader(seed, size int) *bytes.Reader { - return bytes.NewReader(Random(seed, size)) +func RandomReader(seed, size int) io.Reader { + r := &rndReader{src: mrand.New(mrand.NewSource(int64(seed)))} + return io.LimitReader(r, int64(size)) } // GenRandom returns a []byte filled with up to 1000 random bytes.