From 18ae7ef33aa274d60747c896334df446804e82ae Mon Sep 17 00:00:00 2001 From: hopeyen Date: Mon, 15 Apr 2024 11:18:27 -0700 Subject: [PATCH] fix: fail by invalid location, better server error displayed --- file-exchange/benches/read_chunk.rs | 5 +- file-exchange/src/manifest/store.rs | 21 ++-- file-service/src/admin.rs | 138 ++++++++++++++------------ file-service/src/file_server/range.rs | 2 +- 4 files changed, 93 insertions(+), 73 deletions(-) diff --git a/file-exchange/benches/read_chunk.rs b/file-exchange/benches/read_chunk.rs index 9a08ab1..2f9f464 100644 --- a/file-exchange/benches/read_chunk.rs +++ b/file-exchange/benches/read_chunk.rs @@ -5,6 +5,7 @@ use file_exchange::{ config::{LocalDirectory, StorageMethod}, manifest::store::Store, }; +use object_store::path::Path; use rand::Rng; use std::{fs::File, ops::Range, path::PathBuf}; @@ -23,7 +24,7 @@ fn read_chunk_benchmark(c: &mut Criterion) { })) .unwrap(), ); - let file_name = black_box("0017234600.dbin.zst"); + let file_loc = black_box(Path::from("0017234600.dbin.zst")); let file = black_box(File::open(file_path).unwrap()); let file_size: usize = black_box( file.metadata() @@ -36,7 +37,7 @@ fn read_chunk_benchmark(c: &mut Criterion) { c.bench_function("read_chunk", |b| { let range = black_box(random_file_range(file_size)); b.to_async(FuturesExecutor) - .iter(|| store.range_read(file_name, &range)) + .iter(|| store.range_read(&file_loc, &range)) }); } diff --git a/file-exchange/src/manifest/store.rs b/file-exchange/src/manifest/store.rs index a52c9db..8dbe3b4 100644 --- a/file-exchange/src/manifest/store.rs +++ b/file-exchange/src/manifest/store.rs @@ -103,10 +103,10 @@ impl Store { .cloned() } - pub async fn range_read(&self, file_name: &str, range: &Range) -> Result { + pub async fn range_read(&self, file_loc: &Path, range: &Range) -> Result { Ok(self .store - .get_range(&Path::from(file_name), range.to_owned()) + .get_range(file_loc, range.to_owned()) .await .unwrap()) } @@ -241,7 +241,7 @@ impl Store { pub async fn validate_local_bundle(&self, local: &LocalBundle) -> Result<&Self, Error> { tracing::trace!( bundle = tracing::field::debug(&local), - "Read and verify bundle. This may cause a long initialization time." + "Read and verify bundle. This might take a while based on bundle size." ); // Read all files in bundle to verify locally. This may cause a long initialization time @@ -263,14 +263,20 @@ impl Store { // read file by file_manifest.file_name let meta_info = &file.meta_info; let file_manifest = &file.file_manifest; - // let mut file_path = self.local_path.clone(); - // file_path.push(meta_info.name.clone()); tracing::trace!( // file_path = tracing::field::debug(&file_path), file_prefix = tracing::field::debug(&prefix), file_manifest = tracing::field::debug(&file_manifest), "Verify file" ); + let metadata = + self.find_object(&meta_info.name, prefix) + .await + .ok_or(Error::DataUnavailable(format!( + "Cannot find object {} with prefix {:#?} in store's main directory path", + meta_info.name, prefix, + )))?; + tracing::trace!(file_meta = tracing::field::debug(&metadata), "Found file"); // loop through file manifest byte range let chunk_ops: Vec<_> = (0..(file_manifest.total_bytes / file_manifest.chunk_size + 1)) @@ -288,13 +294,12 @@ impl Store { }) .collect(); - let file_name = meta_info.name.clone(); for (range, chunk_hash) in chunk_ops { - let chunk_data = self.range_read(&file_name, &range).await?; + let chunk_data = self.range_read(&metadata.location, &range).await?; // verify chunk if !verify_chunk(&chunk_data, &chunk_hash) { tracing::error!( - file = tracing::field::debug(&file_name), + file = tracing::field::debug(&metadata), chunk_hash = tracing::field::debug(&chunk_hash), "Cannot locally verify the serving file" ); diff --git a/file-service/src/admin.rs b/file-service/src/admin.rs index 8c13a2d..fa5f554 100644 --- a/file-service/src/admin.rs +++ b/file-service/src/admin.rs @@ -127,7 +127,7 @@ impl StatusMutation { ctx: &Context<'_>, deployment: String, location: String, - ) -> Result { + ) -> Result { if ctx.data_opt::() != ctx .data_unchecked::() @@ -135,13 +135,9 @@ impl StatusMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!(format!( - "Failed to authenticate: {:#?} (admin: {:#?}", + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", ctx.data_opt::(), - ctx.data_unchecked::() - .state - .admin_auth_token - .as_ref() ))); } let bundle = match read_bundle( @@ -151,7 +147,7 @@ impl StatusMutation { .await { Ok(s) => s, - Err(e) => return Err(anyhow::anyhow!(e.to_string(),)), + Err(e) => return Err(ServerError::RequestBodyError(e.to_string())), }; let local_bundle = LocalBundle { bundle: bundle.clone(), @@ -162,7 +158,8 @@ impl StatusMutation { .state .store .validate_local_bundle(&local_bundle) - .await; + .await + .map_err(|e| ServerError::ContextError(e.to_string()))?; ctx.data_unchecked::() .state @@ -180,7 +177,7 @@ impl StatusMutation { ctx: &Context<'_>, deployments: Vec, locations: Vec, - ) -> Result, anyhow::Error> { + ) -> Result, ServerError> { if ctx.data_opt::() != ctx .data_unchecked::() @@ -188,7 +185,9 @@ impl StatusMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!("Failed to authenticate")); + return Err(ServerError::InvalidAuthentication( + "Failed to authenticate".to_string(), + )); } let client = ctx.data_unchecked::().state.client.clone(); let bundle_ref = ctx.data_unchecked::().state.bundles.clone(); @@ -204,7 +203,7 @@ impl StatusMutation { let bundle = read_bundle(&client.clone(), deployment) .await - .map_err(|e| anyhow::anyhow!("{}", e))?; + .map_err(|e| ServerError::RequestBodyError(e.to_string()))?; let local_bundle = LocalBundle { bundle: bundle.clone(), @@ -215,30 +214,30 @@ impl StatusMutation { .state .store .validate_local_bundle(&local_bundle) - .await; + .await + .map_err(|e| ServerError::ContextError(e.to_string()))?; bundle_ref .clone() .lock() .await .insert(bundle.ipfs_hash.clone(), local_bundle); - Ok::<_, anyhow::Error>(GraphQlBundle::from(bundle)) + Ok::<_, crate::admin::ServerError>(GraphQlBundle::from(bundle)) } }) .collect::>(); // Since collect() gathers futures, we need to resolve them. You can use `try_join_all` for this. - let resolved_bundles: Result, _> = - futures::future::try_join_all(bundles).await; + let resolved_bundles: Vec = futures::future::try_join_all(bundles).await?; - Ok(resolved_bundles.unwrap_or_default()) + Ok(resolved_bundles) } async fn remove_bundle( &self, ctx: &Context<'_>, deployment: String, - ) -> Result, anyhow::Error> { + ) -> Result, ServerError> { if ctx.data_opt::() != ctx .data_unchecked::() @@ -246,7 +245,10 @@ impl StatusMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!("Failed to authenticate")); + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", + ctx.data_opt::(), + ))); } let bundle = ctx @@ -265,7 +267,7 @@ impl StatusMutation { &self, ctx: &Context<'_>, deployments: Vec, - ) -> Result, anyhow::Error> { + ) -> Result, ServerError> { if ctx.data_opt::() != ctx .data_unchecked::() @@ -273,7 +275,10 @@ impl StatusMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!("Failed to authenticate")); + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", + ctx.data_opt::(), + ))); } let bundles = deployments @@ -286,17 +291,16 @@ impl StatusMutation { .await .remove(deployment) .map(|b| GraphQlBundle::from(b.bundle)) - .ok_or(anyhow::anyhow!(format!( + .ok_or(ServerError::ContextError(format!( "Deployment not found: {}", deployment ))) }) .collect::>(); - let removed_bundles: Result, _> = - futures::future::try_join_all(bundles).await; + let removed_bundles: Vec = futures::future::try_join_all(bundles).await?; - removed_bundles + Ok(removed_bundles) } // Add a file @@ -305,7 +309,7 @@ impl StatusMutation { ctx: &Context<'_>, deployment: String, file_name: String, - ) -> Result { + ) -> Result { if ctx.data_opt::() != ctx .data_unchecked::() @@ -313,13 +317,9 @@ impl StatusMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!(format!( - "Failed to authenticate: {:#?} (admin: {:#?}", + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", ctx.data_opt::(), - ctx.data_unchecked::() - .state - .admin_auth_token - .as_ref() ))); } let file_manifest = match fetch_file_manifest_from_ipfs( @@ -329,7 +329,7 @@ impl StatusMutation { .await { Ok(s) => s, - Err(e) => return Err(anyhow::anyhow!(e.to_string(),)), + Err(e) => return Err(ServerError::ContextError(e.to_string())), }; let meta = FileManifestMeta { @@ -339,12 +339,12 @@ impl StatusMutation { }, file_manifest, }; - let _ = ctx - .data_unchecked::() + ctx.data_unchecked::() .state .store .read_and_validate_file(&meta, None) - .await; + .await + .map_err(|e| ServerError::ContextError(e.to_string()))?; ctx.data_unchecked::() .state .files @@ -361,7 +361,7 @@ impl StatusMutation { ctx: &Context<'_>, deployments: Vec, file_names: Vec, - ) -> Result, anyhow::Error> { + ) -> Result, ServerError> { if ctx.data_opt::() != ctx .data_unchecked::() @@ -369,7 +369,10 @@ impl StatusMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!("Failed to authenticate")); + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", + ctx.data_opt::(), + ))); } let client = ctx.data_unchecked::().state.client.clone(); let file_ref = ctx.data_unchecked::().state.files.clone(); @@ -385,7 +388,7 @@ impl StatusMutation { let file_manifest = fetch_file_manifest_from_ipfs(&client.clone(), deployment) .await - .map_err(|e| anyhow::anyhow!("{}", e))?; + .map_err(|e| ServerError::ContextError(e.to_string()))?; let meta = FileManifestMeta { meta_info: FileMetaInfo { @@ -394,19 +397,19 @@ impl StatusMutation { }, file_manifest, }; - let _ = ctx - .data_unchecked::() + ctx.data_unchecked::() .state .store .read_and_validate_file(&meta, None) - .await; + .await + .map_err(|e| ServerError::ContextError(e.to_string()))?; file_ref .clone() .lock() .await .insert(deployment.clone(), meta.clone()); - Ok::<_, anyhow::Error>(GraphQlFileManifestMeta::from(meta)) + Ok::<_, crate::admin::ServerError>(GraphQlFileManifestMeta::from(meta)) } }) .collect::>(); @@ -422,7 +425,7 @@ impl StatusMutation { &self, ctx: &Context<'_>, deployment: String, - ) -> Result, anyhow::Error> { + ) -> Result, ServerError> { if ctx.data_opt::() != ctx .data_unchecked::() @@ -430,7 +433,10 @@ impl StatusMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!("Failed to authenticate")); + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", + ctx.data_opt::(), + ))); } let file = ctx @@ -449,7 +455,7 @@ impl StatusMutation { &self, ctx: &Context<'_>, deployments: Vec, - ) -> Result, anyhow::Error> { + ) -> Result, ServerError> { if ctx.data_opt::() != ctx .data_unchecked::() @@ -457,7 +463,10 @@ impl StatusMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!("Failed to authenticate")); + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", + ctx.data_opt::(), + ))); } let files = deployments @@ -470,7 +479,7 @@ impl StatusMutation { .await .remove(deployment) .map(GraphQlFileManifestMeta::from) - .ok_or(anyhow::anyhow!(format!( + .ok_or(ServerError::ContextError(format!( "Deployment not found: {}", deployment ))) @@ -495,7 +504,7 @@ impl PriceMutation { ctx: &Context<'_>, deployment: String, price_per_byte: f64, - ) -> Result { + ) -> Result { if ctx.data_opt::() != ctx .data_unchecked::() @@ -503,13 +512,9 @@ impl PriceMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!(format!( - "Failed to authenticate: {:#?} (admin: {:#?}", + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", ctx.data_opt::(), - ctx.data_unchecked::() - .state - .admin_auth_token - .as_ref() ))); } @@ -532,7 +537,7 @@ impl PriceMutation { ctx: &Context<'_>, deployments: Vec, prices: Vec, - ) -> Result, anyhow::Error> { + ) -> Result, ServerError> { if ctx.data_opt::() != ctx .data_unchecked::() @@ -540,7 +545,10 @@ impl PriceMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!("Failed to authenticate")); + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", + ctx.data_opt::(), + ))); } let price_ref = ctx.data_unchecked::().state.prices.clone(); let prices = deployments @@ -576,7 +584,7 @@ impl PriceMutation { &self, ctx: &Context<'_>, deployment: String, - ) -> Result, anyhow::Error> { + ) -> Result, ServerError> { if ctx.data_opt::() != ctx .data_unchecked::() @@ -584,7 +592,10 @@ impl PriceMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!("Failed to authenticate")); + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", + ctx.data_opt::(), + ))); } let bundle = ctx @@ -607,7 +618,7 @@ impl PriceMutation { &self, ctx: &Context<'_>, deployments: Vec, - ) -> Result, anyhow::Error> { + ) -> Result, ServerError> { if ctx.data_opt::() != ctx .data_unchecked::() @@ -615,7 +626,10 @@ impl PriceMutation { .admin_auth_token .as_ref() { - return Err(anyhow::anyhow!("Failed to authenticate")); + return Err(ServerError::InvalidAuthentication(format!( + "Failed to authenticate: {:#?}", + ctx.data_opt::(), + ))); } let prices = deployments @@ -631,7 +645,7 @@ impl PriceMutation { deployment: deployment.to_string(), price_per_byte: price, }) - .ok_or(anyhow::anyhow!(format!( + .ok_or(ServerError::ContextError(format!( "Deployment not found: {}", deployment ))) diff --git a/file-service/src/file_server/range.rs b/file-service/src/file_server/range.rs index 3ef7ece..6b7a54e 100644 --- a/file-service/src/file_server/range.rs +++ b/file-service/src/file_server/range.rs @@ -89,7 +89,7 @@ pub async fn serve_file_range( start, end: start + length, }; - let content = store.range_read(file_name, &range).await?; + let content = store.range_read(&metadata.location, &range).await?; let transferred_bytes = crate::metrics::TRANSFERRED_BYTES.with_label_values(&[file_name]); transferred_bytes.set(length.try_into().unwrap());