-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Copy progress - feature request #839
Comments
How about a |
What does it look like? How is it better than a channel or a func? Maybe a practical example would help me understand it? |
Something like https://pkg.go.dev/oras.land/oras@v1.2.0/cmd/oras/internal/display/status/progress#Manager type Manager interface {
Add() (Status, error)
Close() error
}
type Status chan *status
// status is used as message to update progress view.
type status struct {
done bool // done is true when the end time is set
prompt string
descriptor ocispec.Descriptor
offset int64
total humanize.Bytes
speedWindow *speedWindow
startTime time.Time
endTime time.Time
mark spinner
lock sync.Mutex
} The Maybe we can improve a bit by separating them. /cc @qweeah |
I truly don't remember that part. 😁 I could see having an What is the If we want to add progress to oras CLI, it becomes a consuming party, I think I would handle it as a separate addition in a separate PR. |
I thought about this a bit more over the last few days. I still am having a hard time seeing the benefit of a := func(msg oras.Update) {
fmt.Printf("copied %d bytes out of %d total for %s\n", msg.Copied, msg.Descriptor.Size, msg.Descriptor.Digest)
}
opts := oras.CopyOptions{
ProgressHandler: f
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts) With each copy of data, it just calls I have been thinking a bit more about the whole "how do we call the function without blocking". I had suggested earlier:
but I like this less. I am worried about a proliferation of goroutines. If we have frequent updates, that could get out of control. Here is a possible alternative.
This is less of an issue if we pass a channel in the options, instead of a function, but I do think the option is cleaner. Thoughts @shizhMSFT ? |
I would like to have a more flexible interface and have some utility functions for easy setup. Let me try to refactor the existing |
Good point there @shizhMSFT . I am trying hard to keep the abstract interface (is that redundant?) that goes to |
Here's a solid PoC on how the File package track
import (
"io"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// State represents the state of a descriptor.
type State int
const (
StateUnknown State = iota
StateInitialized
StateTransmitting
StateTransmitted
StateExists
StateSkipped
StateMounted
)
// Status represents the status of a descriptor.
type Status struct {
// State represents the state of the descriptor.
State State
// Offset represents the current offset of the descriptor.
// Offset is discarded if set to a negative value.
Offset int64
}
// Tracker updates the status of a descriptor.
type Tracker interface {
io.Closer
// Update updates the status of the descriptor.
Update(status Status) error
// Fail marks the descriptor as failed.
Fail(err error) error
}
// Manager tracks the progress of multiple descriptors.
type Manager interface {
io.Closer
// Track starts tracking the progress of a descriptor.
Track(desc ocispec.Descriptor) (Tracker, error)
}
// Record adds the progress of a descriptor as a single entry.
func Record(m Manager, desc ocispec.Descriptor, status Status) error {
tracker, err := m.Track(desc)
if err != nil {
return err
}
err = tracker.Update(status)
if err != nil {
return err
}
return tracker.Close()
} File package track
import "io"
// ReadTracker tracks the transmission based on the read operation.
type ReadTracker struct {
base io.Reader
tracker Tracker
offset int64
}
// NewReadTracker attaches a tracker to a reader.
func NewReadTracker(track Tracker, r io.Reader) *ReadTracker {
return &ReadTracker{
base: r,
tracker: track,
}
}
// Read reads from the base reader and updates the status.
func (rt *ReadTracker) Read(p []byte) (n int, err error) {
n, err = rt.base.Read(p)
rt.offset += int64(n)
_ = rt.tracker.Update(Status{
State: StateTransmitting,
Offset: rt.offset,
})
if err != nil && err != io.EOF {
_ = rt.tracker.Fail(err)
}
return n, err
}
// Close closes the tracker.
func (rt *ReadTracker) Close() error {
return rt.tracker.Close()
}
// Start starts tracking the transmission.
func (rt *ReadTracker) Start() error {
return rt.tracker.Update(Status{
State: StateInitialized,
Offset: -1,
})
}
// Done marks the transmission as complete.
// Done should be called after the transmission is complete.
// Note: Reading all content from the reader does not imply the transmission is
// complete.
func (rt *ReadTracker) Done() error {
return rt.tracker.Update(Status{
State: StateTransmitted,
Offset: -1,
})
} Please refer to oras-project/oras#1524 to understand how above implementation can take effect in the real code. |
Here is an example using functions. package main
import (
"context"
"fmt"
"io"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras/internal/experimental/track"
)
type ProgressFunc func(status track.Status, err error) error
func (pf ProgressFunc) Update(status track.Status) error {
return pf(status, nil)
}
func (pf ProgressFunc) Fail(err error) error {
return pf(track.Status{}, err)
}
func (pf ProgressFunc) Close() error {
return nil
}
type DescriptorProgressFunc func(desc ocispec.Descriptor, status track.Status, err error) error
func (dpf DescriptorProgressFunc) Track(desc ocispec.Descriptor) (ProgressFunc, error) {
return func(status track.Status, err error) error {
return dpf(desc, status, err)
}, nil
}
func (dpf DescriptorProgressFunc) Close() error {
return nil
}
func main() {
// Create a new progress manager.
prompt := map[track.State]string{
track.StateInitialized: "Pending",
track.StateTransmitting: "Pulling",
track.StateTransmitted: "Pulled ",
}
manager := DescriptorProgressFunc(func(desc ocispec.Descriptor, status track.Status, err error) error {
if err != nil {
fmt.Println("Error ", desc.Digest, err)
return err
}
if status.Offset < 0 {
fmt.Println(prompt[status.State], desc.Digest)
} else {
fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
}
return nil
})
// Pull a manifest
repo, err := remote.NewRepository("ghcr.io/oras-project/oras")
if err != nil {
panic(err)
}
ctx := context.Background()
desc, err := repo.Resolve(ctx, "v1.2.0")
if err != nil {
panic(err)
}
tracker, _ := manager.Track(desc)
defer tracker.Close()
tracker.Update(track.Status{
State: track.StateInitialized,
})
r, err := repo.Fetch(ctx, desc)
if err != nil {
tracker.Fail(err)
return
}
defer r.Close()
rt := track.NewReadTracker(tracker, r)
defer rt.Close()
vr := content.NewVerifyReader(rt, desc)
_, err = io.ReadAll(vr) // io.ReadAll uses a smaller buffer
if err != nil {
tracker.Fail(err)
return
}
err = vr.Verify()
if err != nil {
tracker.Fail(err)
return
}
rt.Done()
} It outputs $ go run internal/experimental/example
Pending sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 0.00%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 13.15%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 23.02%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 36.17%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 52.61%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 78.91%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 100.00%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 100.00%
Pulled sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b |
Currently, this model only covers descriptors and not references. |
@shizhMSFT yes, this is pretty detailed. It is more advanced - but also more complex - than I had in mind. From the What I think you have constructed is a more advanced thing in a few respects:
I assume (please correct me) that your thought was something like: opts := oras.CopyOptions {
// other stuff
ProgressTracker: manager, // some implementation of manager
}
oras.Copy(ctx, src, srcRef, dst , dstRef , opts ) I think that is more complex than needs be. A manager is useful, but is not the only way to do it. For example, I might have a simpler implementation, or just need progress for one descriptor, etc. That is why I was thinking the simple method above: f := func(msg oras.Update) {
fmt.Printf("copied %d bytes out of %d total for %s\n", msg.Copied, msg.Descriptor.Size, msg.Descriptor.Digest)
}
opts := oras.CopyOptions{
ProgressHandler: f
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts) Now, you might actually want a manager. In which case, the update function itself might look like: type ProgressHandler interface {
Update func(msg oras.Update)
}
// this can be the actual manager you had above; implements ProgressHandler interface
type manager struct {
}
func (m *manager) Update(msg oras.Update) {
// check the descriptor, if needed, create a tracker
}
manager := NewManager()
opts := oras.CopyOptions{
ProgressHandler: manager,
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts) In the philosophy of "batteries optional but included", we can include a manager implementation just like you have above, on the assumption that many will want it. But keep the |
Heh, crossed wires. I am reading your latest comments now. |
Looking at your example, you didn't plumb it into That is a lot of extra overhead to do when calling |
Here is an example if you don't want a manager. package main
import (
"context"
"fmt"
"io"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras/internal/experimental/track"
)
type ProgressFunc func(status track.Status, err error) error
func (pf ProgressFunc) Update(status track.Status) error {
return pf(status, nil)
}
func (pf ProgressFunc) Fail(err error) error {
return pf(track.Status{}, err)
}
func (pf ProgressFunc) Close() error {
return nil
}
func main() {
// Pull a manifest
repo, err := remote.NewRepository("ghcr.io/oras-project/oras")
if err != nil {
panic(err)
}
ctx := context.Background()
desc, err := repo.Resolve(ctx, "v1.2.0")
if err != nil {
panic(err)
}
// Create a new progress tracker.
prompt := map[track.State]string{
track.StateInitialized: "Pending",
track.StateTransmitting: "Pulling",
track.StateTransmitted: "Pulled ",
}
tracker := ProgressFunc(func(status track.Status, err error) error {
if err != nil {
fmt.Println("Error ", desc.Digest, err)
return err
}
if status.Offset < 0 {
fmt.Println(prompt[status.State], desc.Digest)
} else {
fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
}
return nil
})
tracker.Update(track.Status{
State: track.StateInitialized,
})
r, err := repo.Fetch(ctx, desc)
if err != nil {
tracker.Fail(err)
return
}
defer r.Close()
rt := track.NewReadTracker(tracker, r)
defer rt.Close()
vr := content.NewVerifyReader(rt, desc)
_, err = io.ReadAll(vr) // io.ReadAll uses a smaller buffer
if err != nil {
tracker.Fail(err)
return
}
err = vr.Verify()
if err != nil {
tracker.Fail(err)
return
}
rt.Done()
} |
A manager is required since |
If func(desc ocispec.Descriptor, status track.Status, err error) error {
fmt.Printf("copied %d bytes out of %d total for %s\n", status.Offset, desc.Size, desc.Digest)
} although this simple function does not consider error handling. |
So.. the complete example is f := func(desc ocispec.Descriptor, status track.Status, err error) error {
fmt.Printf("copied %d bytes out of %d total for %s\n", status.Offset, desc.Size, desc.Digest)
}
opts := oras.CopyOptions{
ProgressManager: DescriptorProgressFunc(f),
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts) |
I don't think that changes anything. If |
Hold on a second. You are not decorating the desc, you are decorating the reader? r, err := repo.Fetch(ctx, desc)
if err != nil {
tracker.Fail(err)
return
}
defer r.Close()
rt := track.NewReadTracker(tracker, r)
defer rt.Close()
vr := content.NewVerifyReader(rt, desc)
_, err = io.ReadAll(vr) // io.ReadAll uses a smaller buffer |
So all you are doing is wrapping a |
Yes, I don't think there is other way to get the status of a read operation (e.g. |
That makes a lot of sense. It is the canonical way of doing things like that in go. I still don't get how we would do that with |
Exactly. |
I think the name "manager" threw me off. It is a tracker factory, that lets copy say, "I am about to start working on desc, give me a tracker for it, and if I get none (no factory or nil tracker returned), I will just process the blobs as I want". OK, it is interesting. Going to absorb it a bit. The trick is to get the documentation right. What then are |
They are just utility functions like http.HandlerFunc for your simple cases for demo purposes. |
What purpose do they serve? If I understood correctly
|
For simple scenarios like yours, crafting a custom type and implementing those methods might be time consuming. Those utilities help you to covert a function to a handler meeting the interfaces. This pattern is common in the golang world. If you take a look at http.HandlerFunc, you can observe that |
The analogy to My perspective always bounces between engineer who needs to build something, and user who needs to consume it (product). I will take a stab at writing the brief form of docs here. Tell me if this describes what you are doing. -----BEGIN----- At the lowest level, each blob when being read is fetched via r, err := repo.Fetch(ctx, desc) // returns an io.ReadCloser
rc := myReadCloserTrackerFunc(r) // also returns an io.ReadCloser This is the simplest way to track progress for a specific blob. Whether you send updates to an observability system or stderr, how often you do it, whether based on every buffer copy, every 1MB, or every 3 seconds, is entirely up to how you build your wrapper function, in our example At a higher level, but not required, just to wrap a single type Tracker interface {
io.Closer
Update(status Status) error
Fail(err error) error
ReadCloser(r io.ReadCloser) io.ReadCloser
} You can use a tracker := ProgressFunc(func(status track.Status, err error) error {
if err != nil {
fmt.Println("Error ", desc.Digest, err)
return err
}
if status.Offset < 0 {
fmt.Println(prompt[status.State], desc.Digest)
} else {
fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
}
return nil
}) You then can generate your tracker := ProgressFunc() // etc
r, err := repo.Fetch(ctx, desc) // returns an io.ReadCloser
rc := tracker.ReadCloser(r) If you are copying an entire artifact by tag, which likely involves multiple descriptors, you can track it by providing a manager, or factory, that generates trackers for each descriptor's content that is copied. The manager is called by type Manager interface {
io.Closer
Track(desc ocispec.Descriptor) (Tracker, error)
} You then can pass the opts := oras.CopyOptions {
// other stuff
ProgressTracker: manager, // some implementation of manager
}
oras.Copy(ctx, src, srcRef, dst , dstRef , opts ) If the Of course, you are not required to create a manager := DescriptorProgressFunc(func(desc ocispec.Descriptor, status track.Status, err error) error {
if err != nil {
fmt.Println("Error ", desc.Digest, err)
return err
}
if status.Offset < 0 {
fmt.Println(prompt[status.State], desc.Digest)
} else {
fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
}
return nil
}) Each update, whether a change in status or bytes copied, will call the function. Is that roughly it? If so, I have a few suggestions:
type DescriptorProgressFunc func(desc ocispec.Descriptor, status track.Status, err error) error and then we can call the generator something like Looking for feedback. |
Yes. Just notice that the ReadCloser(r io.ReadCloser) io.ReadCloser as we can provide a common function with performance optimized. E.g. implementing Your suggestion is mainly about naming. We can revise them in the future PRs (as I will prepare one). Besides, the name For the package name, I'm considering |
Sounds good. Feel free to use (and modify) my "docs" in the documentation with the PR. What assistance do you need? |
As discussed in this comment (with thanks to @shizhMSFT for shepherding it through).
Summary
oras.Copy()
Proposed design
The signature for oras.Copy includes oras.CopyOptions as the last parameter. If this were variadic, I would suggest adding another
WithProgressUpdate()
or similar, but we use a singleCopyOptions
, so I propose adding another property:CopyOptions.Progress
. The type depends on the design choice.There are two ways to do this:
CopyOptions.Progress func(Update)
. With each update,Copy()
(or its underlying functions) would call the passed function, passing it theUpdate
(see below).CopyOptions.Progress chan<- Update
. With each updateCopy()
would send anUpdate
to the channelIf
Progress
is nil, then this functionality is bypassed.Despite my linked PR using channels, I have no strong preference for channel over function.
Preventing Blocking
With both functions and channels, there is a concern that it might block. In principle, I think that if someone calls
Copy()
and passes it a blocking function or unbuffered channel, that is their issue. However, it can cause us headaches to support them.I would consider having each call that sends to the channel or calls
func()
to be in a separate short-lived goroutine, which calls fund or sends to the channel, wrapped in a timeout.Frequency of Update
My initial design is to rely on the underlying
io.CopyN()
. Whatever we use for that, we use for writing updates. However, that can overwhelm if the defaultio.Copy()
is used. If I recall correctly,io.Copy()
defaults to 32K. With a 100MB blob, that is ~3000 updates. That may or may not be good.However we control the update frequency, I think it should be byte-based, not time-based. I.e. "updates every x KB" instead of "updates every y seconds." That is more useful, and also easier to implement.
In terms of controlling the update frequency, the simplest way is
CopyOption.ProgresssFrequency uint
. If it is 0, stick to the default.An alternative is to have
CopyOption.Progress
be a struct with both the channel/func (whichever is chosen) and an update frequency property.A third method - and probably the simplest - is not to control it at all, but rather have it be part of
CopyOption.Progress
. Our callCopy()
calls that / sends to channel, and it buffers as often as it wants. This is the simplest, but is subject to making our "blocking control", i.e. goroutines, being overwhelmed.Open to ideas.
Structure of update message
The
oras.Update
should be simple and contain only 2 properties:The descriptor is important for 2 reasons:
Sample
Channel:
Func:
The text was updated successfully, but these errors were encountered: