Copy progress - feature request #839

Open
deitch opened this issue Oct 21, 2024 · 30 comments
Labels
enhancement New feature or request

@deitch
Contributor

deitch commented Oct 21, 2024

As discussed in this comment (with thanks to @shizhMSFT for shepherding it through).

Summary

  • Request: Add support for ongoing progress updates when running oras.Copy()
  • Context: if the downloaded blobs/manifests are large (and sometimes even if not), being able to know status updates, and that things are not stuck, is very useful
  • Boundaries: this should not be concerned with UI or other aspects, except, potentially, for oras CLI, and that is out of scope at this time. The entire interface should be one that lets updates be sent by code to some consumer, which handles UI (if any)

Proposed design

The signature for oras.Copy includes oras.CopyOptions as the last parameter. If this were variadic, I would suggest adding another WithProgressUpdate() or similar, but we use a single CopyOptions, so I propose adding another property: CopyOptions.Progress. The type depends on the design choice.

There are two ways to do this:

  • function: CopyOptions.Progress func(Update). With each update, Copy() (or its underlying functions) would call the passed function, passing it the Update (see below).
  • channel: CopyOptions.Progress chan<- Update. With each update Copy() would send an Update to the channel

If Progress is nil, then this functionality is bypassed.

Despite my linked PR using channels, I have no strong preference for channel over function.

Preventing Blocking

With both functions and channels, there is a concern that it might block. In principle, I think that if someone calls Copy() and passes it a blocking function or unbuffered channel, that is their issue. However, it can cause us headaches to support them.

I would consider having each call that sends to the channel or calls func() run in a separate short-lived goroutine, which calls func or sends to the channel, wrapped in a timeout.

Frequency of Update

My initial design is to rely on the underlying io.CopyN(). Whatever we use for that, we use for writing updates. However, that can overwhelm if the default io.Copy() is used. If I recall correctly, io.Copy() defaults to 32K. With a 100MB blob, that is ~3000 updates. That may or may not be good.

However we control the update frequency, I think it should be byte-based, not time-based. I.e. "updates every x KB" instead of "updates every y seconds." That is more useful, and also easier to implement.

In terms of controlling the update frequency, the simplest way is CopyOptions.ProgressFrequency uint. If it is 0, stick to the default.

An alternative is to have CopyOptions.Progress be a struct with both the channel/func (whichever is chosen) and an update frequency property.

A third method, and probably the simplest, is not to control it at all, but rather to leave it to whatever is passed in CopyOptions.Progress. Copy() calls the function or sends to the channel on every write, and the consumer buffers or discards as often as it wants. This is the simplest, but it risks overwhelming our "blocking control", i.e. the goroutines.
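To put numbers on the byte-based option, here is a rough sketch. byteThrottle and countReports are hypothetical helpers written for this comment, not part of any existing API; the simulation assumes 32 KiB chunks as io.Copy() would use by default.

```go
package main

import "fmt"

// byteThrottle is a hypothetical helper that fires report at most once per
// `every` bytes copied, plus once when the transfer completes.
type byteThrottle struct {
	every    int64 // report interval in bytes; 0 means report on every call
	total    int64
	copied   int64
	lastSent int64
	report   func(copied, total int64)
}

// add records n newly copied bytes and reports if the threshold was crossed
// or the transfer is complete.
func (b *byteThrottle) add(n int64) {
	b.copied += n
	if b.copied-b.lastSent >= b.every || b.copied == b.total {
		b.lastSent = b.copied
		b.report(b.copied, b.total)
	}
}

// countReports simulates copying total bytes in chunk-sized pieces and
// returns how many progress reports were emitted.
func countReports(total, chunk, every int64) int {
	calls := 0
	bt := &byteThrottle{every: every, total: total, report: func(_, _ int64) { calls++ }}
	for copied := int64(0); copied < total; copied += chunk {
		n := chunk
		if total-copied < n {
			n = total - copied
		}
		bt.add(n)
	}
	return calls
}

func main() {
	const total = 100 << 20 // 100 MiB blob
	fmt.Println(countReports(total, 32<<10, 0))     // one report per 32 KiB chunk: 3200
	fmt.Println(countReports(total, 32<<10, 1<<20)) // one report per MiB: 100
}
```

With a frequency of 0 (the default) the consumer sees every chunk; with 1 MiB it sees two orders of magnitude fewer calls for the same blob.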

Open to ideas.

Structure of update message

The oras.Update should be simple and contain only 2 properties:

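type Update struct {
    Copied     int64              // bytes copied so far for this descriptor
    Descriptor ocispec.Descriptor // what is being copied, including its total size
}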

The descriptor is important for 2 reasons:

  1. To know the total expected size (which lets the consumer calculate the percentage complete)
  2. To know what downloading blob/manifest this is for. The func can be called / channel can receive messages multiple times, even in parallel. This lets the consumer know exactly what the update is for.
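A small sketch of how a consumer might use both pieces of information. Descriptor here is a trimmed, hypothetical stand-in for ocispec.Descriptor so the snippet is self-contained; percent is likewise just an illustrative helper.

```go
package main

import "fmt"

// Descriptor is a trimmed, hypothetical stand-in for ocispec.Descriptor.
type Descriptor struct {
	Digest string
	Size   int64
}

// Update mirrors the struct proposed above.
type Update struct {
	Copied     int64
	Descriptor Descriptor
}

// percent returns how much of the described content has been copied.
func percent(u Update) float64 {
	if u.Descriptor.Size <= 0 {
		return 0
	}
	return float64(u.Copied) / float64(u.Descriptor.Size) * 100
}

func main() {
	a := Descriptor{Digest: "sha256:aaa", Size: 200}
	b := Descriptor{Digest: "sha256:bbb", Size: 1000}

	// Updates for two blobs arrive interleaved; the digest tells them apart.
	updates := []Update{
		{Copied: 50, Descriptor: a},
		{Copied: 100, Descriptor: b},
		{Copied: 200, Descriptor: a},
		{Copied: 500, Descriptor: b},
	}
	latest := map[string]float64{}
	for _, u := range updates {
		latest[u.Descriptor.Digest] = percent(u)
	}
	fmt.Printf("%.1f %.1f\n", latest[a.Digest], latest[b.Digest])
}
```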

Sample

Channel:

ch := make(chan oras.Update, 1000)
// Start the consumer first so updates are drained while Copy() runs.
go func(ch <-chan oras.Update) {
    for msg := range ch {
        fmt.Printf("copied %d bytes out of %d total for %s\n", msg.Copied, msg.Descriptor.Size, msg.Descriptor.Digest)
    }
}(ch)
opts := oras.CopyOptions{
    ProgressChannel: ch,
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts)

Func:

f := func(msg oras.Update) {
    fmt.Printf("copied %d bytes out of %d total for %s\n", msg.Copied, msg.Descriptor.Size, msg.Descriptor.Digest)
}
opts := oras.CopyOptions{
    ProgressHandler: f,
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts)
@shizhMSFT
Contributor

How about a type ProgressManager interface in CopyOptions?

@deitch
Contributor Author

deitch commented Oct 21, 2024

How about a type ProgressManager interface in CopyOptions?

What does it look like? How is it better than a channel or a func?

Maybe a practical example would help me understand it?

@shizhMSFT
Contributor

How about a type ProgressManager interface in CopyOptions?

What does it look like? How is it better than a channel or a func?

Maybe a practical example would help me understand it?

Something like https://pkg.go.dev/oras.land/oras@v1.2.0/cmd/oras/internal/display/status/progress#Manager

type Manager interface {
	Add() (Status, error)
	Close() error
}

type Status chan *status

// status is used as message to update progress view.
type status struct {
	done        bool // done is true when the end time is set
	prompt      string
	descriptor  ocispec.Descriptor
	offset      int64
	total       humanize.Bytes
	speedWindow *speedWindow

	startTime time.Time
	endTime   time.Time
	mark      spinner
	lock      sync.Mutex
}

The oras CLI combines the status update and view model in a single structure.

Maybe we can improve a bit by separating them. /cc @qweeah

@deitch
Contributor Author

deitch commented Oct 21, 2024

I truly don't remember that part. 😁

I could see having an err error and a done bool property, not sure I would want much else.

What is the Manager used for? Is that just so that a UI-implementing side (like oras CLI) can manage it? If so, I would keep it separate. Simplest possible method (channel or func call) passed to Copy(), let the consuming side worry about what it does with it.

If we want to add progress to oras CLI, it becomes a consuming party. I think I would handle it as a separate addition in a separate PR.

@FeynmanZhou FeynmanZhou added the enhancement New feature or request label Oct 21, 2024
@deitch
Contributor Author

deitch commented Oct 25, 2024

I thought about this a bit more over the last few days. I still am having a hard time seeing the benefit of a ProgressManager interface over just having a func or chan. I am trying to keep it as simple as possible, which to me looks like:

f := func(msg oras.Update) {
    fmt.Printf("copied %d bytes out of %d total for %s\n", msg.Copied, msg.Descriptor.Size, msg.Descriptor.Digest)
}
opts := oras.CopyOptions{
    ProgressHandler: f,
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts)

With each copy of data, it just calls f(update).

I have been thinking a bit more about the whole "how do we call the function without blocking". I had suggested earlier:

I would consider having each call that sends to the channel or calls func() run in a separate short-lived goroutine, which calls func or sends to the channel, wrapped in a timeout.

but I like this less. I am worried about a proliferation of goroutines. If we have frequent updates, that could get out of control.

Here is a possible alternative.

  1. When we call Copy(), if the handler is not nil, we spin up a chan Update and one goroutine.
  2. With each update, the main thread (from inside Copy()) sends an Update on the channel. The goroutine we spun up receives that update and then calls the handler function.
  3. Since we create the channel, we control its size and can ensure it is well buffered. The main Copy() thread can also send using a select with a default case, so a full channel never blocks the copy; that update is simply dropped.

This is less of an issue if we pass a channel in the options instead of a function, but I do think the function option is cleaner.
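Roughly, the three steps could look like the sketch below. The dispatcher type and its methods are hypothetical names for illustration, and Update is cut down to one field to keep it short.

```go
package main

import (
	"fmt"
	"sync"
)

// Update is a cut-down, hypothetical stand-in for the proposed oras.Update.
type Update struct{ Copied int64 }

// dispatcher decouples the copy loop from a possibly slow handler.
type dispatcher struct {
	ch      chan Update
	wg      sync.WaitGroup
	dropped int // updates discarded because the buffer was full
}

// newDispatcher creates the buffered channel and the single goroutine
// that forwards updates to handler (steps 1 and 2).
func newDispatcher(handler func(Update), buffer int) *dispatcher {
	d := &dispatcher{ch: make(chan Update, buffer)}
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		for u := range d.ch {
			handler(u)
		}
	}()
	return d
}

// send never blocks: if the buffer is full the update is dropped (step 3).
func (d *dispatcher) send(u Update) {
	select {
	case d.ch <- u:
	default:
		d.dropped++
	}
}

// close stops the goroutine after it has drained what is buffered.
func (d *dispatcher) close() {
	close(d.ch)
	d.wg.Wait()
}

func main() {
	var last int64
	d := newDispatcher(func(u Update) { last = u.Copied }, 16)
	for i := int64(1); i <= 10; i++ {
		d.send(Update{Copied: i * 1024})
	}
	d.close()
	fmt.Println(last, d.dropped)
}
```

The cost is that a slow handler silently loses intermediate updates, which seems acceptable for progress reporting as long as the final state is delivered some other way.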

Thoughts @shizhMSFT ?

@shizhMSFT
Contributor

I would like to have a more flexible interface and have some utility functions for easy setup. Let me try to refactor the existing oras CLI code a bit in the next few days or a week and see if we can have better code. Besides Copy(), I'm also holistically thinking about how we can cover the progress for things like Fetch(), and Push().

@deitch
Contributor Author

deitch commented Oct 28, 2024

Good point there @shizhMSFT . I am trying hard to keep the abstract interface (is that redundant?) that goes to Copy() as simple as possible, while the implementation of the consumer, like oras CLI, is where other things go. I hadn't thought about Fetch() and Push(), but it should go there as well.

@shizhMSFT
Contributor

Here's a solid PoC of what the Manager and related code look like (see 2 files below).

File track/interface.go defines the interfaces and includes a simple utility function:

package track

import (
	"io"

	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// State represents the state of a descriptor.
type State int

const (
	StateUnknown State = iota
	StateInitialized
	StateTransmitting
	StateTransmitted
	StateExists
	StateSkipped
	StateMounted
)

// Status represents the status of a descriptor.
type Status struct {
	// State represents the state of the descriptor.
	State State

	// Offset represents the current offset of the descriptor.
	// Offset is discarded if set to a negative value.
	Offset int64
}

// Tracker updates the status of a descriptor.
type Tracker interface {
	io.Closer

	// Update updates the status of the descriptor.
	Update(status Status) error

	// Fail marks the descriptor as failed.
	Fail(err error) error
}

// Manager tracks the progress of multiple descriptors.
type Manager interface {
	io.Closer

	// Track starts tracking the progress of a descriptor.
	Track(desc ocispec.Descriptor) (Tracker, error)
}

// Record adds the progress of a descriptor as a single entry.
func Record(m Manager, desc ocispec.Descriptor, status Status) error {
	tracker, err := m.Track(desc)
	if err != nil {
		return err
	}
	err = tracker.Update(status)
	if err != nil {
		return err
	}
	return tracker.Close()
}

File track/reader.go implements how a Tracker can hook a Reader:

package track

import "io"

// ReadTracker tracks the transmission based on the read operation.
type ReadTracker struct {
	base    io.Reader
	tracker Tracker
	offset  int64
}

// NewReadTracker attaches a tracker to a reader.
func NewReadTracker(track Tracker, r io.Reader) *ReadTracker {
	return &ReadTracker{
		base:    r,
		tracker: track,
	}
}

// Read reads from the base reader and updates the status.
func (rt *ReadTracker) Read(p []byte) (n int, err error) {
	n, err = rt.base.Read(p)
	rt.offset += int64(n)
	_ = rt.tracker.Update(Status{
		State:  StateTransmitting,
		Offset: rt.offset,
	})
	if err != nil && err != io.EOF {
		_ = rt.tracker.Fail(err)
	}
	return n, err
}

// Close closes the tracker.
func (rt *ReadTracker) Close() error {
	return rt.tracker.Close()
}

// Start starts tracking the transmission.
func (rt *ReadTracker) Start() error {
	return rt.tracker.Update(Status{
		State:  StateInitialized,
		Offset: -1,
	})
}

// Done marks the transmission as complete.
// Done should be called after the transmission is complete.
// Note: Reading all content from the reader does not imply the transmission is
// complete.
func (rt *ReadTracker) Done() error {
	return rt.tracker.Update(Status{
		State:  StateTransmitted,
		Offset: -1,
	})
}

Please refer to oras-project/oras#1524 to understand how the above implementation can take effect in real code.

@shizhMSFT
Contributor

shizhMSFT commented Nov 1, 2024

Here is an example using functions.

package main

import (
	"context"
	"fmt"
	"io"

	ocispec "github.com/opencontainers/image-spec/specs-go/v1"

	"oras.land/oras-go/v2/content"
	"oras.land/oras-go/v2/registry/remote"
	"oras.land/oras/internal/experimental/track"
)

type ProgressFunc func(status track.Status, err error) error

func (pf ProgressFunc) Update(status track.Status) error {
	return pf(status, nil)
}

func (pf ProgressFunc) Fail(err error) error {
	return pf(track.Status{}, err)
}

func (pf ProgressFunc) Close() error {
	return nil
}

type DescriptorProgressFunc func(desc ocispec.Descriptor, status track.Status, err error) error

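// Track returns a track.Tracker so that DescriptorProgressFunc satisfies track.Manager.
func (dpf DescriptorProgressFunc) Track(desc ocispec.Descriptor) (track.Tracker, error) {
	return ProgressFunc(func(status track.Status, err error) error {
		return dpf(desc, status, err)
	}), nil
}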

func (dpf DescriptorProgressFunc) Close() error {
	return nil
}

func main() {
	// Create a new progress manager.
	prompt := map[track.State]string{
		track.StateInitialized:  "Pending",
		track.StateTransmitting: "Pulling",
		track.StateTransmitted:  "Pulled ",
	}
	manager := DescriptorProgressFunc(func(desc ocispec.Descriptor, status track.Status, err error) error {
		if err != nil {
			fmt.Println("Error   ", desc.Digest, err)
			return err
		}
		if status.Offset < 0 {
			fmt.Println(prompt[status.State], desc.Digest)
		} else {
			fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
		}
		return nil
	})

	// Pull a manifest
	repo, err := remote.NewRepository("ghcr.io/oras-project/oras")
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	desc, err := repo.Resolve(ctx, "v1.2.0")
	if err != nil {
		panic(err)
	}
	tracker, _ := manager.Track(desc)
	defer tracker.Close()
	tracker.Update(track.Status{
		State: track.StateInitialized,
	})

	r, err := repo.Fetch(ctx, desc)
	if err != nil {
		tracker.Fail(err)
		return
	}
	defer r.Close()
	rt := track.NewReadTracker(tracker, r)
	defer rt.Close()
	vr := content.NewVerifyReader(rt, desc)
	_, err = io.ReadAll(vr) // io.ReadAll uses a smaller buffer
	if err != nil {
		tracker.Fail(err)
		return
	}
	err = vr.Verify()
	if err != nil {
		tracker.Fail(err)
		return
	}
	rt.Done()
}

It outputs

$ go run internal/experimental/example
Pending sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b   0.00%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b  13.15%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b  23.02%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b  36.17%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b  52.61%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b  78.91%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 100.00%
Pulling sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b 100.00%
Pulled  sha256:0087224dd0decc354b5b0689068fbbc40cd5dc3dbf65fcb3868dfbd363dc790b

@shizhMSFT
Contributor

Currently, this model only covers descriptors and not references.

@deitch
Contributor Author

deitch commented Nov 1, 2024

@shizhMSFT yes, this is pretty detailed. It is more advanced - but also more complex - than I had in mind.

From the Copy() perspective, all I would think it wants is a simple way to send an update: "For descriptor desc, be informed that I have copied x bytes out of y expected total, and my latest state update is Z (error, completed, in progress)." I would think we would want it to be as simple as possible for Copy() (and Fetch(), etc.; any core library function).

What I think you have constructed is a more advanced thing in a few respects:

  1. there is a manager that can create a tracker for each desc, etc.
  2. there are many more states, and more complex interactions for those updates

I assume (please correct me) that your thought was something like:

opts := oras.CopyOptions{
  // other stuff
  ProgressTracker: manager, // some implementation of manager
}
oras.Copy(ctx, src, srcRef, dst, dstRef, opts)

I think that is more complex than it needs to be. A manager is useful, but is not the only way to do it. For example, I might have a simpler implementation, or just need progress for one descriptor, etc.

That is why I was thinking the simple method above:

f := func(msg oras.Update) {
    fmt.Printf("copied %d bytes out of %d total for %s\n", msg.Copied, msg.Descriptor.Size, msg.Descriptor.Digest)
}
opts := oras.CopyOptions{
    ProgressHandler: f,
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts)

Now, you might actually want a manager. In which case, the update function itself might look like:

type ProgressHandler interface {
    Update(msg oras.Update)
}

// this can be the actual manager you had above; implements ProgressHandler interface
type manager struct {
}
func (m *manager) Update(msg oras.Update) {
    // check the descriptor, if needed, create a tracker
}
m := NewManager()
opts := oras.CopyOptions{
    ProgressHandler: m,
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts)

In the philosophy of "batteries optional but included", we can include a manager implementation just like you have above, on the assumption that many will want it. But keep the Copy() interaction really simple: I just send updates. How you manage them - discard, print to stdout, separate by desc or status or whatever - is beyond me, and I don't even care about the interaction.

@deitch
Contributor Author

deitch commented Nov 1, 2024

Heh, crossed wires. I am reading your latest comments now.

@deitch
Contributor Author

deitch commented Nov 1, 2024

Looking at your example, you didn't plumb it into Copy() or Fetch(), but you decorated the desc? What am I misunderstanding?

That is a lot of extra overhead to do when calling Fetch(). Wouldn't I want it to be a simple additional option to the call?

@shizhMSFT
Contributor

shizhMSFT commented Nov 1, 2024

Here is an example if you don't want a manager.

package main

import (
	"context"
	"fmt"
	"io"

	ocispec "github.com/opencontainers/image-spec/specs-go/v1"

	"oras.land/oras-go/v2/content"
	"oras.land/oras-go/v2/registry/remote"
	"oras.land/oras/internal/experimental/track"
)

type ProgressFunc func(status track.Status, err error) error

func (pf ProgressFunc) Update(status track.Status) error {
	return pf(status, nil)
}

func (pf ProgressFunc) Fail(err error) error {
	return pf(track.Status{}, err)
}

func (pf ProgressFunc) Close() error {
	return nil
}

func main() {
	// Pull a manifest
	repo, err := remote.NewRepository("ghcr.io/oras-project/oras")
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	desc, err := repo.Resolve(ctx, "v1.2.0")
	if err != nil {
		panic(err)
	}

	// Create a new progress tracker.
	prompt := map[track.State]string{
		track.StateInitialized:  "Pending",
		track.StateTransmitting: "Pulling",
		track.StateTransmitted:  "Pulled ",
	}
	tracker := ProgressFunc(func(status track.Status, err error) error {
		if err != nil {
			fmt.Println("Error   ", desc.Digest, err)
			return err
		}
		if status.Offset < 0 {
			fmt.Println(prompt[status.State], desc.Digest)
		} else {
			fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
		}
		return nil
	})
	tracker.Update(track.Status{
		State: track.StateInitialized,
	})

	r, err := repo.Fetch(ctx, desc)
	if err != nil {
		tracker.Fail(err)
		return
	}
	defer r.Close()
	rt := track.NewReadTracker(tracker, r)
	defer rt.Close()
	vr := content.NewVerifyReader(rt, desc)
	_, err = io.ReadAll(vr) // io.ReadAll uses a smaller buffer
	if err != nil {
		tracker.Fail(err)
		return
	}
	err = vr.Verify()
	if err != nil {
		tracker.Fail(err)
		return
	}
	rt.Done()
}

@shizhMSFT
Contributor

A manager is required since Copy() deals with multiple descriptors but Fetch() and Push() only deal with one descriptor at a time.

@shizhMSFT
Contributor

If ProgressFunc and DescriptorProgressFunc are pre-defined in oras-go, all you need is just a single function

func(desc ocispec.Descriptor, status track.Status, err error) error {
        fmt.Printf("copied %d bytes out of %d total for %s\n", status.Offset, desc.Size, desc.Digest)
        return nil
}

although this simple function does not consider error handling.

@shizhMSFT
Contributor

So.. the complete example is

f := func(desc ocispec.Descriptor, status track.Status, err error) error {
        fmt.Printf("copied %d bytes out of %d total for %s\n", status.Offset, desc.Size, desc.Digest)
        return nil
}
opts := oras.CopyOptions{
    ProgressManager: DescriptorProgressFunc(f),
}
desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts)

@deitch
Contributor Author

deitch commented Nov 1, 2024

A manager is required since Copy() deals with multiple descriptors but Fetch() and Push() only deal with one descriptor at a time.

I don't think that changes anything. If Update includes the descriptor, it includes all I need to know.

@deitch
Contributor Author

deitch commented Nov 1, 2024

Hold on a second. You are not decorating the desc, you are decorating the reader?

	r, err := repo.Fetch(ctx, desc)
	if err != nil {
		tracker.Fail(err)
		return
	}
	defer r.Close()
	rt := track.NewReadTracker(tracker, r)
	defer rt.Close()
	vr := content.NewVerifyReader(rt, desc)
	_, err = io.ReadAll(vr) // io.ReadAll uses a smaller buffer

@deitch
Contributor Author

deitch commented Nov 1, 2024

So all you are doing is wrapping a ReadCloser with a ReadCloser that has the "let me call some function to report updates once in a while" logic inside it? Chaining?

@shizhMSFT
Contributor

Yes, I don't think there is any other way to get the status of a read operation (e.g. resp.Body). It is worth noting that receiving a status update does not imply you should render it. In other words, updates are call-based but rendering can be time-based.

@deitch
Contributor Author

deitch commented Nov 1, 2024

That makes a lot of sense. It is the canonical way of doing things like that in Go.

I still don't get how we would do that with Copy(), which, as you said, is multiple blobs. Ah, is that what the manager is for? It is a "tracker factory", where Copy() can get a tracker for each desc on which it is working, and then it can do the chaining you just showed for each ReadCloser?

@shizhMSFT
Contributor

Ah, is that what the manager is for? It is a "tracker factory", where Copy() can get a tracker for each desc on which it is working, and then it can do the chaining you just showed for each ReadCloser?

Exactly.

@deitch
Contributor Author

deitch commented Nov 1, 2024

I think the name "manager" threw me off. It is a tracker factory, that lets copy say, "I am about to start working on desc, give me a tracker for it, and if I get none (no factory or nil tracker returned), I will just process the blobs as I want".

OK, it is interesting. Going to absorb it a bit. The trick is to get the documentation right.

What then are DescriptorProgressFunc and ProgressFunc? Don't I just pass a Manager, which has one main func on it, Track(), which lets me start tracking a desc?

@shizhMSFT
Contributor

What then are DescriptorProgressFunc and ProgressFunc?

They are just utility functions like http.HandlerFunc for your simple cases for demo purposes.

@deitch
Contributor Author

deitch commented Nov 1, 2024

What purpose do they serve? If I understood correctly

  1. For Fetch, you don't pass anything. Since it returns a readcloser, just wrap it. We can provide some useful functions, like in your example, but nothing is needed.
  2. For Copy, you pass a factory, which generates a Tracker for each descriptor, which then is called.

@shizhMSFT
Contributor

What purpose do they serve?

For simple scenarios like yours, crafting a custom type and implementing those methods might be time consuming. Those utilities help you convert a function into a handler that satisfies the interfaces.

This pattern is common in the Go world. If you take a look at http.HandlerFunc, you can see that it is a utility to convert a function into an http.Handler, which is convenient.

@deitch
Contributor Author

deitch commented Nov 1, 2024

The analogy to http.HandlerFunc is a good one. Although in our case, it might be a bit too complex.

My perspective always bounces between engineer who needs to build something, and user who needs to consume it (product). I will take a stab at writing the brief form of docs here. Tell me if this describes what you are doing.

-----BEGIN-----
When transmitting blobs, whether individually via Fetch() or Push(), or as part of an entire image via Copy(), you have the option to track the progress of each blob being transmitted. The oras library does not care what you do with the progress updates, only that it makes them available to you.

At the lowest level, each blob is fetched via oras.Fetch(), which returns an io.ReadCloser. You can capture progress by wrapping that io.ReadCloser in a type that is itself an io.ReadCloser but also reports on progress.

	r, err := repo.Fetch(ctx, desc)  // returns an io.ReadCloser
	rc := myReadCloserTrackerFunc(r) // also returns an io.ReadCloser

This is the simplest way to track progress for a specific blob. Whether you send updates to an observability system or stderr, how often you do it, whether based on every buffer copy, every 1MB, or every 3 seconds, is entirely up to how you build your wrapper function, in our example myReadCloserTrackerFunc.

At a higher level, and optional if all you need is to wrap a single ReadCloser, oras provides the Tracker interface.

type Tracker interface {
	io.Closer
	Update(status Status) error
	Fail(err error) error
	ReadCloser(r io.ReadCloser) io.ReadCloser
}

You can use a Tracker to generate a ReadCloser as well as keep track of several status updates. A Tracker can be generated via ProgressFunc(func), which returns a Tracker you can use. You pass it the function you expect to be called with each update:

	tracker := ProgressFunc(func(status track.Status, err error) error {
		if err != nil {
			fmt.Println("Error   ", desc.Digest, err)
			return err
		}
		if status.Offset < 0 {
			fmt.Println(prompt[status.State], desc.Digest)
		} else {
			fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
		}
		return nil
	})

You then can generate your io.ReadCloser to wrap the io.ReadCloser returned from Fetch():

tracker := ProgressFunc() // etc
r, err := repo.Fetch(ctx, desc) // returns an io.ReadCloser
rc := tracker.ReadCloser(r)

If you are copying an entire artifact by tag, which likely involves multiple descriptors, you can track it by providing a manager, or factory, that generates trackers for each descriptor's content that is copied. The manager is called by Copy() each time it encounters a new descriptor, and uses the returned Tracker to track progress for each blob's Fetch() or Push():

type Manager interface {
	io.Closer
	Track(desc ocispec.Descriptor) (Tracker, error)
}

You then can pass the Manager to Copy():

opts := oras.CopyOptions{
  // other stuff
  ProgressTracker: manager, // some implementation of manager
}
oras.Copy(ctx, src, srcRef, dst, dstRef, opts)

If the ProgressTracker property is nil, or if Track() returns nil for a given descriptor, then that descriptor is not tracked.

Of course, you are not required to create a Manager implementation. The oras library provides a convenient one for you with oras.DescriptorProgressFunc(), which returns a manager or factory. The function is called with each update, whether of copied content or change in status. For example:

	manager := DescriptorProgressFunc(func(desc ocispec.Descriptor, status track.Status, err error) error {
		if err != nil {
			fmt.Println("Error   ", desc.Digest, err)
			return err
		}
		if status.Offset < 0 {
			fmt.Println(prompt[status.State], desc.Digest)
		} else {
			fmt.Printf("%s %s %6.2f%%\n", prompt[status.State], desc.Digest, float64(status.Offset)/float64(desc.Size)*100)
		}
		return nil
	})

Each update, whether a change in status or bytes copied, will call the function.
-----END-----

Is that roughly it? If so, I have a few suggestions:

  • Make the func passed to DescriptorProgressFunc and ProgressFunc identical. The only difference is that one passes the descriptor and one doesn't. Does it hurt to pass it? And then make it a type.
  • The name Manager is confusing. Why not just call it type ProgressTracker interface?
  • The convenient "manager generator" (DescriptorProgressFunc) and "tracker generator" (ProgressFunc) have somewhat confusing names. Isn't the func that gets passed to them a DescriptorProgressFunc?
type DescriptorProgressFunc func(desc ocispec.Descriptor, status track.Status, err error) error

and then we can call the generator something like NewProgressTracker and NewTracker or similar

Looking for feedback.

@shizhMSFT
Contributor

Is that roughly it?

Yes. Just note that the Tracker interface should not contain

ReadCloser(r io.ReadCloser) io.ReadCloser

as we can provide a common function with optimized performance, e.g. one that implements io.WriterTo if the original r implements it.

Your suggestion is mainly about naming. We can revise the names in future PRs (I will prepare one).

Besides, the name Manager is common in the Go world, e.g. github.com/containerd/containerd: content.Manager. For now, track.Manager manages track.Trackers.

For the package name, I'm considering track vs. progress. If I cannot find a scenario for general tracking, I'd like to rename the package to progress (i.e. progress.Manager and progress.Tracker).

@deitch
Contributor Author

deitch commented Nov 4, 2024

Sounds good. Feel free to use (and modify) my "docs" in the documentation with the PR.

What assistance do you need?

@Wwwsylvia Wwwsylvia added this to the v2.6.0 milestone Nov 5, 2024

4 participants