Skip to content

Commit

Permalink
coding like a non-ordinary coder
Browse files Browse the repository at this point in the history
  • Loading branch information
wildonion committed Aug 22, 2024
1 parent e12848f commit dde6b5f
Show file tree
Hide file tree
Showing 19 changed files with 1,643 additions and 608 deletions.
514 changes: 297 additions & 217 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions hooper/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
std::env::var("DATABASE_URL").unwrap()
).await.unwrap();
let fresh = args.fresh;
if fresh{
Migrator::fresh(&connection).await.unwrap();
Migrator::refresh(&connection).await.unwrap();
} else{
Migrator::up(&connection, None).await.unwrap(); // executing database tasks like creating tables on startup
}
// if fresh{
// Migrator::fresh(&connection).await.unwrap();
// Migrator::refresh(&connection).await.unwrap();
// } else{
// Migrator::up(&connection, None).await.unwrap(); // executing database tasks like creating tables on startup
// }

Migrator::status(&connection).await.unwrap();
// Migrator::status(&connection).await.unwrap();

/* ******************************* IMPORTANT *******************************
there must be some sleep or loop{} to keep the app awake
Expand Down
Binary file modified src/.DS_Store
Binary file not shown.
87 changes: 41 additions & 46 deletions src/apis/v1/http/hoop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use middlewares::check_token::check_token;
use models::{event::{EventQuery, EventType, HoopEventForm}, user::UserData};
use salvo::{http::form::FormData, Error};
use serde::{Deserialize, Serialize};
use models::server::Response as HoopoeResponse;
use crate::*;


Expand Down Expand Up @@ -41,22 +42,58 @@ pub async fn add_hoop(
hoop_info: FormBody<HoopEventForm>,
){


// trying to get the user data in here
// extracting the required user_data from the depot, this gets filled
// inside the middleware setup; if we're here it means the middleware
// has injected the right data into the depot.
let user_data = depot.get::<UserData>("user_data").unwrap();


// extracting necessary structures from the app context
let app_ctx = depot.obtain::<Option<AppContext>>().unwrap(); // extracting shared app context
let redis_pool = app_ctx.clone().unwrap().app_storage.clone().unwrap().get_redis_pool().await.unwrap();
let sea_orm_pool = app_ctx.clone().unwrap().app_storage.clone().unwrap().get_seaorm_pool().await.unwrap();
let actors = app_ctx.clone().unwrap().actors.unwrap();
let hoop_mutator_actor_controller = actors.clone().cqrs_actors.mutators.hoop_mutator_actor;
let mut redis_conn = redis_pool.get().await.unwrap();


let cover = req.file("cover").await.unwrap();
let decoded = serde_json::from_str::
let inv_info = serde_json::from_str::
<Vec<std::collections::HashMap<String, i64>>>
(&hoop_info.invitations.clone()).unwrap();


// setting up the exp time of the event inside the redis as an expirable key
// later on the scheduler actor can subscribe to redis expire channel
let hoop_end_time = hoop_info.end_at.parse::<i64>().unwrap();
let hoop_start_time = hoop_info.started_at.parse::<i64>().unwrap();
if hoop_end_time < hoop_start_time{
// reject the request
let server_time = format!("{}", chrono::Local::now().to_string());
res.status_code = Some(StatusCode::NOT_ACCEPTABLE);
res.render(Json(
HoopoeResponse::<&[u8]>{
data: &[],
message: "invalid end time",
is_err: true,
status: StatusCode::NOT_ACCEPTABLE.as_u16(),
meta: Some(
serde_json::json!({
"server_time": server_time
})
)
}
));
}

// set an exp key to check the end time of the event every
// 10 mins without sending io calls to db
let now = chrono::Local::now();
let ten_mins_later = chrono::Local::now() + chrono::Duration::minutes(10);
let duration_in_seconds = ten_mins_later.timestamp() - now.timestamp();
let key = format!("hoop_{}_at_{}", hoop_info.title, hoop_info.started_at);
let _: () = redis_conn.set_ex(key, &user_data.username, duration_in_seconds as u64).await.unwrap();

let etype = match hoop_info.etype.as_str(){
"social" => EventType::SocialGathering,
"proposal" => EventType::Proposal,
Expand Down Expand Up @@ -110,58 +147,16 @@ pub async fn get_hoop(
let sea_orm_pool = app_ctx.clone().unwrap().app_storage.clone().unwrap().get_seaorm_pool().await.unwrap();
let actors = app_ctx.clone().unwrap().actors.unwrap();
let hoop_mutator_actor_controller = actors.clone().cqrs_actors.mutators.hoop_mutator_actor;
let redis_conn = redis_pool.get().await.unwrap();


// trying to get the user data in here
let user_data = depot.get::<UserData>("user_data").unwrap();

/* --------------------------------------------------------------------------------------------------------
event scheduler (check the endTime of the event constantly to close the event)
resource-intensive with regular checking in a loop{}:
1 - an actor task or cron scheduler to check the end time of the hoop constantly to update the is_finished field
2 - loop tokio spawn interval tick then include!{} hoop_scheduler.rs to call the method
the optimal and none intensive solution would be using of key space notifications
which allows clients to subscribe to Pub/Sub channels in order to receive
events affecting the Redis data set in some way; however, the following
steps must be taken to complete the logic. We're assuming that we have a
user_id as the key and some value with an expirable key set for 10 mins
after login time.
let login_time = chrono::Local::now();
let ten_mins_later = login_time + chrono::Duration::minutes(10);
redis_conn.set_exp(user_id, ten_mins_later);
1 - configuring Redis to enable key space notifications
2 - when the key expires Redis publish its event to a prebuilt expiration channel
2 - we then subscribe to the __keyevent@0__:expired channel
3 - we'll receive the event from the channel
4 - trigger the notification for the related user id (expired key)
5 - publish triggered notif to rmq producer using notif_borker_actor
6 - consume notif from rmq broker to cache on redis and store in db for future short pollings
7 - send received notif to mpsc sender of the ws server
8 - receive the notif from the mpsc channel inside ws server setup
at this time to send notif to client we can either
cache the notif on Redis or store it on db, allows clients use short polling approach to
fetch the notif through an interval process.
or another approach which is more resource intensive for push notification strategies
is by using a channel (MPSC) to send the notif to a websocket server actor configuration
thread from there send to the ws peer actor in realtime.
the ws setup could be an actor based setup which is more simpler to send messages to
peer sessions from ws server through actor concepts like:
atomic syncing with mutex rwlock and channels, os/async io threads or tasks,
select, mailbox mpsc channels and task scheduler interval.
*/

// get live hoops (those ones that are not finished or expired)
// get all owner hoops
// get all user joined hoops

let query_params = req.parse_queries::<EventQuery>().unwrap();

res.render("developing...")

}

#[endpoint]
Expand Down
2 changes: 1 addition & 1 deletion src/apis/v1/http/notif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ pub async fn get_notif(

}
}

}

},
Expand Down
3 changes: 1 addition & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use indexmap::IndexMap;
use models::event::NotifData;
use salvo::http::response;
use serde_json::Value;
use tests::orex;
use tests::{actor, orex};
use workers::notif::{self, NotifBrokerActor};
use std::io::BufWriter;
use std::str::FromStr;
Expand Down Expand Up @@ -132,7 +132,6 @@ mod requests;
mod entities;
mod config;
mod cli;
mod tasks;
mod interfaces;
mod context;
mod storage;
Expand Down
4 changes: 3 additions & 1 deletion src/constants/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@



use std::sync::atomic::{AtomicU8, AtomicUsize};
use std::sync::{atomic::{AtomicU8, AtomicUsize}, Mutex};
use salvo::websocket::Message;
use tokio::{sync::mpsc, time::Duration};
use crate::*;
Expand Down Expand Up @@ -69,6 +69,8 @@ pub static ONLINE_USERS: Lazy<WsUsers> = Lazy::new(||{ WsUsers::default() }); //
pub static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); // used as a thread safe user id, this would be simply a usize in actor state cause actors are an isolated and thread safe objects
pub static WS_ROOMS: Lazy<WsRooms> = Lazy::new(||{ WsRooms::default() }); // thread safe map between event room and all user ids in that room

pub static PRODUC_IDS: Lazy<Arc<Mutex<Vec<i32>>>> = Lazy::new(||{Arc::new(Mutex::new(vec![]))});

pub const APP_NAME: &str = "Hoopoe";
pub const APP_DOMAIN: &str = "hoopoe.app";
pub const ASSETS_IMG_DIR: &str = "assets/images";
Expand Down
10 changes: 8 additions & 2 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::workers::notif::{NotifBrokerActor};
use crate::config::{Env as ConfigEnv, Context};
use crate::config::EnvExt;
use crate::storage::engine::Storage;
use crate::workers::scheduler::Scheduler;
use actix::{Actor, Addr};
use indexmap::IndexMap;
use serde::{Serialize, Deserialize};
Expand Down Expand Up @@ -60,6 +61,7 @@ pub struct CqrsActors{
pub struct ActorInstaces{
pub broker_actors: BrokerActor,
pub cqrs_actors: CqrsActors,
pub scheduler_actor: Addr<Scheduler>,
}


Expand All @@ -69,11 +71,13 @@ pub struct Channels{

pub struct NotifMpscChannel{
pub sender: tokio::sync::mpsc::Sender<String>,
pub receiver: std::sync::Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<String>>>
// thread-safe receiver: the receiver must be mutable to consume messages,
// so sharing it with another thread requires wrapping it in Arc and Mutex
pub receiver: std::sync::Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<String>>>
}

#[derive(Clone)]
pub struct AppContext{
pub struct AppContext{ // we'll extract this instance from the depot in each api handler
pub config: Option<std::sync::Arc<Context<ConfigEnv>>>,
pub app_storage: Option<std::sync::Arc<Storage>>,
pub actors: Option<ActorInstaces>,
Expand Down Expand Up @@ -111,6 +115,7 @@ impl AppContext{
let notif_actor = NotifBrokerActor::new(app_storage.clone(), notif_mutator_actor.clone(), zerlog_producer_actor.clone(), notif_broker_tx.clone()).start();
let hoop_mutator_actor = HoopMutatorActor::new(app_storage.clone(), zerlog_producer_actor.clone()).start();
let hoop_accessor_actor = HoopAccessorActor::new(app_storage.clone(), zerlog_producer_actor.clone()).start();
let scheduler_actor = Scheduler::new(notif_actor.clone(), app_storage.clone(), notif_broker_tx.clone(), zerlog_producer_actor.clone()).start();

let actor_instances = ActorInstaces{
broker_actors: BrokerActor{
Expand All @@ -127,6 +132,7 @@ impl AppContext{
hoop_accessor_actor: hoop_accessor_actor.clone()
},
},
scheduler_actor: scheduler_actor,
};

Self {
Expand Down
4 changes: 4 additions & 0 deletions src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,10 @@ impl std::fmt::Display for HoopoeErrorResponse{
to unwrap the exact source of the error without crashing the app at runtime.
-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
*/
// implementing the From trait for every possible error source of HoopoeErrorResponse
// enables us to call the ? operator on that error type inside functions returning
// HoopoeErrorResponse; the Rust compiler then knows how to map and build the actual
// error variant from the type that caused the error.
impl From<std::io::Error> for HoopoeErrorResponse{ // building error instance from std::io::Errro, supports any tcp based server error
fn from(error: std::io::Error) -> Self {
Self{
Expand Down
8 changes: 8 additions & 0 deletions src/lockers/llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@
//-----------------------------------------------------------------------------------------------------

use std::sync::atomic::{AtomicBool, Ordering};
use constants::PRODUC_IDS;
use interfaces::product::ProductExt;
use salvo::concurrency_limiter;
use serde::{Deserialize, Serialize};
use crate::{constants::PURCHASE_DEMO_LOCK_MUTEX, *};

Expand Down Expand Up @@ -254,6 +256,12 @@ impl ProductExt for Product{
*/
pub(self) async fn start_minting(product: Product) -> (bool, tokio::sync::mpsc::Receiver<Product>){

/* ___ IMPORTANT
╰┈➤ when handling async future io tasks, remember to acquire the Mutex inside a
separate lightweight io thread to avoid blocking the main thread the request is
being handled in; always do the locking of PURCHASE_DEMO_LOCK_MUTEX inside a
separate tokio::spawn() task
*/

let Product { pid, buyer_id, is_minted } = product.clone();

// cloning the static mutex, having a global data in Rust requires to define
Expand Down
1 change: 1 addition & 0 deletions src/models/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct HoopEventForm{
pub title: String,
pub description: String,
pub started_at: String,
pub end_at: String,
pub duration: String, // in seconds
pub capacity: String,
pub participants: Vec<String>,
Expand Down
17 changes: 9 additions & 8 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,14 @@ impl HoopoeServer{
).await.unwrap();
let fresh = args.fresh;
// migration process at runtime
if fresh{
log::info!("fresh db...");
Migrator::fresh(&connection).await.unwrap();
Migrator::refresh(&connection).await.unwrap();
} else{
Migrator::up(&connection, None).await.unwrap();
}
Migrator::status(&connection).await.unwrap();
// if fresh{
// log::info!("fresh db...");
// Migrator::fresh(&connection).await.unwrap();
// Migrator::refresh(&connection).await.unwrap();
// } else{
// Migrator::up(&connection, None).await.unwrap();
// }
// Migrator::status(&connection).await.unwrap();

}

Expand Down Expand Up @@ -230,6 +230,7 @@ impl HoopoeServer{
}
}

// surely needs ssl cert files
pub async fn runOverHTTP3(self){

let Self { service, addr, app_ctx, ssl_domain } = self;
Expand Down
Loading

0 comments on commit dde6b5f

Please sign in to comment.