From b9f7c216fcdcbca955c77226c30bb35a129daa5e Mon Sep 17 00:00:00 2001 From: meteorgan Date: Sun, 14 Jul 2024 22:23:24 +0800 Subject: [PATCH] feat(core): support user defined metadata for oss (#4881) * feat(user_metadata): support user defined metadata for oss * fix cargo fmt && doc tests * Add user metadata key checks for oss && refeactor some code * remove unused code --- core/src/lib.rs | 4 +- core/src/raw/mod.rs | 2 +- core/src/raw/ops.rs | 21 +++- core/src/services/oss/backend.rs | 7 +- core/src/services/oss/core.rs | 119 +++++++++++++++----- core/src/types/capability.rs | 2 + core/src/types/list.rs | 4 +- core/src/types/metadata.rs | 22 +++- core/src/types/operator/operator_futures.rs | 41 +++++-- core/tests/behavior/async_write.rs | 25 ++++ core/tests/behavior/utils.rs | 2 +- 11 files changed, 200 insertions(+), 49 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index 60181210518..4e9b5f21d38 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -154,8 +154,8 @@ mod tests { #[test] fn assert_size() { assert_eq!(40, size_of::()); - assert_eq!(256, size_of::()); - assert_eq!(232, size_of::()); + assert_eq!(304, size_of::()); + assert_eq!(280, size_of::()); assert_eq!(1, size_of::()); assert_eq!(24, size_of::()); } diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs index b4d6b3ca969..ad8d1fe11ac 100644 --- a/core/src/raw/mod.rs +++ b/core/src/raw/mod.rs @@ -23,7 +23,7 @@ //! access raw APIs. //! - Raw APIs should only be accessed via `opendal::raw::Xxxx`, any public //! API should never expose raw API directly. -//! - Raw APIs are far more less stable than public API, please don't rely on +//! - Raw APIs are far less stable than public API, please don't rely on //! them whenever possible. mod accessor; diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index aff6021ca56..c18c87961aa 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -19,6 +19,7 @@ //! //! By using ops, users can add more context for operation. +use std::collections::HashMap; use std::time::Duration; use flagset::FlagSet; @@ -537,24 +538,24 @@ impl OpStat { self.override_content_disposition.as_deref() } - /// Sets the cache-control header that should be send back by the remote read operation. + /// Sets the cache-control header that should be sent back by the remote read operation. pub fn with_override_cache_control(mut self, cache_control: &str) -> Self { self.override_cache_control = Some(cache_control.into()); self } - /// Returns the cache-control header that should be send back by the remote read operation. + /// Returns the cache-control header that should be sent back by the remote read operation. pub fn override_cache_control(&self) -> Option<&str> { self.override_cache_control.as_deref() } - /// Sets the content-type header that should be send back by the remote read operation. + /// Sets the content-type header that should be sent back by the remote read operation. pub fn with_override_content_type(mut self, content_type: &str) -> Self { self.override_content_type = Some(content_type.into()); self } - /// Returns the content-type header that should be send back by the remote read operation. + /// Returns the content-type header that should be sent back by the remote read operation. pub fn override_content_type(&self) -> Option<&str> { self.override_content_type.as_deref() } @@ -580,6 +581,7 @@ pub struct OpWrite { content_disposition: Option, cache_control: Option, executor: Option, + user_metadata: Option>, } impl OpWrite { @@ -677,6 +679,17 @@ impl OpWrite { } self } + + /// Set the user defined metadata of the op + pub fn with_user_metadata(mut self, metadata: HashMap) -> Self { + self.user_metadata = Some(metadata); + self + } + + /// Get the user defined metadata from the op + pub fn user_metadata(&self) -> Option<&HashMap> { + self.user_metadata.as_ref() + } } /// Args for `writer` operation. diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index b2679f1b2a3..61e548bf457 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -107,7 +107,7 @@ impl OssBuilder { self } - /// Set a endpoint for generating presigned urls. + /// Set an endpoint for generating presigned urls. /// /// You can offer a public endpoint like to return a presinged url for /// public accessors, along with an internal endpoint like @@ -416,6 +416,7 @@ impl Access for OssBackend { } else { Some(usize::MAX) }, + write_with_user_metadata: true, delete: true, copy: true, @@ -447,7 +448,9 @@ impl Access for OssBackend { match status { StatusCode::OK => { let headers = resp.headers(); - let mut meta = parse_into_metadata(path, headers)?; + let mut meta = + self.core + .parse_metadata(path, constants::X_OSS_META_PREFIX, resp.headers())?; if let Some(v) = parse_header_to_str(headers, "x-oss-version-id")? { meta.set_version(v); diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index e9de4cc0ab2..00c0e1b9559 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; use std::fmt::Write; @@ -28,10 +29,10 @@ use http::header::CONTENT_TYPE; use http::header::IF_MATCH; use http::header::IF_NONE_MATCH; use http::header::RANGE; -use http::HeaderName; use http::HeaderValue; use http::Request; use http::Response; +use http::{HeaderMap, HeaderName}; use reqsign::AliyunCredential; use reqsign::AliyunLoader; use reqsign::AliyunOssSigner; @@ -41,7 +42,7 @@ use serde::Serialize; use crate::raw::*; use crate::*; -mod constants { +pub mod constants { pub const X_OSS_SERVER_SIDE_ENCRYPTION: &str = "x-oss-server-side-encryption"; pub const X_OSS_SERVER_SIDE_ENCRYPTION_KEY_ID: &str = "x-oss-server-side-encryption-key-id"; @@ -49,6 +50,8 @@ mod constants { pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition"; pub const OSS_QUERY_VERSION_ID: &str = "versionId"; + + pub const X_OSS_META_PREFIX: &str = "x-oss-meta-"; } pub struct OssCore { @@ -156,6 +159,88 @@ impl OssCore { } req } + + fn insert_metadata_headers( + &self, + mut req: http::request::Builder, + size: Option, + args: &OpWrite, + ) -> Result { + req = req.header(CONTENT_LENGTH, size.unwrap_or_default()); + + if let Some(mime) = args.content_type() { + req = req.header(CONTENT_TYPE, mime); + } + + if let Some(pos) = args.content_disposition() { + req = req.header(CONTENT_DISPOSITION, pos); + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(CACHE_CONTROL, cache_control); + } + + if let Some(user_metadata) = args.user_metadata() { + for (key, value) in user_metadata { + // before insert user defined metadata header, add prefix to the header name + if !self.check_user_metadata_key(key) { + return Err(Error::new( + ErrorKind::Unsupported, + "the format of the user metadata key is invalid, please refer the document", + )); + } + req = req.header(format!("{}{}", constants::X_OSS_META_PREFIX, key), value) + } + } + + Ok(req) + } + + // According to https://help.aliyun.com/zh/oss/developer-reference/putobject + // there are some limits in user defined metadata key + fn check_user_metadata_key(&self, key: &str) -> bool { + key.chars().all(|c| c.is_ascii_alphanumeric() || c == '-') + } + + /// parse_metadata will parse http headers(including standards http headers + /// and user defined metadata header) into Metadata. + /// + /// # Arguments + /// + /// * `user_metadata_prefix` is the prefix of user defined metadata key + /// + /// # Notes + /// + /// before return the user defined metadata, we'll strip the user_metadata_prefix from the key + pub fn parse_metadata( + &self, + path: &str, + user_metadata_prefix: &str, + headers: &HeaderMap, + ) -> Result { + let mut m = parse_into_metadata(path, headers)?; + + let data: HashMap = headers + .iter() + .filter_map(|(key, _)| { + if key.as_str().starts_with(user_metadata_prefix) { + if let Ok(Some(value)) = parse_header_to_str(headers, key) { + let key_str = key.to_string(); + let stripped_key = key_str + .strip_prefix(user_metadata_prefix) + .expect("strip prefix must succeed"); + return Some((stripped_key.to_string(), value.to_string())); + } + } + None + }) + .collect(); + if !data.is_empty() { + m.with_user_metadata(data); + } + + Ok(m) + } } impl OssCore { @@ -174,19 +259,7 @@ impl OssCore { let mut req = Request::put(&url); - req = req.header(CONTENT_LENGTH, size.unwrap_or_default()); - - if let Some(mime) = args.content_type() { - req = req.header(CONTENT_TYPE, mime); - } - - if let Some(pos) = args.content_disposition() { - req = req.header(CONTENT_DISPOSITION, pos); - } - - if let Some(cache_control) = args.cache_control() { - req = req.header(CACHE_CONTROL, cache_control) - } + req = self.insert_metadata_headers(req, size, args)?; // set sse headers req = self.insert_sse_headers(req); @@ -214,19 +287,7 @@ impl OssCore { let mut req = Request::post(&url); - req = req.header(CONTENT_LENGTH, size); - - if let Some(mime) = args.content_type() { - req = req.header(CONTENT_TYPE, mime); - } - - if let Some(pos) = args.content_disposition() { - req = req.header(CONTENT_DISPOSITION, pos); - } - - if let Some(cache_control) = args.cache_control() { - req = req.header(CACHE_CONTROL, cache_control) - } + req = self.insert_metadata_headers(req, Some(size), args)?; // set sse headers req = self.insert_sse_headers(req); @@ -593,7 +654,7 @@ impl OssCore { self.send(req).await } - /// Abort an on-going multipart upload. + /// Abort an ongoing multipart upload. /// reference docs https://www.alibabacloud.com/help/zh/oss/developer-reference/abortmultipartupload pub async fn oss_abort_multipart_upload( &self, diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index 45d53682a35..5b2068a63b9 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -95,6 +95,8 @@ pub struct Capability { pub write_with_content_disposition: bool, /// If operator supports write with cache control. pub write_with_cache_control: bool, + /// If operator supports write with user defined metadata + pub write_with_user_metadata: bool, /// write_multi_max_size is the max size that services support in write_multi. /// /// For example, AWS S3 supports 5GiB as max in write_multi. diff --git a/core/src/types/list.rs b/core/src/types/list.rs index 9c38b4fc050..fe70b00e7cb 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -90,8 +90,8 @@ pub struct Lister { /// use opendal::Entry; /// use opendal::Result; /// -/// assert_eq!(256, size_of::<(String, Result)>()); -/// assert_eq!(256, size_of::>()); +/// assert_eq!(304, size_of::<(String, Result)>()); +/// assert_eq!(304, size_of::>()); /// ``` /// /// So let's ignore this lint: diff --git a/core/src/types/metadata.rs b/core/src/types/metadata.rs index c29b37ec21d..9f67af88dc5 100644 --- a/core/src/types/metadata.rs +++ b/core/src/types/metadata.rs @@ -18,6 +18,7 @@ use chrono::prelude::*; use flagset::flags; use flagset::FlagSet; +use std::collections::HashMap; use crate::raw::*; use crate::*; @@ -45,6 +46,7 @@ pub struct Metadata { etag: Option, last_modified: Option>, version: Option, + user_metadata: Option>, } impl Metadata { @@ -71,6 +73,7 @@ impl Metadata { etag: None, content_disposition: None, version: None, + user_metadata: None, } } @@ -87,7 +90,7 @@ impl Metadata { self } - /// Check if there metadata already contains given metakey. + /// Check if the metadata already contains given metakey. pub(crate) fn contains_metakey(&self, metakey: impl Into>) -> bool { let input_metakey = metakey.into(); @@ -507,6 +510,21 @@ impl Metadata { self.metakey |= Metakey::Version; self } + + /// User defined metadata of this entry + /// + /// The prefix of the user defined metadata key(for example: in oss, it's x-oss-meta-) + /// is remove from the key + pub fn user_metadata(&self) -> Option<&HashMap> { + self.user_metadata.as_ref() + } + + /// Set user defined metadata of this entry + pub fn with_user_metadata(&mut self, data: HashMap) -> &mut Self { + self.user_metadata = Some(data); + self.metakey |= Metakey::UserMetaData; + self + } } flags! { @@ -548,5 +566,7 @@ flags! { LastModified, /// Key for version. Version, + /// Key for user metadata + UserMetaData, } } diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index 045f211263a..e83caf2768c 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -19,6 +19,7 @@ //! //! By using futures, users can add more options for operation. +use std::collections::HashMap; use std::future::IntoFuture; use std::ops::RangeBounds; use std::time::Duration; @@ -71,7 +72,7 @@ impl>> OperatorFuture { impl>> OperatorFuture { /// Change the operation's args. fn map(mut self, f: impl FnOnce(I) -> I) -> Self { - self.args = (f)(self.args); + self.args = f(self.args); self } } @@ -116,17 +117,17 @@ impl>> FutureStat { pub type FuturePresignStat = OperatorFuture<(OpStat, Duration), PresignedRequest, F>; impl>> FuturePresignStat { - /// Sets the content-disposition header that should be send back by the remote read operation. + /// Sets the content-disposition header that should be sent back by the remote read operation. pub fn override_content_disposition(self, v: &str) -> Self { self.map(|(args, dur)| (args.with_override_content_disposition(v), dur)) } - /// Sets the cache-control header that should be send back by the remote read operation. + /// Sets the cache-control header that should be sent back by the remote read operation. pub fn override_cache_control(self, v: &str) -> Self { self.map(|(args, dur)| (args.with_override_cache_control(v), dur)) } - /// Sets the content-type header that should be send back by the remote read operation. + /// Sets the content-type header that should be sent back by the remote read operation. pub fn override_content_type(self, v: &str) -> Self { self.map(|(args, dur)| (args.with_override_content_type(v), dur)) } @@ -148,17 +149,17 @@ impl>> FuturePresignStat { pub type FuturePresignRead = OperatorFuture<(OpRead, Duration), PresignedRequest, F>; impl>> FuturePresignRead { - /// Sets the content-disposition header that should be send back by the remote read operation. + /// Sets the content-disposition header that should be sent back by the remote read operation. pub fn override_content_disposition(self, v: &str) -> Self { self.map(|(args, dur)| (args.with_override_content_disposition(v), dur)) } - /// Sets the cache-control header that should be send back by the remote read operation. + /// Sets the cache-control header that should be sent back by the remote read operation. pub fn override_cache_control(self, v: &str) -> Self { self.map(|(args, dur)| (args.with_override_cache_control(v), dur)) } - /// Sets the content-type header that should be send back by the remote read operation. + /// Sets the content-type header that should be sent back by the remote read operation. pub fn override_content_type(self, v: &str) -> Self { self.map(|(args, dur)| (args.with_override_content_type(v), dur)) } @@ -322,6 +323,22 @@ impl>> FutureWrite { pub fn executor(self, executor: Executor) -> Self { self.map(|(args, options, bs)| (args.with_executor(executor), options, bs)) } + + /// Set the user defined metadata of the op + /// + /// ## Notes + /// + /// we don't need to include the user defined metadata prefix in the key + /// every service will handle it internally + pub fn user_metadata(self, data: impl IntoIterator) -> Self { + self.map(|(args, options, bs)| { + ( + args.with_user_metadata(HashMap::from_iter(data)), + options, + bs, + ) + }) + } } /// Future that generated by [`Operator::writer_with`]. @@ -384,6 +401,16 @@ impl>> FutureWriter { pub fn executor(self, executor: Executor) -> Self { self.map(|(args, options)| (args.with_executor(executor), options)) } + + /// Set the user defined metadata of the op + /// + /// ## Notes + /// + /// we don't need to include the user defined metadata prefix in the key + /// every service will handle it internally + pub fn user_metadata(self, data: impl IntoIterator) -> Self { + self.map(|(args, options)| (args.with_user_metadata(HashMap::from_iter(data)), options)) + } } /// Future that generated by [`Operator::delete_with`]. diff --git a/core/tests/behavior/async_write.rs b/core/tests/behavior/async_write.rs index 4bc59fe7583..8a146af65f3 100644 --- a/core/tests/behavior/async_write.rs +++ b/core/tests/behavior/async_write.rs @@ -26,6 +26,7 @@ use futures::StreamExt; use log::warn; use sha2::Digest; use sha2::Sha256; +use std::collections::HashMap; use crate::*; @@ -42,6 +43,7 @@ pub fn tests(op: &Operator, tests: &mut Vec) { test_write_with_cache_control, test_write_with_content_type, test_write_with_content_disposition, + test_write_with_user_metadata, test_writer_write, test_writer_write_with_overwrite, test_writer_write_with_concurrent, @@ -205,6 +207,29 @@ pub async fn test_write_with_content_disposition(op: Operator) -> Result<()> { Ok(()) } +/// write a single file with user defined metadata should succeed. +pub async fn test_write_with_user_metadata(op: Operator) -> Result<()> { + if !op.info().full_capability().write_with_user_metadata { + return Ok(()); + } + + let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); + let target_user_metadata = vec![("location".to_string(), "everywhere".to_string())]; + op.write_with(&path, content) + .user_metadata(target_user_metadata.clone()) + .await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + let resp_meta = meta.user_metadata().expect("meta data must exist"); + + assert_eq!( + *resp_meta, + target_user_metadata.into_iter().collect::>() + ); + + Ok(()) +} + /// Delete existing file should succeed. pub async fn test_writer_abort(op: Operator) -> Result<()> { let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs index 17124e24060..91deb1178b5 100644 --- a/core/tests/behavior/utils.rs +++ b/core/tests/behavior/utils.rs @@ -188,7 +188,7 @@ impl Fixture { let op = op.into(); let paths: Vec<_> = mem::take(self.paths.lock().unwrap().as_mut()); for path in paths.iter() { - // We try our best to cleanup fixtures, but won't panic if failed. + // We try our best to clean up fixtures, but won't panic if failed. let _ = op.delete(path).await.map_err(|err| { log::error!("fixture cleanup path {path} failed: {:?}", err); });