rest: Use semaphore

This commit is contained in:
Alexander Neumann 2017-06-06 00:25:22 +02:00
parent 683ebef6c6
commit 46049b4236
4 changed files with 42 additions and 28 deletions

View File

@ -5,11 +5,24 @@ import (
"strings" "strings"
"restic/errors" "restic/errors"
"restic/options"
) )
// Config contains all configuration necessary to connect to a REST server. // Config contains all configuration necessary to connect to a REST server.
type Config struct { type Config struct {
URL *url.URL URL *url.URL
Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 20)"`
}
func init() {
options.Register("rest", Config{})
}
// NewConfig returns a new Config with the default values filled in.
func NewConfig() Config {
return Config{
Connections: 20,
}
} }
// ParseConfig parses the string s and extracts the REST server URL. // ParseConfig parses the string s and extracts the REST server URL.
@ -25,6 +38,7 @@ func ParseConfig(s string) (interface{}, error) {
return nil, errors.Wrap(err, "url.Parse") return nil, errors.Wrap(err, "url.Parse")
} }
cfg := Config{URL: u} cfg := NewConfig()
cfg.URL = u
return cfg, nil return cfg, nil
} }

View File

@ -20,7 +20,8 @@ var configTests = []struct {
cfg Config cfg Config
}{ }{
{"rest:http://localhost:1234", Config{ {"rest:http://localhost:1234", Config{
URL: parseURL("http://localhost:1234"), URL: parseURL("http://localhost:1234"),
Connections: 20,
}}, }},
} }

View File

@ -23,21 +23,21 @@ const connLimit = 40
var _ restic.Backend = &restBackend{} var _ restic.Backend = &restBackend{}
type restBackend struct { type restBackend struct {
url *url.URL url *url.URL
connChan chan struct{} sem *backend.Semaphore
client http.Client client http.Client
backend.Layout backend.Layout
} }
// Open opens the REST backend with the given config. // Open opens the REST backend with the given config.
func Open(cfg Config) (restic.Backend, error) { func Open(cfg Config) (restic.Backend, error) {
connChan := make(chan struct{}, connLimit)
for i := 0; i < connLimit; i++ {
connChan <- struct{}{}
}
client := http.Client{Transport: backend.Transport()} client := http.Client{Transport: backend.Transport()}
sem, err := backend.NewSemaphore(cfg.Connections)
if err != nil {
return nil, err
}
// use url without trailing slash for layout // use url without trailing slash for layout
url := cfg.URL.String() url := cfg.URL.String()
if url[len(url)-1] == '/' { if url[len(url)-1] == '/' {
@ -45,10 +45,10 @@ func Open(cfg Config) (restic.Backend, error) {
} }
be := &restBackend{ be := &restBackend{
url: cfg.URL, url: cfg.URL,
connChan: connChan, client: client,
client: client, Layout: &backend.RESTLayout{URL: url, Join: path.Join},
Layout: &backend.RESTLayout{URL: url, Join: path.Join}, sem: sem,
} }
return be, nil return be, nil
@ -108,9 +108,9 @@ func (b *restBackend) Save(h restic.Handle, rd io.Reader) (err error) {
// backend.Closer, which has a noop method. // backend.Closer, which has a noop method.
rd = backend.Closer{Reader: rd} rd = backend.Closer{Reader: rd}
<-b.connChan b.sem.GetToken()
resp, err := b.client.Post(b.Filename(h), "binary/octet-stream", rd) resp, err := b.client.Post(b.Filename(h), "binary/octet-stream", rd)
b.connChan <- struct{}{} b.sem.ReleaseToken()
if resp != nil { if resp != nil {
defer func() { defer func() {
@ -163,9 +163,9 @@ func (b *restBackend) Load(h restic.Handle, length int, offset int64) (io.ReadCl
req.Header.Add("Range", byteRange) req.Header.Add("Range", byteRange)
debug.Log("Load(%v) send range %v", h, byteRange) debug.Log("Load(%v) send range %v", h, byteRange)
<-b.connChan b.sem.GetToken()
resp, err := b.client.Do(req) resp, err := b.client.Do(req)
b.connChan <- struct{}{} b.sem.ReleaseToken()
if err != nil { if err != nil {
if resp != nil { if resp != nil {
@ -190,9 +190,9 @@ func (b *restBackend) Stat(h restic.Handle) (restic.FileInfo, error) {
return restic.FileInfo{}, err return restic.FileInfo{}, err
} }
<-b.connChan b.sem.GetToken()
resp, err := b.client.Head(b.Filename(h)) resp, err := b.client.Head(b.Filename(h))
b.connChan <- struct{}{} b.sem.ReleaseToken()
if err != nil { if err != nil {
return restic.FileInfo{}, errors.Wrap(err, "client.Head") return restic.FileInfo{}, errors.Wrap(err, "client.Head")
} }
@ -237,9 +237,9 @@ func (b *restBackend) Remove(h restic.Handle) error {
if err != nil { if err != nil {
return errors.Wrap(err, "http.NewRequest") return errors.Wrap(err, "http.NewRequest")
} }
<-b.connChan b.sem.GetToken()
resp, err := b.client.Do(req) resp, err := b.client.Do(req)
b.connChan <- struct{}{} b.sem.ReleaseToken()
if err != nil { if err != nil {
return errors.Wrap(err, "client.Do") return errors.Wrap(err, "client.Do")
@ -264,9 +264,9 @@ func (b *restBackend) List(t restic.FileType, done <-chan struct{}) <-chan strin
url += "/" url += "/"
} }
<-b.connChan b.sem.GetToken()
resp, err := b.client.Get(url) resp, err := b.client.Get(url)
b.connChan <- struct{}{} b.sem.ReleaseToken()
if resp != nil { if resp != nil {
defer func() { defer func() {

View File

@ -76,9 +76,8 @@ func newTestSuite(ctx context.Context, t testing.TB) *test.Suite {
t.Fatal(err) t.Fatal(err)
} }
cfg := rest.Config{ cfg := rest.NewConfig()
URL: url, cfg.URL = url
}
return cfg, nil return cfg, nil
}, },