Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions image/copy/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer,
isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool,
reporter progressReporter,
) (types.BlobInfo, error) {
// The copying happens through a pipeline of connected io.Readers;
// that pipeline is built by updating stream.
Expand Down Expand Up @@ -84,16 +85,11 @@ func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Read
return types.BlobInfo{}, err
}

// === Report progress using the ic.c.options.Progress channel, if required.
// === Report progress using the reporter, if required.
if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 {
progressReader := newProgressReader(
stream.reader,
ic.c.options.Progress,
ic.c.options.ProgressInterval,
srcInfo,
)
defer progressReader.reportDone()
stream.reader = progressReader
// Note: the reporter can be a no-op if the condition above evaluates
// false and in that case there's no reason to wrap the reader here.
stream.reader = newProgressReader(stream.reader, reporter)
}

// === Finally, send the layer stream to dest.
Expand Down
44 changes: 44 additions & 0 deletions image/copy/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package copy

import (
"context"
"io"
"math"
"time"

"go.podman.io/image/v5/internal/private"
"go.podman.io/image/v5/types"
)

// blobChunkAccessorProxy wraps a BlobChunkAccessor to update a *progressBar
// and optionally *progressReporter (if non-nil) with the number of received bytes.
type blobChunkAccessorProxy struct {
wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor
bar *progressBar // A progress bar updated with the number of bytes read so far
reporter progressReporter // A progress reporter updated with the number of bytes read so far
}

// GetBlobAt returns a sequential channel of readers that contain data for the requested
// blob chunks, and a channel that might get a single error value.
// The specified chunks must be not overlapping and sorted by their offset.
// The readers must be fully consumed, in the order they are returned, before blocking
// to read the next chunk.
// If the Length for the last chunk is set to math.MaxUint64, then it
// fully fetches the remaining data from the offset to the end of the blob.
func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
start := time.Now()
rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks)
if err == nil {
total := int64(0)
for _, c := range chunks {
// do not update the progress bar if there is a chunk with unknown length.
if c.Length == math.MaxUint64 {
return rc, errs, err
}
total += int64(c.Length)
}
s.reporter.reportRead(uint64(total))
s.bar.EwmaIncrInt64(total, time.Since(start))
}
return rc, errs, err
}
35 changes: 0 additions & 35 deletions image/copy/progress_bars.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package copy

import (
"context"
"fmt"
"io"
"math"
"time"

"github.com/vbauerster/mpb/v8"
"github.com/vbauerster/mpb/v8/decor"
"go.podman.io/image/v5/internal/private"
"go.podman.io/image/v5/types"
)

Expand Down Expand Up @@ -144,34 +140,3 @@ func (bar *progressBar) mark100PercentComplete() {
bar.SetTotal(-1, true) // total < 0 = set it to bar.Current(), report it; and mark the bar as complete.
}
}

// blobChunkAccessorProxy wraps a BlobChunkAccessor and updates a *progressBar
// with the number of received bytes.
type blobChunkAccessorProxy struct {
wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor
bar *progressBar // A progress bar updated with the number of bytes read so far
}

// GetBlobAt returns a sequential channel of readers that contain data for the requested
// blob chunks, and a channel that might get a single error value.
// The specified chunks must be not overlapping and sorted by their offset.
// The readers must be fully consumed, in the order they are returned, before blocking
// to read the next chunk.
// If the Length for the last chunk is set to math.MaxUint64, then it
// fully fetches the remaining data from the offset to the end of the blob.
func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
start := time.Now()
rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks)
if err == nil {
total := int64(0)
for _, c := range chunks {
// do not update the progress bar if there is a chunk with unknown length.
if c.Length == math.MaxUint64 {
return rc, errs, err
}
total += int64(c.Length)
}
s.bar.EwmaIncrInt64(total, time.Since(start))
}
return rc, errs, err
}
139 changes: 92 additions & 47 deletions image/copy/progress_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,73 +7,118 @@ import (
"go.podman.io/image/v5/types"
)

// progressReader is a reader that reports its progress to a types.ProgressProperties channel on an interval.
type progressReader struct {
source io.Reader
channel chan<- types.ProgressProperties
interval time.Duration
artifact types.BlobInfo
lastUpdate time.Time
offset uint64
offsetUpdate uint64
// progressReporter is an interface for reporting progress about a single blob.
type progressReporter interface {
reportRead(bytesRead uint64)
reportDone()
reset()
}

// newProgressReader creates a new progress reader for:
// `source`: The source when internally reading bytes
// `channel`: The reporter channel to which the progress will be sent
// `interval`: The update interval to indicate how often the progress should update
// `artifact`: The blob metadata which is currently being progressed
func newProgressReader(
source io.Reader,
// noopProgressReporter is a no-op implementation of progressReporter.
type noopProgressReporter struct{}

func (r *noopProgressReporter) reportRead(uint64) {}
func (r *noopProgressReporter) reportDone() {}
func (r *noopProgressReporter) reset() {}

// channelProgressReporter reports progress about a single blob to a
// types.ProgressProperties channel and supports re-starting from zero
// without reporting the progress through the channel unless
// it's higher than the offset reached before the restart to
// avoid confusing behavior in consumers of the events
// (skipping back).
type channelProgressReporter struct {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make sure, is it ok not worry about thread-safety here?
There was no synchronization in progressReader before my changes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; we have one goroutine per blob, so that naturally synchronizes the per-blob operations; and io.Reader is generally not thread-safe, so this is not making anything worse.

channel chan<- types.ProgressProperties // The reporter channel to which the progress will be sent
interval time.Duration // The update interval to indicate how often the progress should update
artifact types.BlobInfo // The blob metadata which is currently being progressed
lastUpdate time.Time // The last time a progress channel event was sent
offset uint64 // The currently downloaded size in bytes
maxReportedOffset uint64 // The high-water mark for offset already sent to the channel
}

// newChannelProgressReporter creates a new progress reporter
// and immediately reports a new artifact event.
func newChannelProgressReporter(
channel chan<- types.ProgressProperties,
interval time.Duration,
artifact types.BlobInfo,
) *progressReader {
// The progress reader constructor informs the progress channel
// that a new artifact will be read
) progressReporter {
channel <- types.ProgressProperties{
Event: types.ProgressEventNewArtifact,
Artifact: artifact,
}
return &progressReader{
source: source,
channel: channel,
interval: interval,
artifact: artifact,
lastUpdate: time.Now(),
offset: 0,
offsetUpdate: 0,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Absolutely non-blocking: I think keeping explicit initializers here is a tiny bit better because we do rely on having these values exactly.)

return &channelProgressReporter{
channel: channel,
interval: interval,
artifact: artifact,
lastUpdate: time.Now(),
offset: 0,
maxReportedOffset: 0,
}
}

// reportDone indicates to the internal channel that the progress has been
// finished
func (r *progressReader) reportDone() {
r.channel <- types.ProgressProperties{
Event: types.ProgressEventDone,
Artifact: r.artifact,
Offset: r.offset,
OffsetUpdate: r.offsetUpdate,
}
// reset resets the reporter's progress.
//
// It's meant to be used on error when
// the processing has to be re-started
// (e.g. ErrFallbackToOrdinaryLayerDownload).
func (r *channelProgressReporter) reset() {
r.offset = 0
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this situation, we don’t want the GUI progress bars to decrease; just note that the current offset is 0 but remember that we reported N, and then report no progress until we reach N again.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially misunderstood the problem (I was thinking of the progress bars here). It should behave like you describe.

}

// Read continuously reads bytes into the progress reader and reports the
// status via the internal channel
func (r *progressReader) Read(p []byte) (int, error) {
n, err := r.source.Read(p)
r.offset += uint64(n)
r.offsetUpdate += uint64(n)

// Fire the progress reader in the provided interval
if time.Since(r.lastUpdate) > r.interval {
// reportRead reports progress with the number of `bytesRead`
// while keeping track of a current high-water mark in case
// of reset(). It never skips back below the already reported
// offset and does not report the progress unless
// the configured `interval` elapses.
func (r *channelProgressReporter) reportRead(bytesRead uint64) {
r.offset += bytesRead
if r.offset > r.maxReportedOffset && time.Since(r.lastUpdate) > r.interval {
r.channel <- types.ProgressProperties{
Event: types.ProgressEventRead,
Artifact: r.artifact,
Offset: r.offset,
OffsetUpdate: r.offsetUpdate,
OffsetUpdate: r.offset - r.maxReportedOffset,
}
r.maxReportedOffset = r.offset
r.lastUpdate = time.Now()
r.offsetUpdate = 0
}
}

// reportDone reports successful completion.
func (r *channelProgressReporter) reportDone() {
offset := max(r.offset, r.maxReportedOffset)
r.channel <- types.ProgressProperties{
Event: types.ProgressEventDone,
Artifact: r.artifact,
Offset: offset,
OffsetUpdate: offset - r.maxReportedOffset,
}
}

// progressReader extends a wrapped io.Reader
// with additional reporting of its progress.
type progressReader struct {
source io.Reader
progressReporter
}

// newProgressReader creates a new progress reader that wraps source
// and reports progress through the given reporter.
func newProgressReader(
source io.Reader,
reporter progressReporter,
) *progressReader {
return &progressReader{
source: source,
progressReporter: reporter,
}
}

// Read continuously reads bytes into the progress reader and reports the
// status via the internal channel.
func (r *progressReader) Read(p []byte) (int, error) {
n, err := r.source.Read(p)
r.reportRead(uint64(n))
return n, err
}
Loading
Loading