Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistently use GCP XML API #4207

Merged
merged 5 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ jobs:

- name: Configure Fake GCS Server (GCP emulation)
run: |
docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http
docker run -d -p 4443:4443 tustvold/fake-gcs-server -scheme http -backend memory -public-host localhost:4443
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please leave a link to that PR as a comment (so we now we can update it back when it is accepted upstream)?

# Give the container a moment to start up prior to configuring it
sleep 1
curl -v -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
Expand Down
2 changes: 1 addition & 1 deletion object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-ut
nix = "0.26.1"

[features]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json","reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud"]
gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud"]
Expand Down
69 changes: 3 additions & 66 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@
use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
use crate::aws::STRICT_PATH_ENCODE_SET;
use crate::client::list::ListResponse;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
use crate::multipart::UploadPart;
use crate::path::DELIMITER;
use crate::util::{format_http_range, format_prefix};
use crate::{
BoxStream, ClientOptions, ListResult, MultipartId, ObjectMeta, Path, Result,
RetryConfig, StreamExt,
BoxStream, ClientOptions, ListResult, MultipartId, Path, Result, RetryConfig,
StreamExt,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use percent_encoding::{utf8_percent_encode, PercentEncode};
use reqwest::{
header::CONTENT_TYPE, Client as ReqwestClient, Method, Response, StatusCode,
Expand Down Expand Up @@ -118,69 +118,6 @@ impl From<Error> for crate::Error {
}
}

#[derive(Debug, Deserialize)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just moved into list.rs so it can be shared with the gcs implementation

#[serde(rename_all = "PascalCase")]
pub struct ListResponse {
#[serde(default)]
pub contents: Vec<ListContents>,
#[serde(default)]
pub common_prefixes: Vec<ListPrefix>,
#[serde(default)]
pub next_continuation_token: Option<String>,
}

impl TryFrom<ListResponse> for ListResult {
type Error = crate::Error;

fn try_from(value: ListResponse) -> Result<Self> {
let common_prefixes = value
.common_prefixes
.into_iter()
.map(|x| Ok(Path::parse(x.prefix)?))
.collect::<Result<_>>()?;

let objects = value
.contents
.into_iter()
.map(TryFrom::try_from)
.collect::<Result<_>>()?;

Ok(Self {
common_prefixes,
objects,
})
}
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListPrefix {
pub prefix: String,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListContents {
pub key: String,
pub size: usize,
pub last_modified: DateTime<Utc>,
#[serde(rename = "ETag")]
pub e_tag: Option<String>,
}

impl TryFrom<ListContents> for ObjectMeta {
type Error = crate::Error;

fn try_from(value: ListContents) -> Result<Self> {
Ok(Self {
location: Path::parse(value.key)?,
last_modified: value.last_modified,
size: value.size,
e_tag: value.e_tag,
})
}
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct InitiateMultipart {
Expand Down
57 changes: 7 additions & 50 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use itertools::Itertools;
Expand All @@ -53,6 +52,7 @@ use crate::aws::credential::{
AwsCredential, CredentialProvider, InstanceCredentialProvider,
StaticCredentialProvider, WebIdentityProvider,
};
use crate::client::header::header_meta;
use crate::client::ClientConfigKey;
use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
Expand Down Expand Up @@ -86,24 +86,6 @@ static METADATA_ENDPOINT: &str = "http://169.254.169.254";
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
#[snafu(display("Last-Modified Header missing from response"))]
MissingLastModified,

#[snafu(display("Content-Length Header missing from response"))]
MissingContentLength,

#[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
InvalidLastModified {
last_modified: String,
source: chrono::ParseError,
},

#[snafu(display("Invalid content length '{}': {}", content_length, source))]
InvalidContentLength {
content_length: String,
source: std::num::ParseIntError,
},

#[snafu(display("Missing region"))]
MissingRegion,

Expand Down Expand Up @@ -154,6 +136,11 @@ enum Error {

#[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
RegionParse { bucket: String },

#[snafu(display("Failed to parse headers: {}", source))]
Header {
source: crate::client::header::Error,
},
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -274,40 +261,10 @@ impl ObjectStore for AmazonS3 {
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};

// Extract meta from headers
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
let response = self.client.get_request(location, None, true).await?;
let headers = response.headers();

let last_modified = headers
.get(LAST_MODIFIED)
.context(MissingLastModifiedSnafu)?;

let content_length = headers
.get(CONTENT_LENGTH)
.context(MissingContentLengthSnafu)?;

let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
let last_modified = DateTime::parse_from_rfc2822(last_modified)
.context(InvalidLastModifiedSnafu { last_modified })?
.with_timezone(&Utc);

let content_length = content_length.to_str().context(BadHeaderSnafu)?;
let content_length = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;

let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?;
let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
e_tag: Some(e_tag.to_string()),
})
Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
}

async fn delete(&self, location: &Path) -> Result<()> {
Expand Down
61 changes: 7 additions & 54 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use percent_encoding::percent_decode_str;
use serde::{Deserialize, Serialize};
Expand All @@ -51,9 +50,9 @@ use std::{collections::BTreeSet, str::FromStr};
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::header::header_meta;
use crate::client::ClientConfigKey;
use crate::config::ConfigValue;
use crate::util::RFC1123_FMT;
pub use credential::authority_hosts;

mod client;
Expand All @@ -74,24 +73,6 @@ const MSI_ENDPOINT_ENV_KEY: &str = "IDENTITY_ENDPOINT";
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
#[snafu(display("Last-Modified Header missing from response"))]
MissingLastModified,

#[snafu(display("Content-Length Header missing from response"))]
MissingContentLength,

#[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
InvalidLastModified {
last_modified: String,
source: chrono::ParseError,
},

#[snafu(display("Invalid content length '{}': {}", content_length, source))]
InvalidContentLength {
content_length: String,
source: std::num::ParseIntError,
},

#[snafu(display("Received header containing non-ASCII data"))]
BadHeader { source: reqwest::header::ToStrError },

Expand Down Expand Up @@ -145,6 +126,11 @@ enum Error {

#[snafu(display("ETag Header missing from response"))]
MissingEtag,

#[snafu(display("Failed to parse headers: {}", source))]
Header {
source: crate::client::header::Error,
},
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -237,43 +223,10 @@ impl ObjectStore for MicrosoftAzure {
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};

// Extract meta from headers
// https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
let response = self.client.get_request(location, None, true).await?;
let headers = response.headers();

let last_modified = headers
.get(LAST_MODIFIED)
.ok_or(Error::MissingLastModified)?
.to_str()
.context(BadHeaderSnafu)?;
let last_modified = Utc
.datetime_from_str(last_modified, RFC1123_FMT)
.context(InvalidLastModifiedSnafu { last_modified })?;

let content_length = headers
.get(CONTENT_LENGTH)
.ok_or(Error::MissingContentLength)?
.to_str()
.context(BadHeaderSnafu)?;
let content_length = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;

let e_tag = headers
.get(ETAG)
.ok_or(Error::MissingEtag)?
.to_str()
.context(BadHeaderSnafu)?;

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
e_tag: Some(e_tag.to_string()),
})
Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
}

async fn delete(&self, location: &Path) -> Result<()> {
Expand Down
83 changes: 83 additions & 0 deletions object_store/src/client/header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Logic for extracting ObjectMeta from headers used by AWS, GCP and Azure

use crate::path::Path;
use crate::ObjectMeta;
use chrono::{DateTime, Utc};
use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
use hyper::HeaderMap;
use snafu::{OptionExt, ResultExt, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("ETag Header missing from response"))]
MissingEtag,

#[snafu(display("Received header containing non-ASCII data"))]
BadHeader { source: reqwest::header::ToStrError },

#[snafu(display("Last-Modified Header missing from response"))]
MissingLastModified,

#[snafu(display("Content-Length Header missing from response"))]
MissingContentLength,

#[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
InvalidLastModified {
last_modified: String,
source: chrono::ParseError,
},

#[snafu(display("Invalid content length '{}': {}", content_length, source))]
InvalidContentLength {
content_length: String,
source: std::num::ParseIntError,
},
}

/// Extracts [`ObjectMeta`] from the provided [`HeaderMap`]
pub fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta, Error> {
let last_modified = headers
.get(LAST_MODIFIED)
.context(MissingLastModifiedSnafu)?;

let content_length = headers
.get(CONTENT_LENGTH)
.context(MissingContentLengthSnafu)?;

let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
let last_modified = DateTime::parse_from_rfc2822(last_modified)
.context(InvalidLastModifiedSnafu { last_modified })?
.with_timezone(&Utc);

let content_length = content_length.to_str().context(BadHeaderSnafu)?;
let content_length = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;

let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?;
let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
e_tag: Some(e_tag.to_string()),
})
}
Loading