-
Notifications
You must be signed in to change notification settings - Fork 110
image/copy: Fix missing progress reporting for chunked layers #848
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
fdf8af4
80883b6
1b83a41
a8e5f3a
1c7ef0e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| package copy | ||
|
|
||
| import ( | ||
| "context" | ||
| "io" | ||
| "math" | ||
| "time" | ||
|
|
||
| "go.podman.io/image/v5/internal/private" | ||
| "go.podman.io/image/v5/types" | ||
| ) | ||
|
|
||
| // blobChunkAccessorProxy wraps a private.BlobChunkAccessor and, on every | ||
| // successful GetBlobAt call, adds the total length of the requested chunks | ||
| // to a *progressBar and to a progressReporter. | ||
| // | ||
| // GetBlobAt calls reporter.reportRead unconditionally, so reporter must not be | ||
| // a nil interface value; supply a no-op progressReporter when no reporting is wanted. | ||
| // NOTE(review): whether a nil bar is tolerated depends on progressBar, which is | ||
| // not visible here; confirm. | ||
| type blobChunkAccessorProxy struct { | ||
| wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor that GetBlobAt delegates to | ||
| bar *progressBar // A progress bar advanced by the total length of the requested chunks | ||
| reporter progressReporter // A progress reporter advanced by the same amount; must be non-nil | ||
| } | ||
|
|
||
| // GetBlobAt returns a sequential channel of readers that contain data for the requested | ||
| // blob chunks, and a channel that might get a single error value. | ||
| // The specified chunks must not overlap and must be sorted by their offset. | ||
| // The readers must be fully consumed, in the order they are returned, before blocking | ||
| // to read the next chunk. | ||
| // If the Length for the last chunk is set to math.MaxUint64, then it | ||
| // fully fetches the remaining data from the offset to the end of the blob. | ||
| // | ||
| // The call is delegated to the wrapped accessor and its three results are returned unchanged. | ||
| // When the wrapped call succeeds, the sum of the requested chunk lengths is reported to the | ||
| // progress reporter and the progress bar immediately, i.e. when the request succeeds, | ||
| // not as the returned readers are consumed. | ||
| // If any chunk has Length == math.MaxUint64, or the wrapped call fails, nothing is reported. | ||
| func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { | ||
| // Taken before the wrapped call so the duration handed to the bar below covers that call. | ||
| start := time.Now() | ||
| rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks) | ||
| if err == nil { | ||
| // NOTE(review): lengths are accumulated as int64; assumes the sum fits, confirm against callers. | ||
| total := int64(0) | ||
| for _, c := range chunks { | ||
| // A chunk with unknown length makes the total unknowable: skip every progress | ||
| // update (bar and reporter) for this call and hand the results back as-is. | ||
| if c.Length == math.MaxUint64 { | ||
| return rc, errs, err | ||
| } | ||
| total += int64(c.Length) | ||
| } | ||
| s.reporter.reportRead(uint64(total)) | ||
| s.bar.EwmaIncrInt64(total, time.Since(start)) | ||
| } | ||
| return rc, errs, err | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,73 +7,118 @@ import ( | |
| "go.podman.io/image/v5/types" | ||
| ) | ||
|
|
||
| // progressReader is a reader that reports its progress to a types.ProgressProperties channel on an interval. | ||
| type progressReader struct { | ||
| source io.Reader | ||
| channel chan<- types.ProgressProperties | ||
| interval time.Duration | ||
| artifact types.BlobInfo | ||
| lastUpdate time.Time | ||
| offset uint64 | ||
| offsetUpdate uint64 | ||
| // progressReporter is an interface for reporting progress about a single blob. | ||
| type progressReporter interface { | ||
| reportRead(bytesRead uint64) | ||
| reportDone() | ||
| reset() | ||
| } | ||
|
|
||
| // newProgressReader creates a new progress reader for: | ||
| // `source`: The source when internally reading bytes | ||
| // `channel`: The reporter channel to which the progress will be sent | ||
| // `interval`: The update interval to indicate how often the progress should update | ||
| // `artifact`: The blob metadata which is currently being progressed | ||
| func newProgressReader( | ||
| source io.Reader, | ||
| // noopProgressReporter is a no-op implementation of progressReporter: | ||
| // every method has an empty body. The methods use pointer receivers, | ||
| // so it is *noopProgressReporter that provides reportRead, reportDone and reset. | ||
| type noopProgressReporter struct{} | ||
|
|
||
| // reportRead ignores the byte count. | ||
| func (r *noopProgressReporter) reportRead(uint64) {} | ||
| // reportDone does nothing. | ||
| func (r *noopProgressReporter) reportDone() {} | ||
| // reset does nothing. | ||
| func (r *noopProgressReporter) reset() {} | ||
|
|
||
| // channelProgressReporter reports progress about a single blob to a | ||
| // types.ProgressProperties channel. | ||
| // | ||
| // It supports restarting from zero via reset(): after a restart, no read | ||
| // events are sent until the current offset exceeds the highest offset that | ||
| // was already sent (maxReportedOffset), so that consumers of the events | ||
| // never see the progress skip back. | ||
| type channelProgressReporter struct { | ||
| channel chan<- types.ProgressProperties // The reporter channel to which the progress will be sent | ||
| interval time.Duration // The update interval to indicate how often the progress should update | ||
| artifact types.BlobInfo // The blob metadata which is currently being progressed | ||
| lastUpdate time.Time // The last time a progress channel event was sent | ||
| offset uint64 // The number of bytes counted since construction or the last reset() | ||
| maxReportedOffset uint64 // The high-water mark for offset already sent to the channel | ||
| } | ||
|
|
||
| // newChannelProgressReporter creates a new progress reporter | ||
| // and immediately reports a new artifact event. | ||
| func newChannelProgressReporter( | ||
| channel chan<- types.ProgressProperties, | ||
| interval time.Duration, | ||
| artifact types.BlobInfo, | ||
| ) *progressReader { | ||
| // The progress reader constructor informs the progress channel | ||
| // that a new artifact will be read | ||
| ) progressReporter { | ||
| channel <- types.ProgressProperties{ | ||
| Event: types.ProgressEventNewArtifact, | ||
| Artifact: artifact, | ||
| } | ||
| return &progressReader{ | ||
| source: source, | ||
| channel: channel, | ||
| interval: interval, | ||
| artifact: artifact, | ||
| lastUpdate: time.Now(), | ||
| offset: 0, | ||
| offsetUpdate: 0, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. (Absolutely non-blocking: I think keeping explicit initializers here is a tiny bit better because we do rely on having these values exactly.) |
||
| return &channelProgressReporter{ | ||
| channel: channel, | ||
| interval: interval, | ||
| artifact: artifact, | ||
| lastUpdate: time.Now(), | ||
| offset: 0, | ||
| maxReportedOffset: 0, | ||
| } | ||
| } | ||
|
|
||
| // reportDone indicates to the internal channel that the progress has been | ||
| // finished | ||
| func (r *progressReader) reportDone() { | ||
| r.channel <- types.ProgressProperties{ | ||
| Event: types.ProgressEventDone, | ||
| Artifact: r.artifact, | ||
| Offset: r.offset, | ||
| OffsetUpdate: r.offsetUpdate, | ||
| } | ||
| // reset resets the reporter's progress. | ||
| // | ||
| // It's meant to be used on error when | ||
| // the processing has to be re-started | ||
| // (e.g. ErrFallbackToOrdinaryLayerDownload). | ||
| func (r *channelProgressReporter) reset() { | ||
| r.offset = 0 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In this situation, we don’t want the GUI progress bars to decrease; just note that the current offset is 0 but remember that we reported N, and then report no progress until we reach N again.
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I initially misunderstood the problem (I was thinking of the progress bars here). It should behave like you describe. |
||
| } | ||
|
|
||
| // Read continuously reads bytes into the progress reader and reports the | ||
| // status via the internal channel | ||
| func (r *progressReader) Read(p []byte) (int, error) { | ||
| n, err := r.source.Read(p) | ||
| r.offset += uint64(n) | ||
| r.offsetUpdate += uint64(n) | ||
|
|
||
| // Fire the progress reader in the provided interval | ||
| if time.Since(r.lastUpdate) > r.interval { | ||
| // reportRead reports progress with the number of `bytesRead` | ||
| // while keeping track of a current high-water mark in case | ||
| // of reset(). It never skips back below the already reported | ||
| // offset and does not report the progress unless | ||
| // the configured `interval` elapses. | ||
| func (r *channelProgressReporter) reportRead(bytesRead uint64) { | ||
| r.offset += bytesRead | ||
| if r.offset > r.maxReportedOffset && time.Since(r.lastUpdate) > r.interval { | ||
| r.channel <- types.ProgressProperties{ | ||
| Event: types.ProgressEventRead, | ||
| Artifact: r.artifact, | ||
| Offset: r.offset, | ||
| OffsetUpdate: r.offsetUpdate, | ||
| OffsetUpdate: r.offset - r.maxReportedOffset, | ||
| } | ||
| r.maxReportedOffset = r.offset | ||
| r.lastUpdate = time.Now() | ||
| r.offsetUpdate = 0 | ||
| } | ||
| } | ||
|
|
||
| // reportDone reports successful completion by sending a ProgressEventDone | ||
| // event to the channel. Unlike read events, it is sent unconditionally. | ||
| // | ||
| // The reported Offset is the larger of the current offset and the high-water | ||
| // mark, so it never falls below a value that was already sent. OffsetUpdate is | ||
| // the amount by which that Offset exceeds the high-water mark, which is zero | ||
| // when the current offset has not passed it. | ||
| func (r *channelProgressReporter) reportDone() { | ||
| // Clamp to the high-water mark so the subtraction below cannot underflow. | ||
| offset := max(r.offset, r.maxReportedOffset) | ||
| r.channel <- types.ProgressProperties{ | ||
| Event: types.ProgressEventDone, | ||
| Artifact: r.artifact, | ||
| Offset: offset, | ||
| OffsetUpdate: offset - r.maxReportedOffset, | ||
| } | ||
| } | ||
|
|
||
| // progressReader extends a wrapped io.Reader | ||
| // with additional reporting of its progress. | ||
| // | ||
| // The progressReporter interface is embedded, so its methods | ||
| // (reportRead, reportDone, reset) are callable directly on a progressReader. | ||
| type progressReader struct { | ||
| source io.Reader | ||
| progressReporter | ||
| } | ||
|
|
||
| // newProgressReader creates a new progress reader that wraps source | ||
| // and reports progress through the given reporter. | ||
| // | ||
| // It only stores the two values; it neither reads from source nor | ||
| // calls any method on reporter. | ||
| func newProgressReader( | ||
| source io.Reader, | ||
| reporter progressReporter, | ||
| ) *progressReader { | ||
| return &progressReader{ | ||
| source: source, | ||
| progressReporter: reporter, | ||
| } | ||
| } | ||
|
|
||
| // Read reads from the wrapped source into p and passes the number of bytes | ||
| // read to the embedded progress reporter. | ||
| // | ||
| // The reporter is called on every Read, including when the source returns | ||
| // an error or zero bytes; n and err from the source are returned unchanged. | ||
| func (r *progressReader) Read(p []byte) (int, error) { | ||
| n, err := r.source.Read(p) | ||
| // Report before returning so bytes delivered together with an error are still counted. | ||
| r.reportRead(uint64(n)) | ||
| return n, err | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to make sure, is it OK not to worry about thread-safety here?
There was no synchronization in `progressReader` before my changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes; we have one goroutine per blob, so that naturally synchronizes the per-blob operations; and
`io.Reader` is generally not thread-safe, so this is not making anything worse.