Skip to content

Commit

Permalink
feat(core): support user defined metadata for oss (#4881)
Browse files Browse the repository at this point in the history
* feat(user_metadata): support user defined metadata for oss

* fix cargo fmt && doc tests

* Add user metadata key checks for oss && refeactor some code

* remove unused code
  • Loading branch information
meteorgan authored Jul 14, 2024
1 parent 4d48970 commit b9f7c21
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 49 deletions.
4 changes: 2 additions & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ mod tests {
#[test]
fn assert_size() {
assert_eq!(40, size_of::<Operator>());
assert_eq!(256, size_of::<Entry>());
assert_eq!(232, size_of::<Metadata>());
assert_eq!(304, size_of::<Entry>());
assert_eq!(280, size_of::<Metadata>());
assert_eq!(1, size_of::<EntryMode>());
assert_eq!(24, size_of::<Scheme>());
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! access raw APIs.
//! - Raw APIs should only be accessed via `opendal::raw::Xxxx`, any public
//! API should never expose raw API directly.
//! - Raw APIs are far more less stable than public API, please don't rely on
//! - Raw APIs are far less stable than public API, please don't rely on
//! them whenever possible.
mod accessor;
Expand Down
21 changes: 17 additions & 4 deletions core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//!
//! By using ops, users can add more context for operation.
use std::collections::HashMap;
use std::time::Duration;

use flagset::FlagSet;
Expand Down Expand Up @@ -537,24 +538,24 @@ impl OpStat {
self.override_content_disposition.as_deref()
}

/// Sets the cache-control header that should be send back by the remote read operation.
/// Sets the cache-control header that should be sent back by the remote read operation.
pub fn with_override_cache_control(mut self, cache_control: &str) -> Self {
self.override_cache_control = Some(cache_control.into());
self
}

/// Returns the cache-control header that should be send back by the remote read operation.
/// Returns the cache-control header that should be sent back by the remote read operation.
pub fn override_cache_control(&self) -> Option<&str> {
self.override_cache_control.as_deref()
}

/// Sets the content-type header that should be send back by the remote read operation.
/// Sets the content-type header that should be sent back by the remote read operation.
pub fn with_override_content_type(mut self, content_type: &str) -> Self {
self.override_content_type = Some(content_type.into());
self
}

/// Returns the content-type header that should be send back by the remote read operation.
/// Returns the content-type header that should be sent back by the remote read operation.
pub fn override_content_type(&self) -> Option<&str> {
self.override_content_type.as_deref()
}
Expand All @@ -580,6 +581,7 @@ pub struct OpWrite {
content_disposition: Option<String>,
cache_control: Option<String>,
executor: Option<Executor>,
user_metadata: Option<HashMap<String, String>>,
}

impl OpWrite {
Expand Down Expand Up @@ -677,6 +679,17 @@ impl OpWrite {
}
self
}

/// Set the user defined metadata of the op
pub fn with_user_metadata(mut self, metadata: HashMap<String, String>) -> Self {
self.user_metadata = Some(metadata);
self
}

/// Get the user defined metadata from the op
pub fn user_metadata(&self) -> Option<&HashMap<String, String>> {
self.user_metadata.as_ref()
}
}

/// Args for `writer` operation.
Expand Down
7 changes: 5 additions & 2 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl OssBuilder {
self
}

/// Set a endpoint for generating presigned urls.
/// Set an endpoint for generating presigned urls.
///
/// You can offer a public endpoint like <https://oss-cn-beijing.aliyuncs.com> to return a presinged url for
/// public accessors, along with an internal endpoint like <https://oss-cn-beijing-internal.aliyuncs.com>
Expand Down Expand Up @@ -416,6 +416,7 @@ impl Access for OssBackend {
} else {
Some(usize::MAX)
},
write_with_user_metadata: true,

delete: true,
copy: true,
Expand Down Expand Up @@ -447,7 +448,9 @@ impl Access for OssBackend {
match status {
StatusCode::OK => {
let headers = resp.headers();
let mut meta = parse_into_metadata(path, headers)?;
let mut meta =
self.core
.parse_metadata(path, constants::X_OSS_META_PREFIX, resp.headers())?;

if let Some(v) = parse_header_to_str(headers, "x-oss-version-id")? {
meta.set_version(v);
Expand Down
119 changes: 90 additions & 29 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
Expand All @@ -28,10 +29,10 @@ use http::header::CONTENT_TYPE;
use http::header::IF_MATCH;
use http::header::IF_NONE_MATCH;
use http::header::RANGE;
use http::HeaderName;
use http::HeaderValue;
use http::Request;
use http::Response;
use http::{HeaderMap, HeaderName};
use reqsign::AliyunCredential;
use reqsign::AliyunLoader;
use reqsign::AliyunOssSigner;
Expand All @@ -41,14 +42,16 @@ use serde::Serialize;
use crate::raw::*;
use crate::*;

mod constants {
pub mod constants {
pub const X_OSS_SERVER_SIDE_ENCRYPTION: &str = "x-oss-server-side-encryption";

pub const X_OSS_SERVER_SIDE_ENCRYPTION_KEY_ID: &str = "x-oss-server-side-encryption-key-id";

pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition";

pub const OSS_QUERY_VERSION_ID: &str = "versionId";

pub const X_OSS_META_PREFIX: &str = "x-oss-meta-";
}

pub struct OssCore {
Expand Down Expand Up @@ -156,6 +159,88 @@ impl OssCore {
}
req
}

fn insert_metadata_headers(
&self,
mut req: http::request::Builder,
size: Option<u64>,
args: &OpWrite,
) -> Result<http::request::Builder> {
req = req.header(CONTENT_LENGTH, size.unwrap_or_default());

if let Some(mime) = args.content_type() {
req = req.header(CONTENT_TYPE, mime);
}

if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos);
}

if let Some(cache_control) = args.cache_control() {
req = req.header(CACHE_CONTROL, cache_control);
}

if let Some(user_metadata) = args.user_metadata() {
for (key, value) in user_metadata {
// before insert user defined metadata header, add prefix to the header name
if !self.check_user_metadata_key(key) {
return Err(Error::new(
ErrorKind::Unsupported,
"the format of the user metadata key is invalid, please refer the document",
));
}
req = req.header(format!("{}{}", constants::X_OSS_META_PREFIX, key), value)
}
}

Ok(req)
}

// According to https://help.aliyun.com/zh/oss/developer-reference/putobject
// there are some limits in user defined metadata key
fn check_user_metadata_key(&self, key: &str) -> bool {
key.chars().all(|c| c.is_ascii_alphanumeric() || c == '-')
}

/// parse_metadata will parse http headers(including standards http headers
/// and user defined metadata header) into Metadata.
///
/// # Arguments
///
/// * `user_metadata_prefix` is the prefix of user defined metadata key
///
/// # Notes
///
/// before return the user defined metadata, we'll strip the user_metadata_prefix from the key
pub fn parse_metadata(
&self,
path: &str,
user_metadata_prefix: &str,
headers: &HeaderMap,
) -> Result<Metadata> {
let mut m = parse_into_metadata(path, headers)?;

let data: HashMap<String, String> = headers
.iter()
.filter_map(|(key, _)| {
if key.as_str().starts_with(user_metadata_prefix) {
if let Ok(Some(value)) = parse_header_to_str(headers, key) {
let key_str = key.to_string();
let stripped_key = key_str
.strip_prefix(user_metadata_prefix)
.expect("strip prefix must succeed");
return Some((stripped_key.to_string(), value.to_string()));
}
}
None
})
.collect();
if !data.is_empty() {
m.with_user_metadata(data);
}

Ok(m)
}
}

impl OssCore {
Expand All @@ -174,19 +259,7 @@ impl OssCore {

let mut req = Request::put(&url);

req = req.header(CONTENT_LENGTH, size.unwrap_or_default());

if let Some(mime) = args.content_type() {
req = req.header(CONTENT_TYPE, mime);
}

if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos);
}

if let Some(cache_control) = args.cache_control() {
req = req.header(CACHE_CONTROL, cache_control)
}
req = self.insert_metadata_headers(req, size, args)?;

// set sse headers
req = self.insert_sse_headers(req);
Expand Down Expand Up @@ -214,19 +287,7 @@ impl OssCore {

let mut req = Request::post(&url);

req = req.header(CONTENT_LENGTH, size);

if let Some(mime) = args.content_type() {
req = req.header(CONTENT_TYPE, mime);
}

if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos);
}

if let Some(cache_control) = args.cache_control() {
req = req.header(CACHE_CONTROL, cache_control)
}
req = self.insert_metadata_headers(req, Some(size), args)?;

// set sse headers
req = self.insert_sse_headers(req);
Expand Down Expand Up @@ -593,7 +654,7 @@ impl OssCore {
self.send(req).await
}

/// Abort an on-going multipart upload.
/// Abort an ongoing multipart upload.
/// reference docs https://www.alibabacloud.com/help/zh/oss/developer-reference/abortmultipartupload
pub async fn oss_abort_multipart_upload(
&self,
Expand Down
2 changes: 2 additions & 0 deletions core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ pub struct Capability {
pub write_with_content_disposition: bool,
/// If operator supports write with cache control.
pub write_with_cache_control: bool,
/// If operator supports write with user defined metadata
pub write_with_user_metadata: bool,
/// write_multi_max_size is the max size that services support in write_multi.
///
/// For example, AWS S3 supports 5GiB as max in write_multi.
Expand Down
4 changes: 2 additions & 2 deletions core/src/types/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ pub struct Lister {
/// use opendal::Entry;
/// use opendal::Result;
///
/// assert_eq!(256, size_of::<(String, Result<opendal::raw::RpStat>)>());
/// assert_eq!(256, size_of::<Option<Entry>>());
/// assert_eq!(304, size_of::<(String, Result<opendal::raw::RpStat>)>());
/// assert_eq!(304, size_of::<Option<Entry>>());
/// ```
///
/// So let's ignore this lint:
Expand Down
22 changes: 21 additions & 1 deletion core/src/types/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use chrono::prelude::*;
use flagset::flags;
use flagset::FlagSet;
use std::collections::HashMap;

use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -45,6 +46,7 @@ pub struct Metadata {
etag: Option<String>,
last_modified: Option<DateTime<Utc>>,
version: Option<String>,
user_metadata: Option<HashMap<String, String>>,
}

impl Metadata {
Expand All @@ -71,6 +73,7 @@ impl Metadata {
etag: None,
content_disposition: None,
version: None,
user_metadata: None,
}
}

Expand All @@ -87,7 +90,7 @@ impl Metadata {
self
}

/// Check if there metadata already contains given metakey.
/// Check if the metadata already contains given metakey.
pub(crate) fn contains_metakey(&self, metakey: impl Into<FlagSet<Metakey>>) -> bool {
let input_metakey = metakey.into();

Expand Down Expand Up @@ -507,6 +510,21 @@ impl Metadata {
self.metakey |= Metakey::Version;
self
}

/// User defined metadata of this entry
///
/// The prefix of the user defined metadata key(for example: in oss, it's x-oss-meta-)
/// is remove from the key
pub fn user_metadata(&self) -> Option<&HashMap<String, String>> {
self.user_metadata.as_ref()
}

/// Set user defined metadata of this entry
pub fn with_user_metadata(&mut self, data: HashMap<String, String>) -> &mut Self {
self.user_metadata = Some(data);
self.metakey |= Metakey::UserMetaData;
self
}
}

flags! {
Expand Down Expand Up @@ -548,5 +566,7 @@ flags! {
LastModified,
/// Key for version.
Version,
/// Key for user metadata
UserMetaData,
}
}
Loading

0 comments on commit b9f7c21

Please sign in to comment.