Skip to content

Commit

Permalink
refactor: client uses subfile finder
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Dec 13, 2023
1 parent 5c024c5 commit 40df391
Showing 1 changed file with 49 additions and 46 deletions.
95 changes: 49 additions & 46 deletions subfile-exchange/src/subfile_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@ impl SubfileDownloader {
pub fn new(ipfs_client: IpfsClient, args: DownloaderArgs) -> Self {
SubfileDownloader {
ipfs_client: ipfs_client.clone(),
//TODO: consider a more advanced config such as if a proxy should be used for the client
http_client: reqwest::Client::new(),
ipfs_hash: args.ipfs_hash,
_gateway_url: args.gateway_url,
//TODO: Check for healthy indexers in args.indexer_endpoints
static_endpoints: args.indexer_endpoints,
output_dir: args.output_dir,
free_query_auth_token: args.free_query_auth_token,
Expand Down Expand Up @@ -97,50 +95,7 @@ impl SubfileDownloader {
);

// check subfile availability from gateway/indexer_endpoints
let blocklist = self.indexer_blocklist.lock().unwrap().clone();
let endpoints = &self
.static_endpoints
.iter()
.filter(|url| !blocklist.contains(*url))
.cloned()
.collect::<Vec<_>>();
self.update_indexer_urls(
&self
.subfile_finder
.subfile_availabilities(&self.ipfs_hash, endpoints)
.await,
);
let indexer_endpoints = self.indexer_urls.lock().unwrap().clone();
if indexer_endpoints.is_empty() {
tracing::warn!(
subfile_hash = &self.ipfs_hash,
"No endpoint satisfy the subfile requested, sieve through available subfiles for individual files"
);

// check files availability from gateway/indexer_endpoints
match self
.subfile_finder
.file_discovery(&self.ipfs_hash, endpoints)
.await
{
Ok(map) => {
let msg = format!(
"Files available on these available subfiles: {:#?}",
tracing::field::debug(&map.lock().await),
);
return Err(Error::DataUnavilable(msg));
}
Err(e) => {
let msg = format!(
"Cannot match the files: {:?}, {:?}",
tracing::field::debug(&self.indexer_urls.lock().unwrap()),
tracing::field::debug(&e),
);
tracing::error!(msg);
return Err(Error::DataUnavilable(msg));
}
}
};
let _ = self.availbility_check().await;

// Loop through chunk files for downloading
let mut incomplete_files = vec![];
Expand Down Expand Up @@ -295,6 +250,54 @@ impl SubfileDownloader {
auth_token: self.free_query_auth_token.clone(),
})
}

async fn availbility_check(&self) -> Result<(), Error> {
let blocklist = self.indexer_blocklist.lock().unwrap().clone();
let endpoints = &self
.static_endpoints
.iter()
.filter(|url| !blocklist.contains(*url))
.cloned()
.collect::<Vec<_>>();
self.update_indexer_urls(
&self
.subfile_finder
.subfile_availabilities(&self.ipfs_hash, endpoints)
.await,
);
let indexer_endpoints = self.indexer_urls.lock().unwrap().clone();
if indexer_endpoints.is_empty() {
tracing::warn!(
subfile_hash = &self.ipfs_hash,
"No endpoint satisfy the subfile requested, sieve through available subfiles for individual files"
);

// check files availability from gateway/indexer_endpoints
match self
.subfile_finder
.file_discovery(&self.ipfs_hash, endpoints)
.await
{
Ok(map) => {
let msg = format!(
"Files available on these available subfiles: {:#?}",
tracing::field::debug(&map.lock().await),
);
return Err(Error::DataUnavilable(msg));
}
Err(e) => {
let msg = format!(
"Cannot match the files: {:?}, {:?}",
tracing::field::debug(&self.indexer_urls.lock().unwrap()),
tracing::field::debug(&e),
);
tracing::error!(msg);
return Err(Error::DataUnavilable(msg));
}
}
};
Ok(())
}
}

#[derive(Debug, Clone)]
Expand Down

0 comments on commit 40df391

Please sign in to comment.