diff --git a/internal/backend/http_transport.go b/internal/backend/http_transport.go index 19b20dc6a..2b9efe000 100644 --- a/internal/backend/http_transport.go +++ b/internal/backend/http_transport.go @@ -1,6 +1,7 @@ package backend import ( + "context" "crypto/tls" "crypto/x509" "encoding/pem" @@ -13,6 +14,7 @@ import ( "github.com/peterbourgon/unixtransport" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/feature" ) // TransportOptions collects various options which can be set for an HTTP based @@ -66,14 +68,28 @@ func readPEMCertKey(filename string) (certs []byte, key []byte, err error) { // a custom rootCertFilename is non-empty, it must point to a valid PEM file, // otherwise the function will return an error. func Transport(opts TransportOptions) (http.RoundTripper, error) { + dial := (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext + dialTimeout := dial + + if feature.Flag.Enabled(feature.HTTPTimeouts) { + // inject timeoutConn to enforce progress + dialTimeout = func(ctx context.Context, network, addr string) (net.Conn, error) { + conn, err := dial(ctx, network, addr) + if err != nil { + return conn, err + } + return newTimeoutConn(conn, 5*time.Minute) + } + } + // copied from net/http tr := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - DualStack: true, - }).DialContext, + Proxy: http.ProxyFromEnvironment, + DialContext: dialTimeout, ForceAttemptHTTP2: true, MaxIdleConns: 100, MaxIdleConnsPerHost: 100, diff --git a/internal/backend/timeout_transport.go b/internal/backend/timeout_transport.go new file mode 100644 index 000000000..551df68eb --- /dev/null +++ b/internal/backend/timeout_transport.go @@ -0,0 +1,172 @@ +package backend + +import ( + "net" + "os" + "sync" + "time" +) + +// timeoutConn will timeout if no read or write progress is made for progressTimeout. +// This ensures that stuck network connections are interrupted after some time. +// By using a timeoutConn within a http transport (via DialContext), sending / receing +// the request / response body is guarded with a timeout. The read progress part also +// limits the time until a response header must be received. +// +// The progressTimeout must be larger than the IdleConnTimeout of the http transport. +// +// The http2.Transport offers a similar functionality via WriteByteTimeout & ReadIdleTimeout. +// However, those are not available for HTTP/1 connections. Thus, there's no builtin way to +// enforce progress for sending the request body or reading the response body. +// See https://github.com/restic/restic/issues/4193#issuecomment-2067988727 for details. +type timeoutConn struct { + conn net.Conn + // timeout within which a read/write must make progress, otherwise a connection is considered broken + // if no read/write is pending, then the timeout is inactive + progressTimeout time.Duration + + // all access to fields below must hold m + m sync.Mutex + + // user defined read/write deadline + readDeadline time.Time + writeDeadline time.Time + // timestamp of last successful write (at least one byte) + lastWrite time.Time +} + +var _ net.Conn = &timeoutConn{} + +func newTimeoutConn(conn net.Conn, progressTimeout time.Duration) (*timeoutConn, error) { + // reset timeouts to ensure a consistent state + err := conn.SetDeadline(time.Time{}) + if err != nil { + return nil, err + } + + return &timeoutConn{ + conn: conn, + progressTimeout: progressTimeout, + }, nil +} + +func (t *timeoutConn) Write(p []byte) (n int, err error) { + t.m.Lock() + timeout := t.writeDeadline + t.m.Unlock() + var zero time.Time + if timeout != zero { + // fall back to standard behavior if a timeout was set explicitly + n, err := t.conn.Write(p) + if n > 0 { + t.m.Lock() + t.lastWrite = time.Now() + t.m.Unlock() + } + return n, err + } + + // based on http2stickyErrWriter.Write from go/src/net/http/h2_bundle.go + for { + _ = t.conn.SetWriteDeadline(time.Now().Add(t.progressTimeout)) + + nn, err := t.conn.Write(p[n:]) + n += nn + if nn > 0 { + // track write progress + t.m.Lock() + t.lastWrite = time.Now() + t.m.Unlock() + } + + if n < len(p) && nn > 0 && err == os.ErrDeadlineExceeded { + // some data is still left to send, keep going as long as there is some progress + continue + } + + t.m.Lock() + // restore configured deadline + _ = t.conn.SetWriteDeadline(t.writeDeadline) + t.m.Unlock() + return n, err + } +} + +func (t *timeoutConn) Read(b []byte) (n int, err error) { + t.m.Lock() + timeout := t.readDeadline + t.m.Unlock() + var zero time.Time + if timeout != zero { + // fall back to standard behavior if a timeout was set explicitly + return t.conn.Read(b) + } + + var start = time.Now() + + for { + _ = t.conn.SetReadDeadline(start.Add(t.progressTimeout)) + + nn, err := t.conn.Read(b) + t.m.Lock() + lastWrite := t.lastWrite + t.m.Unlock() + if nn == 0 && err == os.ErrDeadlineExceeded && lastWrite.After(start) { + // deadline exceeded, but write made some progress in the meantime + start = lastWrite + continue + } + + t.m.Lock() + // restore configured deadline + _ = t.conn.SetReadDeadline(t.readDeadline) + t.m.Unlock() + return nn, err + } +} + +func (t *timeoutConn) Close() error { + return t.conn.Close() +} + +func (t *timeoutConn) LocalAddr() net.Addr { + return t.conn.LocalAddr() +} + +func (t *timeoutConn) RemoteAddr() net.Addr { + return t.conn.RemoteAddr() +} + +func (t *timeoutConn) SetDeadline(d time.Time) error { + err := t.SetReadDeadline(d) + err2 := t.SetWriteDeadline(d) + if err != nil { + return err + } + return err2 +} + +func (t *timeoutConn) SetReadDeadline(d time.Time) error { + t.m.Lock() + defer t.m.Unlock() + + // track timeout modifications, as the current timeout cannot be queried + err := t.conn.SetReadDeadline(d) + if err != nil { + return err + } + t.readDeadline = d + return nil +} + +func (t *timeoutConn) SetWriteDeadline(d time.Time) error { + t.m.Lock() + defer t.m.Unlock() + + err := t.conn.SetWriteDeadline(d) + if err != nil { + return err + } + t.writeDeadline = d + return nil +} diff --git a/internal/feature/registry.go b/internal/feature/registry.go index 2d2e45edf..1b2c8207d 100644 --- a/internal/feature/registry.go +++ b/internal/feature/registry.go @@ -8,6 +8,7 @@ const ( DeprecateLegacyIndex FlagName = "deprecate-legacy-index" DeprecateS3LegacyLayout FlagName = "deprecate-s3-legacy-layout" DeviceIDForHardlinks FlagName = "device-id-for-hardlinks" + HTTPTimeouts FlagName = "http-timeouts" ) func init() { @@ -15,5 +16,6 @@ func init() { DeprecateLegacyIndex: {Type: Beta, Description: "disable support for index format used by restic 0.1.0. Use `restic repair index` to update the index if necessary."}, DeprecateS3LegacyLayout: {Type: Beta, Description: "disable support for S3 legacy layout used up to restic 0.7.0. Use `RESTIC_FEATURES=deprecate-s3-legacy-layout=false restic migrate s3_layout` to migrate your S3 repository if necessary."}, DeviceIDForHardlinks: {Type: Alpha, Description: "store deviceID only for hardlinks to reduce metadata changes for example when using btrfs subvolumes. Will be removed in a future restic version after repository format 3 is available"}, + HTTPTimeouts: {Type: Beta, Description: "improve handling of stuck HTTP connections using timeouts."}, }) }