Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistently use GCP XML API #4207

Merged
merged 5 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ jobs:
- uses: actions/checkout@v3

- name: Configure Fake GCS Server (GCP emulation)
# Custom image - see fsouza/fake-gcs-server#1164
run: |
docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http
docker run -d -p 4443:4443 tustvold/fake-gcs-server -scheme http -backend memory -public-host localhost:4443
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please leave a link to that PR as a comment (so we now we can update it back when it is accepted upstream)?

# Give the container a moment to start up prior to configuring it
sleep 1
curl -v -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
Expand Down
2 changes: 1 addition & 1 deletion object_store/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ To test the GCS integration, we use [Fake GCS Server](https://github.com/fsouza/
Startup the fake server:

```shell
docker run -p 4443:4443 fsouza/fake-gcs-server -scheme http
docker run -p 4443:4443 tustvold/fake-gcs-server -scheme http
```

Configure the account:
Expand Down
2 changes: 1 addition & 1 deletion object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-ut
nix = "0.26.1"

[features]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json","reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud"]
gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud"]
Expand Down
69 changes: 3 additions & 66 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
use crate::aws::{STORE, STRICT_PATH_ENCODE_SET};
use crate::client::list::ListResponse;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::UploadPart;
use crate::path::DELIMITER;
use crate::util::format_prefix;
use crate::{
BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, ObjectMeta, Path,
Result, RetryConfig, StreamExt,
BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, Path, Result,
RetryConfig, StreamExt,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use percent_encoding::{utf8_percent_encode, PercentEncode};
use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -109,69 +109,6 @@ impl From<Error> for crate::Error {
}
}

#[derive(Debug, Deserialize)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just moved into list.rs so it can be shared with the gcs implementation

#[serde(rename_all = "PascalCase")]
pub struct ListResponse {
#[serde(default)]
pub contents: Vec<ListContents>,
#[serde(default)]
pub common_prefixes: Vec<ListPrefix>,
#[serde(default)]
pub next_continuation_token: Option<String>,
}

impl TryFrom<ListResponse> for ListResult {
type Error = crate::Error;

fn try_from(value: ListResponse) -> Result<Self> {
let common_prefixes = value
.common_prefixes
.into_iter()
.map(|x| Ok(Path::parse(x.prefix)?))
.collect::<Result<_>>()?;

let objects = value
.contents
.into_iter()
.map(TryFrom::try_from)
.collect::<Result<_>>()?;

Ok(Self {
common_prefixes,
objects,
})
}
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListPrefix {
pub prefix: String,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListContents {
pub key: String,
pub size: usize,
pub last_modified: DateTime<Utc>,
#[serde(rename = "ETag")]
pub e_tag: Option<String>,
}

impl TryFrom<ListContents> for ObjectMeta {
type Error = crate::Error;

fn try_from(value: ListContents) -> Result<Self> {
Ok(Self {
location: Path::parse(value.key)?,
last_modified: value.last_modified,
size: value.size,
e_tag: value.e_tag,
})
}
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct InitiateMultipart {
Expand Down
57 changes: 7 additions & 50 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use itertools::Itertools;
Expand All @@ -52,6 +51,7 @@ use crate::aws::credential::{
AwsCredential, CredentialProvider, InstanceCredentialProvider,
StaticCredentialProvider, WebIdentityProvider,
};
use crate::client::header::header_meta;
use crate::client::ClientConfigKey;
use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
Expand Down Expand Up @@ -87,24 +87,6 @@ static METADATA_ENDPOINT: &str = "http://169.254.169.254";
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
#[snafu(display("Last-Modified Header missing from response"))]
MissingLastModified,

#[snafu(display("Content-Length Header missing from response"))]
MissingContentLength,

#[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
InvalidLastModified {
last_modified: String,
source: chrono::ParseError,
},

#[snafu(display("Invalid content length '{}': {}", content_length, source))]
InvalidContentLength {
content_length: String,
source: std::num::ParseIntError,
},

#[snafu(display("Missing region"))]
MissingRegion,

Expand Down Expand Up @@ -155,6 +137,11 @@ enum Error {

#[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
RegionParse { bucket: String },

#[snafu(display("Failed to parse headers: {}", source))]
Header {
source: crate::client::header::Error,
},
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -261,41 +248,11 @@ impl ObjectStore for AmazonS3 {
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};

let options = GetOptions::default();
// Extract meta from headers
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
let response = self.client.get_request(location, options, true).await?;
let headers = response.headers();

let last_modified = headers
.get(LAST_MODIFIED)
.context(MissingLastModifiedSnafu)?;

let content_length = headers
.get(CONTENT_LENGTH)
.context(MissingContentLengthSnafu)?;

let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
let last_modified = DateTime::parse_from_rfc2822(last_modified)
.context(InvalidLastModifiedSnafu { last_modified })?
.with_timezone(&Utc);

let content_length = content_length.to_str().context(BadHeaderSnafu)?;
let content_length = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;

let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?;
let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
e_tag: Some(e_tag.to_string()),
})
Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
}

async fn delete(&self, location: &Path) -> Result<()> {
Expand Down
60 changes: 7 additions & 53 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use percent_encoding::percent_decode_str;
use serde::{Deserialize, Serialize};
Expand All @@ -50,9 +49,9 @@ use std::{collections::BTreeSet, str::FromStr};
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::header::header_meta;
use crate::client::ClientConfigKey;
use crate::config::ConfigValue;
use crate::util::RFC1123_FMT;
pub use credential::authority_hosts;

mod client;
Expand All @@ -75,24 +74,6 @@ const MSI_ENDPOINT_ENV_KEY: &str = "IDENTITY_ENDPOINT";
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
#[snafu(display("Last-Modified Header missing from response"))]
MissingLastModified,

#[snafu(display("Content-Length Header missing from response"))]
MissingContentLength,

#[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
InvalidLastModified {
last_modified: String,
source: chrono::ParseError,
},

#[snafu(display("Invalid content length '{}': {}", content_length, source))]
InvalidContentLength {
content_length: String,
source: std::num::ParseIntError,
},

#[snafu(display("Received header containing non-ASCII data"))]
BadHeader { source: reqwest::header::ToStrError },

Expand Down Expand Up @@ -146,6 +127,11 @@ enum Error {

#[snafu(display("ETag Header missing from response"))]
MissingEtag,

#[snafu(display("Failed to parse headers: {}", source))]
Header {
source: crate::client::header::Error,
},
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -223,44 +209,12 @@ impl ObjectStore for MicrosoftAzure {
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
let options = GetOptions::default();

// Extract meta from headers
// https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
let response = self.client.get_request(location, options, true).await?;
let headers = response.headers();

let last_modified = headers
.get(LAST_MODIFIED)
.ok_or(Error::MissingLastModified)?
.to_str()
.context(BadHeaderSnafu)?;
let last_modified = Utc
.datetime_from_str(last_modified, RFC1123_FMT)
.context(InvalidLastModifiedSnafu { last_modified })?;

let content_length = headers
.get(CONTENT_LENGTH)
.ok_or(Error::MissingContentLength)?
.to_str()
.context(BadHeaderSnafu)?;
let content_length = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;

let e_tag = headers
.get(ETAG)
.ok_or(Error::MissingEtag)?
.to_str()
.context(BadHeaderSnafu)?;

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
e_tag: Some(e_tag.to_string()),
})
Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
}

async fn delete(&self, location: &Path) -> Result<()> {
Expand Down
Loading