diff --git a/README.md b/README.md index 19956fe..769a180 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ The open-source data access and storage platform + + ## Components * [In-Memory Data Storage](/anor-storage) diff --git a/anor-api/src/client/api_client.rs b/anor-api/src/client/api_client.rs index f09db63..69041ad 100644 --- a/anor-api/src/client/api_client.rs +++ b/anor-api/src/client/api_client.rs @@ -71,19 +71,19 @@ impl SocketClient for StorageApiClient { } */ - fn insert(&self, storage_item: StorageItem) { + fn insert(&self, _storage_item: StorageItem) { todo!() } - fn update(&mut self, key: &str, storage_item: StorageItem) -> std::io::Result<()> { + fn update(&mut self, _key: &str, _storage_item: StorageItem) -> std::io::Result<()> { todo!() } - fn get(&mut self, key: &str) -> std::io::Result { + fn get(&mut self, _key: &str) -> std::io::Result { todo!() } - fn remove(&self, key: &str) -> bool { + fn remove(&self, _key: &str) -> bool { todo!() } diff --git a/anor-api/src/service/api_service.rs b/anor-api/src/service/api_service.rs index 555851d..105a78c 100644 --- a/anor-api/src/service/api_service.rs +++ b/anor-api/src/service/api_service.rs @@ -37,7 +37,7 @@ impl ApiService for StorageApi { fn start( &self, - shutdown: Arc, + server_shutdown: Arc, signal_ready_sender: Sender<()>, ) -> Result<(), String> { assert!(self.config.api.is_some()); @@ -57,10 +57,10 @@ impl ApiService for StorageApi { let pool = ThreadPool::new(2); - while !shutdown.load(Ordering::SeqCst) { + while !server_shutdown.load(Ordering::SeqCst) { match listener.accept() { Ok((stream, addr)) => { - let shutdown_clone = shutdown.clone(); + let shutdown_clone = server_shutdown.clone(); pool.execute(move || { handle_connection(stream, addr, shutdown_clone); }); diff --git a/anor-config.debug b/anor-config.debug new file mode 100644 index 0000000..77a6bca --- /dev/null +++ b/anor-config.debug @@ -0,0 +1,47 @@ +# anor configuration file + +# storage settings +storage: + data_path: "/tmp/anor" + +# api service settings +api: + name: anor1 + description: "Anor API Service" + id: anor1@anor + access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42 + listen_addresses: 127.0.0.1 + listen_port: 9191 + connections_max: 20 + threads_max: 4 + ram_max: 512M + disk_max: 2G + enabled: true + +# http service settings +http: + description: "Anor HTTP Service" + listen_addresses: 127.0.0.1 + listen_port: 8181 + enabled: true + +# pool of remote server nodes +remote: + nodes: 127.0.0.1:9191 + +# client settings +client: + name: test + id: test1@file_api + access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec + +# redundancy settings +redundancy: + # strategy: normal - replicate item on 'redundancy_replica_min' nodes + # strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources + # strategy: paranoid - replicate item on all nodes if there are enough resources + strategy: normal + replica_min: 2 + replica_max: 5 + +## end of configuration diff --git a/anor-config.test b/anor-config.test new file mode 100644 index 0000000..6c2ec08 --- /dev/null +++ b/anor-config.test @@ -0,0 +1,47 @@ +# anor configuration file + +# storage settings +storage: + data_path: "${CARGO_MANIFEST_DIR}/target/tmp/anor" + +# api service settings +api: + name: anor1 + description: "Anor API Service" + id: anor1@anor + access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42 + listen_addresses: 127.0.0.1 + listen_port: 9191 + connections_max: 20 + threads_max: 4 + ram_max: 512M + disk_max: 2G + enabled: true + +# http service settings +http: + description: "Anor HTTP Service" + listen_addresses: 127.0.0.1 + listen_port: 8181 + enabled: true + +# pool of remote server nodes +remote: + nodes: 127.0.0.1:9191 + +# client settings +client: + name: test + id: test1@file_api + access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec + +# redundancy settings +redundancy: + # strategy: normal - replicate item on 'redundancy_replica_min' nodes + # strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources + # strategy: paranoid - replicate item on all nodes if there are enough resources + strategy: normal + replica_min: 2 + replica_max: 5 + +## end of configuration diff --git a/anor-config.yaml b/anor-config.yaml new file mode 100644 index 0000000..ff4935f --- /dev/null +++ b/anor-config.yaml @@ -0,0 +1,47 @@ +# anor configuration file + +# storage settings +storage: + data_path: "/var/anor" + +# api service settings +api: + name: anor1 + description: "Anor API Service" + id: anor1@anor + access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42 + listen_addresses: 127.0.0.1 + listen_port: 9191 + connections_max: 20 + threads_max: 4 + ram_max: 512M + disk_max: 2G + enabled: true + +# http service settings +http: + description: "Anor HTTP Service" + listen_addresses: 127.0.0.1 + listen_port: 8181 + enabled: true + +# pool of remote server nodes +remote: + nodes: 127.0.0.1:9191 + +# client settings +client: + name: test + id: test1@file_api + access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec + +# redundancy settings +redundancy: + # strategy: normal - replicate item on 'redundancy_replica_min' nodes + # strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources + # strategy: paranoid - replicate item on all nodes if there are enough resources + strategy: normal + replica_min: 2 + replica_max: 5 + +## end of configuration diff --git a/anor-server/Cargo.toml b/anor-server/Cargo.toml index 48d3df1..34aa3fb 100644 --- a/anor-server/Cargo.toml +++ b/anor-server/Cargo.toml @@ -14,6 +14,7 @@ categories = ["data-structures", "caching", "database", "filesystem"] [dependencies] log = "0.4" log4rs = "1.2" +tokio = { version = "1", features = ["full"] } anor-storage = { path = "../anor-storage"} anor-api = { path = "../anor-api"} diff --git a/anor-server/src/lib.rs b/anor-server/src/lib.rs deleted file mode 100644 index cd71fe3..0000000 --- a/anor-server/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! **Anor Server** -//! -//! Project Stage -//! -//! **Research:** This project is at the design stage, with some sketches of work but nothing usable yet. diff --git a/anor-server/src/main.rs b/anor-server/src/main.rs index ae075f6..fda2264 100644 --- a/anor-server/src/main.rs +++ b/anor-server/src/main.rs @@ -1,65 +1,65 @@ use std::sync::atomic::Ordering; -use std::thread; +use std::thread::{self, JoinHandle}; use std::{ sync::{atomic::AtomicBool, mpsc::channel, Arc}, time::Instant, }; -use anor_http::client::http_client; -use anor_http::service::http_service; -use anor_storage::storage::Storage; use anor_api::{ client::api_client::{SocketClient, StorageApiClient}, service::api_service::{ApiService, StorageApi}, }; +use anor_http::client::http_client; +use anor_http::service::http_service; +use anor_storage::storage::Storage; -use anor_utils::config; +use anor_utils::config::{self, Config}; -fn main() { - start_api_server(); -} - -fn start_api_server() { - let launch_start = Instant::now(); +use tokio::signal::unix::{signal, SignalKind}; +#[tokio::main] +async fn main() { log4rs::init_file("log.yaml", Default::default()).unwrap(); - log::info!("{} v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")); + // load the configuration let config = config::load(); // open the data storage let storage = Storage::open_with_config(config.clone()); - // prapare service parameters - let api_service_config = config.clone(); - let api_service_shutdown = Arc::new(AtomicBool::new(false)); - let api_service_shutdown_clone = api_service_shutdown.clone(); - let (api_service_ready_sender, api_service_ready_receiver) = channel(); - - // start the storage api service in a separate thread - let api_service_handler = thread::spawn(move || { - let service = StorageApi::with_config(storage, api_service_config); - if let Err(err) = service.start(api_service_shutdown_clone, api_service_ready_sender) { - log::error!("{}", err); - panic!("{}", err); - } + // hook for graceful shutdown + let server_shutdown = Arc::new(AtomicBool::new(false)); + let config_cloned = config.clone(); + let server_shutdown_cloned = server_shutdown.clone(); + tokio::spawn(async move { + let mut sigint = signal(SignalKind::interrupt()).unwrap(); + let mut sigterm = signal(SignalKind::terminate()).unwrap(); + tokio::select! { + _ = sigint.recv() => { + if log::log_enabled!(log::Level::Debug) { + log::debug!("Recieved SIGINT"); + } + } + _ = sigterm.recv() => { + if log::log_enabled!(log::Level::Debug) { + log::debug!("Recieved SIGTERM"); + } + }, + }; + graceful_shutdown(server_shutdown_cloned, config_cloned); }); - // wait for the readiness of api service - if let Err(err) = api_service_ready_receiver.recv() { - log::error!("{}", err); - panic!("{}", err); - } - - let launch_elapsed = Instant::elapsed(&launch_start); - log::info!("Anor Storage API service started in {:?}", launch_elapsed); + // starting API service + let api_service = start_api_service(config.clone(), storage, server_shutdown.clone()); + // api client tests let mut api_client1 = StorageApiClient::with_config(config.clone()); api_client1.connect().expect("client connection error"); let keys = api_client1.keys(); log::debug!("{:?}", keys); + _ = api_client1.disconnect(); /* let msg1 = String::from("Hi there1!"); @@ -78,22 +78,60 @@ fn start_api_server() { client2.set_item(msg2).expect("set item error"); */ - // shutdown the api server - api_service_shutdown.store(true, Ordering::SeqCst); + api_service.join().unwrap(); + log::info!("Anor Storage API service is shutdown successfully."); + + log::info!("Anor Server is shutdown successfully."); +} + +fn graceful_shutdown(server_shutdown: Arc, config: Arc) { + log::info!("Initializing the graceful shutdown process..."); + server_shutdown.store(true, Ordering::SeqCst); + // a temporary solution to unblock socket listener // make an empty connection to unblock listener and shutdown the api server - let mut api_client_terminate = StorageApiClient::with_config(config.clone()); + let mut api_client_terminate = StorageApiClient::with_config(config); api_client_terminate .connect() .expect("client connection error"); + _ = api_client_terminate.disconnect(); +} - api_service_handler.join().unwrap(); +fn start_api_service( + config: Arc, + storage: Storage, + server_shutdown: Arc, +) -> JoinHandle<()> { + log::info!("Starting Anor Storage API service..."); - log::info!("Anor Storage API service is shutdown successfully."); + let launch_start = Instant::now(); + + // prapare service parameters + let api_service_config = config.clone(); + let (api_service_ready_sender, api_service_ready_receiver) = channel(); + + // start the storage api service in a separate thread + let api_service_handler = thread::spawn(move || { + let service = StorageApi::with_config(storage, api_service_config); + if let Err(err) = service.start(server_shutdown, api_service_ready_sender) { + log::error!("{}", err); + panic!("{}", err); + } + }); + + // wait for the readiness of api service + if let Err(err) = api_service_ready_receiver.recv() { + log::error!("{}", err); + panic!("{}", err); + } + + let launch_elapsed = Instant::elapsed(&launch_start); + log::info!("Anor Storage API service started in {:?}", launch_elapsed); + + api_service_handler } fn _start_http_server() { - let config = config::load(); // open the data storage diff --git a/anor-storage/src/storage.rs b/anor-storage/src/storage.rs index e350d95..359d253 100644 --- a/anor-storage/src/storage.rs +++ b/anor-storage/src/storage.rs @@ -12,7 +12,7 @@ use std::{ pub mod storage_codec; pub mod storage_const; pub mod storage_item; -pub mod storage_location; +pub mod storage_persistence; pub mod storage_packet; use storage_codec::*; diff --git a/anor-storage/src/storage/storage_item.rs b/anor-storage/src/storage/storage_item.rs index 81f841e..f4da515 100644 --- a/anor-storage/src/storage/storage_item.rs +++ b/anor-storage/src/storage/storage_item.rs @@ -1,4 +1,4 @@ -use super::{storage_codec::*, storage_location::*, storage_packet::*}; +use super::{storage_codec::*, storage_persistence::*, storage_packet::*}; use std::collections::HashMap; use uuid::Uuid; @@ -63,7 +63,7 @@ pub struct StorageItem { /// `expires_on` - timestamp, defines expiry datetime pub expires_on: Option, - pub storage_locations: Vec, + pub persistence: StoragePersistence, /// defines the number of required replications in the cluster pub redundancy: u8, @@ -77,7 +77,7 @@ impl StorageItem { version: 0, description: None, item_type: ItemType::Custom, - storage_locations: vec![StorageLocation::Memory], + persistence: StoragePersistence::Memory, data, tags: None, metafields: None, @@ -97,7 +97,7 @@ impl StorageItem { version: 0, description: None, item_type: storage_type, - storage_locations: vec![StorageLocation::Memory], + persistence: StoragePersistence::Memory, data, tags: None, metafields: None, diff --git a/anor-storage/src/storage/storage_location.rs b/anor-storage/src/storage/storage_location.rs deleted file mode 100644 index fe58622..0000000 --- a/anor-storage/src/storage/storage_location.rs +++ /dev/null @@ -1,9 +0,0 @@ -/// Storage type -#[derive(Debug, Clone, bincode::Encode, bincode::Decode)] -pub enum StorageLocation { - /// Store in memory - Memory, - - /// Store in disk - Disk, -} diff --git a/anor-storage/src/storage/storage_persistence.rs b/anor-storage/src/storage/storage_persistence.rs new file mode 100644 index 0000000..9d34e24 --- /dev/null +++ b/anor-storage/src/storage/storage_persistence.rs @@ -0,0 +1,12 @@ +/// Persistence type +#[derive(Debug, Clone, bincode::Encode, bincode::Decode)] +pub enum StoragePersistence { + /// Persist only in memory + Memory = 0, + + /// Persist only in disk + Disk = 1, + + /// Persist both in memory and disk + Hybrid = 2 +} diff --git a/docs/roadmap.md b/docs/roadmap.md index d344cac..f9f12e2 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -20,7 +20,12 @@ - support for JWT - support for Fine-Grained Access Control -6. Scaling +6. Message Queue + - publish + - subscribe + - support for delivery and destination options (persisted, non-persisted, at-least-once, at-most-once, exactly-once) + +7. Scaling - consistent hashing - cluster health check / heartbeats - support for easy scaling @@ -34,20 +39,20 @@ - server node will analyze the received config_id and may respond with updated configuration settings - clients and nodes use weighted graphs to optimize node/peer selection and other network operations -7. Caching +8. Caching - client-side and server-side caching support - cache evictions policies: LRU/LFU -8. File Storage Spaces +9. File Storage Spaces - support for folders - support for path -9. FFI Bindings +10. FFI Bindings - Go - Ruby - Python -10. Remote Collections +11. Remote Collections - HashSet - HashMap - Vec diff --git a/log.yaml b/log.yaml new file mode 100644 index 0000000..2afd21d --- /dev/null +++ b/log.yaml @@ -0,0 +1,20 @@ +appenders: + stdout: + kind: console + encoder: + pattern: "{d(%+)(utc)} [{f}:{L}] {h({l})} {m}{n}" +root: + level: debug + appenders: + - stdout + +loggers: + anor_common::utils::config: + level: debug + + anor::storage: + level: debug + + anor_api: + level: debug +