Skip to content

Commit

Permalink
feat: individual price mgmt, merge status/price query/mutation
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Mar 28, 2024
1 parent ae24c82 commit 5e67fd4
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 11 deletions.
187 changes: 181 additions & 6 deletions file-service/src/admin.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::collections::HashMap;
use std::sync::Arc;

use async_graphql::{Context, EmptySubscription, Object, Schema};
use async_graphql::{Context, EmptySubscription, MergedObject, Object, Schema};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::{extract::State, routing::get, Router, Server};
use http::HeaderMap;
use tokio::sync::Mutex;

use crate::file_server::status::{GraphQlBundle, StatusQuery};
use crate::file_server::util::graphql_playground;
use crate::file_server::{FileServiceError, ServerContext};
use crate::file_server::{
cost::{GraphQlCostModel, PriceQuery},
status::{GraphQlBundle, StatusQuery},
util::graphql_playground,
FileServiceError, ServerContext,
};
use file_exchange::{
errors::{Error, ServerError},
manifest::{
Expand All @@ -21,6 +24,7 @@ use file_exchange::{
pub struct AdminState {
pub client: IpfsClient,
pub bundles: Arc<Mutex<HashMap<String, LocalBundle>>>,
pub prices: Arc<Mutex<HashMap<String, f64>>>,
pub admin_auth_token: Option<String>,
pub admin_schema: AdminSchema,
}
Expand All @@ -36,10 +40,21 @@ impl AdminContext {
}
}

pub type AdminSchema = Schema<StatusQuery, StatusMutation, EmptySubscription>;
#[derive(MergedObject, Default)]
pub struct MergedQuery(StatusQuery, PriceQuery);

#[derive(MergedObject, Default)]
pub struct MergedMutation(StatusMutation, PriceMutation);

pub type AdminSchema = Schema<MergedQuery, MergedMutation, EmptySubscription>;

pub async fn build_schema() -> AdminSchema {
Schema::build(StatusQuery, StatusMutation, EmptySubscription).finish()
Schema::build(
MergedQuery(StatusQuery, PriceQuery),
MergedMutation(StatusMutation, PriceMutation),
EmptySubscription,
)
.finish()
}

fn get_token_from_headers(headers: &HeaderMap) -> Option<String> {
Expand Down Expand Up @@ -68,6 +83,7 @@ pub fn serve_admin(context: ServerContext) {
AdminState {
client: context.state.client.clone(),
bundles: context.state.bundles.clone(),
prices: context.state.prices.clone(),
admin_auth_token: context.state.admin_auth_token.clone(),
admin_schema: build_schema().await,
}
Expand Down Expand Up @@ -267,3 +283,162 @@ impl StatusMutation {
removed_bundles
}
}

#[derive(Default)]
pub struct PriceMutation;

#[Object]
impl PriceMutation {
// Set price for a deployment
async fn set_price(
&self,
ctx: &Context<'_>,
deployment: String,
price_per_byte: f64,
) -> Result<GraphQlCostModel, anyhow::Error> {
if ctx.data_opt::<String>()
!= ctx
.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
{
return Err(anyhow::anyhow!(format!(
"Failed to authenticate: {:#?} (admin: {:#?}",
ctx.data_opt::<String>(),
ctx.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
)));
}

ctx.data_unchecked::<AdminContext>()
.state
.prices
.lock()
.await
.insert(deployment.clone(), price_per_byte);

Ok(GraphQlCostModel {
deployment,
price_per_byte,
})
}

// Add multiple bundles
async fn set_prices(
&self,
ctx: &Context<'_>,
deployments: Vec<String>,
prices: Vec<f64>,
) -> Result<Vec<GraphQlCostModel>, anyhow::Error> {
if ctx.data_opt::<String>()
!= ctx
.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
{
return Err(anyhow::anyhow!("Failed to authenticate"));
}
let price_ref = ctx.data_unchecked::<AdminContext>().state.prices.clone();
let prices = deployments
.iter()
.zip(prices)
.map(|(deployment, price)| {
let price_ref = price_ref.clone();

async move {
price_ref
.clone()
.lock()
.await
.insert(deployment.clone(), price);

Ok::<_, anyhow::Error>(GraphQlCostModel {
deployment: deployment.to_string(),
price_per_byte: price,
})
}
})
.collect::<Vec<_>>();

// Since collect() gathers futures, we need to resolve them. You can use `try_join_all` for this.
let resolved_prices: Result<Vec<GraphQlCostModel>, _> =
futures::future::try_join_all(prices).await;

Ok(resolved_prices.unwrap_or_default())
}

async fn remove_price(
&self,
ctx: &Context<'_>,
deployment: String,
) -> Result<Option<GraphQlCostModel>, anyhow::Error> {
if ctx.data_opt::<String>()
!= ctx
.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
{
return Err(anyhow::anyhow!("Failed to authenticate"));
}

let bundle = ctx
.data_unchecked::<AdminContext>()
.state
.prices
.lock()
.await
.remove(&deployment)
.map(|price| GraphQlCostModel {
deployment,
price_per_byte: price,
});

Ok(bundle)
}

async fn remove_prices(
&self,
ctx: &Context<'_>,
deployments: Vec<String>,
) -> Result<Vec<GraphQlCostModel>, anyhow::Error> {
if ctx.data_opt::<String>()
!= ctx
.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
{
return Err(anyhow::anyhow!("Failed to authenticate"));
}

let prices = deployments
.iter()
.map(|deployment| async move {
ctx.data_unchecked::<AdminContext>()
.state
.prices
.lock()
.await
.remove(deployment)
.map(|price| GraphQlCostModel {
deployment: deployment.to_string(),
price_per_byte: price,
})
.ok_or(anyhow::anyhow!(format!(
"Deployment not found: {}",
deployment
)))
})
.collect::<Vec<_>>();

let removed_prices: Result<Vec<GraphQlCostModel>, _> =
futures::future::try_join_all(prices).await;

removed_prices
}
}
8 changes: 4 additions & 4 deletions file-service/src/file_server/cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ pub struct GraphQlCostModel {
}

#[derive(Default)]
pub struct Query;
pub struct PriceQuery;

#[Object]
impl Query {
impl PriceQuery {
/// Provide an array of cost model to the queried deployment whether it is served or not
async fn cost_models(
&self,
Expand Down Expand Up @@ -70,10 +70,10 @@ impl Query {
}
}

pub type CostSchema = Schema<Query, EmptyMutation, EmptySubscription>;
pub type CostSchema = Schema<PriceQuery, EmptyMutation, EmptySubscription>;

pub async fn build_schema() -> CostSchema {
Schema::build(Query, EmptyMutation, EmptySubscription).finish()
Schema::build(PriceQuery, EmptyMutation, EmptySubscription).finish()
}

pub async fn cost(State(context): State<ServerContext>, req: GraphQLRequest) -> GraphQLResponse {
Expand Down
4 changes: 3 additions & 1 deletion file-service/src/file_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ pub struct ServerState {
pub client: IpfsClient,
pub operator_public_key: String,
pub bundles: Arc<Mutex<HashMap<String, LocalBundle>>>, // Keyed by IPFS hash, valued by Bundle and Local path
pub admin_auth_token: Option<String>, // Add bearer prefix
pub prices: Arc<Mutex<HashMap<String, f64>>>, // Keyed by IPFS hash, valued by price per byte
pub admin_auth_token: Option<String>, // Add bearer prefix
pub config: Config,
pub database: PgPool,
pub cost_schema: crate::file_server::cost::CostSchema,
Expand Down Expand Up @@ -116,6 +117,7 @@ pub async fn initialize_server_context(config: Config) -> Result<ServerContext,
config: config.clone(),
client: client.clone(),
bundles: Arc::new(Mutex::new(HashMap::new())),
prices: Arc::new(Mutex::new(HashMap::new())),
admin_auth_token,
operator_public_key: public_key(&config.common.indexer.operator_mnemonic)
.expect("Failed to initiate with operator wallet"),
Expand Down

0 comments on commit 5e67fd4

Please sign in to comment.