From 75872371dd289ffda8ba9d660a0a2f0d70244b1d Mon Sep 17 00:00:00 2001 From: Gord Allott Date: Mon, 5 Feb 2024 10:30:20 +0000 Subject: [PATCH] add support for non connstring auth & workerpool size config --- cmd/export/main.go | 54 +++++++++++++++++++++++++++++++++------------- pkg/poll/poll.go | 10 ++++++--- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/cmd/export/main.go b/cmd/export/main.go index 634a8f2..c034ac4 100644 --- a/cmd/export/main.go +++ b/cmd/export/main.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "os/signal" + "strings" "syscall" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" @@ -29,10 +30,19 @@ var Cmd = &cobra.Command{ Run: export, } -func auth(ctx context.Context, connectionString string, opts *azidentity.DefaultAzureCredentialOptions) (*azblob.Client, error) { +func authConnectionString(ctx context.Context, connectionString string) (*azblob.Client, error) { return azblob.NewClientFromConnectionString(connectionString, nil) } +func authDefualt(ctx context.Context, serviceURL string) (*azblob.Client, error) { + cred, err := azidentity.NewDefaultAzureCredential(nil) + if err != nil { + return nil, fmt.Errorf("error getting default azure credentials: %w", err) + } + + return azblob.NewClient(serviceURL, cred, nil) +} + var ( storageURL string connectionString string @@ -40,6 +50,8 @@ var ( axiomPersonalOrg string axiomURL string + + workerPoolSize int ) func init() { @@ -47,6 +59,7 @@ func init() { viper.AutomaticEnv() flags := Cmd.Flags() + flags.IntVar(&workerPoolSize, "worker-pool-size", 8, "the size of the worker pool used to transfer blobs to axiom (more workers == more blobs sent concurrently)") flags.StringVar(&axiomPersonalAPIKey, "axiom-personal-token", "", "your full axiom personal API key (or env AXIOM_PERSONAL_TOKEN)") if err := viper.BindPFlag("AXIOM_PERSONAL_TOKEN", flags.Lookup("axiom-personal-token")); err != nil { panic(err) @@ -66,32 +79,41 @@ func init() { } } -func export(cmd *cobra.Command, args []string) { +func ensureValid(ctx context.Context) (*azblob.Client, error) { axiomPersonalAPIKey = viper.Get("AXIOM_PERSONAL_TOKEN").(string) if axiomPersonalAPIKey == "" { - fmt.Fprintf(cmd.ErrOrStderr(), "axiom personal token is required") - return + return nil, fmt.Errorf("axiom personal token is required") + } + + storageURL = viper.Get("STORAGE_URL").(string) + if storageURL == "" { + return nil, fmt.Errorf("storage url is required") } connectionString = viper.Get("CONNECTION_STRING").(string) - if connectionString == "" { - fmt.Fprintf(cmd.ErrOrStderr(), "connection string is required") - return + if strings.TrimSpace(connectionString) != "" { + azclient, err := authConnectionString(ctx, connectionString) + if err != nil { + return nil, fmt.Errorf("can not auth with azure via connection-string: %w", err) + } + return azclient, nil } - storageURL = viper.Get("STORAGE_URL").(string) - if storageURL == "" { - fmt.Fprintf(cmd.ErrOrStderr(), "storage url is required") - return + azclient, err := authDefualt(ctx, storageURL) + if err != nil { + return nil, fmt.Errorf("can not auth with azure via default credentials: %w", err) } + return azclient, nil +} - cmd.Println("exporting from azure to axiom") +func export(cmd *cobra.Command, args []string) { ctx, cancel := context.WithCancel(cmd.Context()) defer cancel() - azclient, err := auth(ctx, connectionString, nil) + azclient, err := ensureValid(ctx) if err != nil { - fmt.Fprintf(cmd.ErrOrStderr(), "can not auth with azure: %s\n", err) + fmt.Fprintf(cmd.ErrOrStderr(), "error validating: %s", err) + return } axiclient, err := axiom.NewClient( @@ -103,7 +125,9 @@ func export(cmd *cobra.Command, args []string) { fmt.Fprintf(cmd.ErrOrStderr(), "can not create axiom client: %s\n", err) } - poller := poll.NewPoller() + cmd.Println("exporting from azure to axiom") + + poller := poll.NewPoller(workerPoolSize) if err := poller.Start(ctx, azclient, axiclient, monitor.NewStorageAccountMonitor(storageURL)); err != nil { fmt.Fprintf(cmd.ErrOrStderr(), "can not start poller: %s\n", err) } diff --git a/pkg/poll/poll.go b/pkg/poll/poll.go index 421d8ef..0f83f69 100644 --- a/pkg/poll/poll.go +++ b/pkg/poll/poll.go @@ -24,12 +24,16 @@ var logger = log.New(os.Stdout, "poll: ", log.LstdFlags) // queuing up all the blobs at once type Poll struct { + wpsize int + cancel context.CancelFunc stopped <-chan struct{} } -func NewPoller() *Poll { - return &Poll{} +func NewPoller(workerPoolSize int) *Poll { + return &Poll{ + wpsize: workerPoolSize, + } } func (p *Poll) Start(ctx context.Context, @@ -99,7 +103,7 @@ func (p *Poll) loop(ctx context.Context, containers[i], containers[j] = containers[j], containers[i] }) - wp := pond.New(8, 16) + wp := pond.New(p.wpsize, p.wpsize*2) for _, container := range containers { streamContainer(ctx, wp, azClient, axClient, container) }