diff --git a/README.md b/README.md index 33c8e9a..9143965 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,152 @@ -# Persevere – tool for reliably uploading huge files to S3 +# Persevere – a tool for reliably uploading huge files to S3 -> [!CAUTION] -> This project is in a very early development stage and not yet expected to be ready for production use. +With Persevere you can upload huge files to S3 without worrying about network interruptions or other issues. +Persevere will allow you to resume the upload where it was left off, even in the case of a system crash during upload. + +The contents of the file you upload are always streamed, which means the memory usage of Persevere is minimal, usually below 10 MB. +This makes it possible to upload files of any size supported by S3, even if they are larger than the available memory of your system. + +> [!IMPORTANT] +> This project is still in fairly early development. +> Although we have used it to upload files up to 3 TB in size reliably, there is a chance that there are bugs that could lead to corrupt objects in S3. +> +> For files where it is vital to you that the object that ends up in S3 is valid, consider one of these options: +> +> * Download the object again and verify its checksum versus the original locally. +> * Let S3 calculate a checksum after the object is uploaded through e.g. the AWS Console. +> +> (Please note that when S3 calculates the checksum it will copy the object onto itself, which might incur additional costs.) +> +> We are planning on adding automatic checksum calculation on upload, as well as per-part checksums, which takes this burden off of you. + +## Installation + +Currently, there are no pre-built binaries available. +Installation of Persevere requires checking out this repository and building it yourself. +You need to have [Rust](https://www.rust-lang.org) installed on your system. + +```sh +$ git clone https://github.com/takkt-ag/persevere.git +$ cd persevere +$ cargo build --release +``` + +This will create the binary in: + +* `target/release/persevere` on Unix-like systems +* `target\release\persevere.exe` on Windows + +## Usage + +Persevere is a command-line tool, so interactions with it happen from a terminal. +A normal workflow of using Persevere means invoking the `upload` command for the file you want to upload. + +Assume you have a very large file called `database.dump` that you want to upload to the S3 bucket `my-bucket` under the key `backups/database.dump`. +You can use Persevere as such to upload this file: + +```sh +persevere upload --s3-bucket my-bucket --s3-key backups/database.dump --file-to-upload database.dump --state-file database.dump.persevere-state +``` + +The actual name of the state-file does not matter, just make it something that makes sense to you! +Once you execute the command, the upload will start immediately, showing you the status of the upload as it progresses. + +If the upload is interrupted for any reason, you can resume it by running the `resume` command, providing the same state-file again: + +```sh +persevere resume --state-file database.dump.persevere-state +``` + +Should you, for any reason, want to abort the upload before it has finished, you can do so by running the `abort` command, again providing the same state-file: + +```sh +persevere abort --state-file database.dump.persevere-state +``` + +To see all available commands, run: + +```sh +persevere --help +``` + +If you want to see the help for a specific command, run: + +```sh +persevere --help +``` + +## AWS credentials and permissions + +An upload to S3 obviously requires some credentials and permissions to work. + +Persevere will automatically discover valid AWS credentials like most AWS SDKs. +This means you can provide environment variables such as `AWS_PROFILE` to select the profile you want to upload a file with, or provide the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` directly. + +If you are running Persevere on an AWS resource that has an AWS role attached (like the instance profile of an EC2 instance, or the task-role of an ECS task), Persevere will automatically use the credentials of that role. + +Regardless of how the credentials are provided, the user or role must have the necessary permissions to upload to the S3 bucket and key you specify. +Only the `s3:PutObject` and `s3:AbortMultipartUpload` actions need to be allowed. + +A valid IAM policy can look like this: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:PutObject", + "s3:AbortMultipartUpload" + ], + "Resource": "arn:aws:s3:::my-bucket/backups/*" + } + ] +} +``` + +## Comparison to other tools + +There are many tools available that allow you to upload files to S3, although we have found none that: + +* Deal well with interruptions during the upload. +* Don't require a language runtime (like Python or Node.js) to be installed. + +Persevere is trying to fill specifically this gap, which means it is not a replacement for the various other tools, but rather an addition. + +If you are looking for other features, such as: + +* Downloads and uploads highly optimized for speed. +* Downloads and uploads of many files at once. +* Synchronization of files between local and S3. +* Management of S3 buckets and objects. + +You might want to look at other tools, such as: + +* The official [AWS CLI](https://aws.amazon.com/cli/). +* s3cmd: +* s4cmd: +* s5cmd: + + + (We do not explicitly endorse the use of these tools, they are just examples of tools that are available that might fit your needs better. + Make sure to evaluate them yourself to see if they fit your use-case.) + + +## Planned features + +Persevere is not intended to become a full-featured S3 client: it is meant to be a tool that allows you to upload huge files to S3, **reliably**. + +Still, there are some features that we believe are necessary to make Persevere a complete tool for this purpose: + +* Automatic checksum calculation on upload. +* Per-part checksums. + +Additionally, we think there might be features that could be useful to many users, enhancing the applicability of Persevere, without bloating it: + +* Uploading multiple parts in parallel to speed up uploads. + +If you are interested in contributing a feature that is not mentioned here, we suggest to reach out through an issue first to see if the feature is something we would like to see in Persevere. ## License diff --git a/src/main.rs b/src/main.rs index 96593f5..fc4fb70 100644 --- a/src/main.rs +++ b/src/main.rs @@ -70,26 +70,105 @@ use tracing::{ }; use tracing_subscriber::prelude::*; -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize)] struct State { s3_bucket: String, s3_key: String, file_to_upload: PathBuf, file_size_in_bytes: u64, part_size: u64, + number_of_parts: u64, upload_id: String, last_successful_part: u64, #[serde(with = "de::completed_parts")] completed_parts: Vec, } +impl State { + async fn from_file(file: impl AsRef) -> Result { + let file = file.as_ref().to_owned(); + + // serde_json does not support asynchronous readers, so we make sure to spawn the task away + // from the main thread. + tokio::task::spawn_blocking(|| { + serde_json::from_reader( + std::fs::File::open(file) + .context("Failed to open state file") + .into_unrecoverable()?, + ) + .context("Failed to deserialize state file") + .into_unrecoverable() + }) + .await + .expect("Failed to await synchronous read of state file") + } + + // NOTE: `self` is taken mutably here, even though it isn't required by the method itself. By + // requiring mutability, we guarantee that there is only ever one task that can write the + // state file at a time, ensuring the file is always in a consistent state that. + async fn write_to_file(&mut self, file: impl AsRef) -> Result<()> { + let file = file.as_ref().to_owned(); + + // serde_json does not support asynchronous writers, so we make sure to spawn the task such + // that it doesn't block the executor. + tokio::task::block_in_place(|| { + serde_json::to_writer( + std::fs::File::create(file) + .context("Failed to open state file") + .into_unrecoverable()?, + self, + ) + .context("Failed to serialize state file") + .into_unrecoverable() + }) + } +} + #[derive(Debug, Parser)] #[command(version)] enum Cli { /// Upload a file to S3. + /// + /// Persevere will take care of uploading the file in a manner that is resilient, such that + /// intermittent errors do not result in losing all progress on the upload, as well as + /// resumable, e.g. in case the system you are uploading crashed or there is a more persistent, + /// but still recoverable, error. + /// + /// This is achieved through a state-file which keeps track of the state of the upload. Resuming + /// an upload is done through the `resume` subcommand, by providing the same state-file again. + /// + /// You need the following AWS permissions for the S3-object ARN you are trying to upload to: + /// + /// * `s3:PutObject` + /// * `s3:AbortMultipartUpload` Upload(Upload), /// Resume the upload of a file to S3. + /// + /// You only have to provide the state-file of a previous invocation to `upload`, and Persevere + /// will resume your upload where it left off. + /// + /// You can not provide any other parameters to modify how the upload is handled, all choices + /// made when you started the upload have to remain the same. If you modify the state-file + /// manually, chances are you'll either have the upload fail outright, or you'll end up with a + /// corrupt object in S3 (and won't know that it is corrupt). + /// + /// You need the following AWS permissions for the S3-object ARN you are trying to upload to: + /// + /// * `s3:PutObject` + /// * `s3:AbortMultipartUpload` Resume(Resume), + /// Abort the upload of a file to S3. + /// + /// If you previously started an upload using the `upload` subcommand which has failed with a + /// recoverable error, but you no longer want to finish the upload you can invoke this + /// subcommand with the state-file. The multipart-upload with AWS will then be aborted (which + /// ensures the partial upload no longer creates any cost) and the state-file will be removed. + /// + /// You need the following AWS permissions for the S3-object ARN you are trying to upload to: + /// + /// * `s3:PutObject` + /// * `s3:AbortMultipartUpload` + Abort(Abort), } #[derive(Debug, Args)] @@ -100,16 +179,30 @@ struct Upload { /// The S3 key where to upload the file to. #[arg(long)] s3_key: String, - /// Path to the file to upload. + /// Path to the local file to upload to S3. #[arg(long)] file_to_upload: PathBuf, - /// Explicit part-size to use. + /// Explicit part-size, in bytes, to use. + /// + /// If not provided, Persevere will choose the smallest part-size possible by default, which is + /// either 5 MB (the minimum S3 requires) or the smallest each part can be to allow the file to + /// be uploaded within 10,000 parts (the maximum S3 allows). + /// + /// Smaller part-sizes make you lose less progress in case something fails, but it usually also + /// means that you might not achieve as much throughput as your network would allow. In cases + /// where you want to optimize for throughput, and don't care too much about losing progress + /// within an individual part, you can increase the part-size. + /// + /// The maximum part-size S3 supports is 5 GB. Persevere will inform you if the part-size you + /// have chosen is too small for either the file you are trying to upload, or smaller than AWS's + /// limit. It will also inform you if you have chosen a part-size that is too large and not + /// supported by S3. #[arg(long)] override_part_size: Option, /// Path to where the state-file will be saved. /// - /// The state-file is used to make resumable uploads possible. This file is only written if the - /// upload has failed. + /// The state-file is used to make resumable uploads possible. It will automatically be removed + /// if the upload finishes successfully. #[arg(long)] state_file: PathBuf, } @@ -118,6 +211,14 @@ impl Upload { async fn run(mut self) -> Result<()> { debug!("Running upload command: {:?}", self); + debug!("Verifying that the state-file doesn't exist yet. If it does, we don't allow the start of a new upload against the same file."); + if tokio::fs::try_exists(&self.state_file) + .await + .into_unrecoverable()? + { + bail!("The state-file already exists, and we don't allow starting a new upload against the same file. If you want to resume the upload, use the 'resume' command instead. If you want to start a new upload, please remove the state-file first, or use a different one."); + } + self.file_to_upload = self .file_to_upload .canonicalize() @@ -183,18 +284,19 @@ impl Upload { upload_id, self.s3_bucket, self.s3_key, ); - let state = State { + let mut state = State { s3_bucket: self.s3_bucket, s3_key: self.s3_key, file_to_upload: self.file_to_upload, file_size_in_bytes, part_size, + number_of_parts: file_size_in_bytes.div_ceil(part_size), upload_id, last_successful_part: 0, completed_parts: vec![], }; - match upload(&s3, &self.state_file, state.clone()).await { + match upload(&s3, &self.state_file, &mut state).await { Err(Error::Unrecoverable(err)) => { error!( "Unrecoverable failure during upload, aborting multipart upload: {}", @@ -219,7 +321,8 @@ impl Upload { struct Resume { /// Path to where the state-file of a previous upload. /// - /// This state-file is used to resume the upload in question. + /// This state-file is used to resume the upload in question. The state-file will automatically + /// be removed if the upload finishes successfully. #[arg(long)] state_file: PathBuf, } @@ -228,23 +331,7 @@ impl Resume { async fn run(&self) -> Result<()> { debug!("Running resume command: {:?}", self); - // Serde does not support asynchronous readers, so we make sure to spawn the task away from - // the main thread. - let state: State = tokio::spawn({ - let state_file = self.state_file.clone(); - async { - serde_json::from_reader( - std::fs::File::open(state_file) - .context("Failed to open state file") - .into_unrecoverable()?, - ) - .context("Failed to deserialize state file") - .into_unrecoverable() - } - }) - .await - .expect("Failed to await synchronous read of state file")?; - + let mut state = State::from_file(&self.state_file).await?; let current_file_size_in_bytes = { let file = tokio::fs::File::open(&state.file_to_upload) .await @@ -263,7 +350,7 @@ impl Resume { let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; let s3 = aws_sdk_s3::Client::new(&config); - match upload(&s3, &self.state_file, state.clone()).await { + match upload(&s3, &self.state_file, &mut state).await { Err(Error::Unrecoverable(err)) => { error!( "Unrecoverable failure during upload, aborting multipart upload: {}", @@ -284,42 +371,83 @@ impl Resume { } } -#[tracing::instrument(skip_all)] -#[allow(clippy::too_many_arguments)] // FIXME: refactor to reduce number of arguments -async fn upload_part( - s3: &aws_sdk_s3::Client, - s3_bucket: &str, - s3_key: &str, - upload_id: &str, - file_to_upload: &Path, +#[derive(Debug, Args)] +struct Abort { + /// Path to where the state-file of a previous upload. + /// + /// This state-file is used to abort the upload in question. The state-file will automatically + /// be removed after the upload has been aborted. + #[arg(long)] + state_file: PathBuf, +} + +impl Abort { + async fn run(&self) -> Result<()> { + debug!("Running abort command: {:?}", self); + + let state = State::from_file(&self.state_file).await?; + let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; + let s3 = aws_sdk_s3::Client::new(&config); + + s3.abort_multipart_upload() + .bucket(&state.s3_bucket) + .key(&state.s3_key) + .upload_id(&state.upload_id) + .send() + .await + .into_retryable()?; + info!( + "Aborted multipart upload with ID {} for: s3://{}/{}", + state.upload_id, state.s3_bucket, state.s3_key, + ); + + debug!("Removing state-file: {}", self.state_file.display()); + match tokio::fs::remove_file(&self.state_file).await { + Err(error) if error.kind() == std::io::ErrorKind::NotFound => { + debug!("The state-file did not exist, probably because it was never written, likely because the upload worked first try.") + } + result => result.into_unrecoverable()?, + } + + Ok(()) + } +} + +#[derive(Clone, Debug)] +struct Part { + number: i32, offset: u64, - part_number: u64, - number_of_parts: u64, - part_size: u64, -) -> Result { + size: u64, +} + +#[tracing::instrument(skip_all)] +async fn upload_part(s3: &aws_sdk_s3::Client, state: &State, part: Part) -> Result { info!( "Starting upload of part {} of {} ({} bytes)...", - part_number, number_of_parts, part_size, + part.number, state.number_of_parts, part.size, + ); + debug!( + "Opening file for reading: {}", + state.file_to_upload.display() ); - debug!("Opening file for reading: {}", file_to_upload.display()); - let mut file = tokio::fs::File::open(file_to_upload) + let mut file = tokio::fs::File::open(&state.file_to_upload) .await .into_unrecoverable()?; - debug!("Seeking to the start of the part: {}", offset); - file.seek(tokio::io::SeekFrom::Start(offset)) + debug!("Seeking to the start of the part: {}", part.offset); + file.seek(tokio::io::SeekFrom::Start(part.offset)) .await .into_unrecoverable()?; - let part_reader = file.take(part_size); + let part_reader = file.take(part.size); let byte_stream = ByteStream::from_reader(part_reader); let uploaded_part = s3 .upload_part() - .bucket(s3_bucket) - .key(s3_key) - .upload_id(upload_id) - .part_number(part_number as i32) - .content_length(part_size as i64) + .bucket(&state.s3_bucket) + .key(&state.s3_key) + .upload_id(&state.upload_id) + .part_number(part.number) + .content_length(part.size as i64) .body(byte_stream) .send() .await @@ -327,7 +455,7 @@ async fn upload_part( info!( "Finished upload of part {} of {} ({} bytes)", - part_number, number_of_parts, part_size, + part.number, state.number_of_parts, part.size, ); Ok(CompletedPart::builder() @@ -336,24 +464,23 @@ async fn upload_part( .set_checksum_sha1(uploaded_part.checksum_sha1) .set_checksum_sha256(uploaded_part.checksum_sha256) .set_e_tag(uploaded_part.e_tag) - .part_number(part_number as i32) + .part_number(part.number) .build()) } #[tracing::instrument(skip_all)] -async fn upload(s3: &aws_sdk_s3::Client, state_file: &Path, mut state: State) -> Result<()> { - let number_of_parts = state.file_size_in_bytes.div_ceil(state.part_size); +async fn upload(s3: &aws_sdk_s3::Client, state_file: &Path, state: &mut State) -> Result<()> { debug!( "File size: {} bytes. Part size: {} bytes. Number of parts to upload: {}.", - state.file_size_in_bytes, state.part_size, number_of_parts, + state.file_size_in_bytes, state.part_size, state.number_of_parts, ); - if number_of_parts > MAXIMUM_PART_NUMBER { + if state.number_of_parts > MAXIMUM_PART_NUMBER { bail!("The number of parts exceeds the maximum number of parts allowed by S3"); } info!( "Uploading the file in {} parts of {} bytes each", - number_of_parts, state.part_size, + state.number_of_parts, state.part_size, ); let first_part_number = if state.last_successful_part > 0 { @@ -362,8 +489,8 @@ async fn upload(s3: &aws_sdk_s3::Client, state_file: &Path, mut state: State) -> MINIMUM_PART_NUMBER }; let mut offset = (first_part_number - 1) * state.part_size; - for part_number in first_part_number..(MINIMUM_PART_NUMBER + number_of_parts) { - let actual_part_size = if part_number == number_of_parts { + for part_number in first_part_number..(MINIMUM_PART_NUMBER + state.number_of_parts) { + let actual_part_size = if part_number == state.number_of_parts { let potential_part_size = state.file_size_in_bytes % state.part_size; if potential_part_size == 0 { state.part_size @@ -376,19 +503,12 @@ async fn upload(s3: &aws_sdk_s3::Client, state_file: &Path, mut state: State) -> let mut last_retry_error: Option = None; for attempt in 1..=3 { - match upload_part( - s3, - &state.s3_bucket, - &state.s3_key, - &state.upload_id, - &state.file_to_upload, + let part = Part { + number: part_number as i32, offset, - part_number, - number_of_parts, - actual_part_size, - ) - .await - { + size: actual_part_size, + }; + match upload_part(s3, state, part).await { Ok(completed_part) => { state.completed_parts.push(completed_part); offset += actual_part_size; @@ -410,24 +530,8 @@ async fn upload(s3: &aws_sdk_s3::Client, state_file: &Path, mut state: State) -> } } + state.write_to_file(&state_file).await?; if let Some(error) = last_retry_error { - // Serde does not support asynchronous writeers, so we make sure to spawn the task away - // from the main thread. - tokio::spawn({ - let state_file = state_file.to_owned(); - async move { - serde_json::to_writer( - std::fs::File::create(state_file) - .context("Failed to open state file") - .into_unrecoverable()?, - &state, - ) - .context("Failed to serialize state file") - .into_unrecoverable() - } - }) - .await - .expect("Failed to await synchronous write of state file")?; error!( "Failed to upload part {} after 3 attempts. Multipart upload will not be aborted, to allow resuming.", part_number, @@ -445,12 +549,12 @@ async fn upload(s3: &aws_sdk_s3::Client, state_file: &Path, mut state: State) -> let completed_multipart_upload = s3 .complete_multipart_upload() - .bucket(state.s3_bucket) - .key(state.s3_key) + .bucket(&state.s3_bucket) + .key(&state.s3_key) .upload_id(&state.upload_id) .multipart_upload( CompletedMultipartUpload::builder() - .set_parts(Some(state.completed_parts)) + .set_parts(Some(state.completed_parts.clone())) .build(), ) .send() @@ -464,6 +568,14 @@ async fn upload(s3: &aws_sdk_s3::Client, state_file: &Path, mut state: State) -> .unwrap_or(""), ); + debug!("Removing state-file: {}", state_file.display()); + match tokio::fs::remove_file(state_file).await { + Err(error) if error.kind() == std::io::ErrorKind::NotFound => { + debug!("The state-file did not exist, probably because it was never written, likely because the upload worked first try.") + } + result => result.into_unrecoverable()?, + } + Ok(()) } @@ -474,8 +586,8 @@ async fn main() -> Result<()> { tracing_subscriber::fmt::layer() .compact() .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) - .with_file(true) - .with_line_number(true) + .with_file(false) + .with_line_number(false) .with_target(false), ) .with( @@ -489,5 +601,6 @@ async fn main() -> Result<()> { match command { Cli::Upload(cmd) => cmd.run().await, Cli::Resume(cmd) => cmd.run().await, + Cli::Abort(cmd) => cmd.run().await, } }