diff --git a/README.md b/README.md
index 19956fe..769a180 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,8 @@
The open-source data access and storage platform
+
+
## Components
* [In-Memory Data Storage](/anor-storage)
diff --git a/anor-api/src/client/api_client.rs b/anor-api/src/client/api_client.rs
index f09db63..69041ad 100644
--- a/anor-api/src/client/api_client.rs
+++ b/anor-api/src/client/api_client.rs
@@ -71,19 +71,19 @@ impl SocketClient for StorageApiClient {
}
*/
- fn insert(&self, storage_item: StorageItem) {
+ fn insert(&self, _storage_item: StorageItem) {
todo!()
}
- fn update(&mut self, key: &str, storage_item: StorageItem) -> std::io::Result<()> {
+ fn update(&mut self, _key: &str, _storage_item: StorageItem) -> std::io::Result<()> {
todo!()
}
- fn get(&mut self, key: &str) -> std::io::Result {
+ fn get(&mut self, _key: &str) -> std::io::Result {
todo!()
}
- fn remove(&self, key: &str) -> bool {
+ fn remove(&self, _key: &str) -> bool {
todo!()
}
diff --git a/anor-api/src/service/api_service.rs b/anor-api/src/service/api_service.rs
index 555851d..105a78c 100644
--- a/anor-api/src/service/api_service.rs
+++ b/anor-api/src/service/api_service.rs
@@ -37,7 +37,7 @@ impl ApiService for StorageApi {
fn start(
&self,
- shutdown: Arc,
+ server_shutdown: Arc,
signal_ready_sender: Sender<()>,
) -> Result<(), String> {
assert!(self.config.api.is_some());
@@ -57,10 +57,10 @@ impl ApiService for StorageApi {
let pool = ThreadPool::new(2);
- while !shutdown.load(Ordering::SeqCst) {
+ while !server_shutdown.load(Ordering::SeqCst) {
match listener.accept() {
Ok((stream, addr)) => {
- let shutdown_clone = shutdown.clone();
+ let shutdown_clone = server_shutdown.clone();
pool.execute(move || {
handle_connection(stream, addr, shutdown_clone);
});
diff --git a/anor-config.debug b/anor-config.debug
new file mode 100644
index 0000000..77a6bca
--- /dev/null
+++ b/anor-config.debug
@@ -0,0 +1,47 @@
+# anor configuration file
+
+# storage settings
+storage:
+ data_path: "/tmp/anor"
+
+# api service settings
+api:
+ name: anor1
+ description: "Anor API Service"
+ id: anor1@anor
+ access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42
+ listen_addresses: 127.0.0.1
+ listen_port: 9191
+ connections_max: 20
+ threads_max: 4
+ ram_max: 512M
+ disk_max: 2G
+ enabled: true
+
+# http service settings
+http:
+ description: "Anor HTTP Service"
+ listen_addresses: 127.0.0.1
+ listen_port: 8181
+ enabled: true
+
+# pool of remote server nodes
+remote:
+ nodes: 127.0.0.1:9191
+
+# client settings
+client:
+ name: test
+ id: test1@file_api
+ access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec
+
+# redundancy settings
+redundancy:
+ # strategy: normal - replicate item on 'redundancy_replica_min' nodes
+ # strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources
+ # strategy: paranoid - replicate item on all nodes if there are enough resources
+ strategy: normal
+ replica_min: 2
+ replica_max: 5
+
+## end of configuration
diff --git a/anor-config.test b/anor-config.test
new file mode 100644
index 0000000..6c2ec08
--- /dev/null
+++ b/anor-config.test
@@ -0,0 +1,47 @@
+# anor configuration file
+
+# storage settings
+storage:
+ data_path: "${CARGO_MANIFEST_DIR}/target/tmp/anor"
+
+# api service settings
+api:
+ name: anor1
+ description: "Anor API Service"
+ id: anor1@anor
+ access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42
+ listen_addresses: 127.0.0.1
+ listen_port: 9191
+ connections_max: 20
+ threads_max: 4
+ ram_max: 512M
+ disk_max: 2G
+ enabled: true
+
+# http service settings
+http:
+ description: "Anor HTTP Service"
+ listen_addresses: 127.0.0.1
+ listen_port: 8181
+ enabled: true
+
+# pool of remote server nodes
+remote:
+ nodes: 127.0.0.1:9191
+
+# client settings
+client:
+ name: test
+ id: test1@file_api
+ access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec
+
+# redundancy settings
+redundancy:
+ # strategy: normal - replicate item on 'redundancy_replica_min' nodes
+ # strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources
+ # strategy: paranoid - replicate item on all nodes if there are enough resources
+ strategy: normal
+ replica_min: 2
+ replica_max: 5
+
+## end of configuration
diff --git a/anor-config.yaml b/anor-config.yaml
new file mode 100644
index 0000000..ff4935f
--- /dev/null
+++ b/anor-config.yaml
@@ -0,0 +1,47 @@
+# anor configuration file
+
+# storage settings
+storage:
+ data_path: "/var/anor"
+
+# api service settings
+api:
+ name: anor1
+ description: "Anor API Service"
+ id: anor1@anor
+ access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42
+ listen_addresses: 127.0.0.1
+ listen_port: 9191
+ connections_max: 20
+ threads_max: 4
+ ram_max: 512M
+ disk_max: 2G
+ enabled: true
+
+# http service settings
+http:
+ description: "Anor HTTP Service"
+ listen_addresses: 127.0.0.1
+ listen_port: 8181
+ enabled: true
+
+# pool of remote server nodes
+remote:
+ nodes: 127.0.0.1:9191
+
+# client settings
+client:
+ name: test
+ id: test1@file_api
+ access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec
+
+# redundancy settings
+redundancy:
+ # strategy: normal - replicate item on 'redundancy_replica_min' nodes
+ # strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources
+ # strategy: paranoid - replicate item on all nodes if there are enough resources
+ strategy: normal
+ replica_min: 2
+ replica_max: 5
+
+## end of configuration
diff --git a/anor-server/Cargo.toml b/anor-server/Cargo.toml
index 48d3df1..34aa3fb 100644
--- a/anor-server/Cargo.toml
+++ b/anor-server/Cargo.toml
@@ -14,6 +14,7 @@ categories = ["data-structures", "caching", "database", "filesystem"]
[dependencies]
log = "0.4"
log4rs = "1.2"
+tokio = { version = "1", features = ["full"] }
anor-storage = { path = "../anor-storage"}
anor-api = { path = "../anor-api"}
diff --git a/anor-server/src/lib.rs b/anor-server/src/lib.rs
deleted file mode 100644
index cd71fe3..0000000
--- a/anor-server/src/lib.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-//! **Anor Server**
-//!
-//! Project Stage
-//!
-//! **Research:** This project is at the design stage, with some sketches of work but nothing usable yet.
diff --git a/anor-server/src/main.rs b/anor-server/src/main.rs
index ae075f6..fda2264 100644
--- a/anor-server/src/main.rs
+++ b/anor-server/src/main.rs
@@ -1,65 +1,65 @@
use std::sync::atomic::Ordering;
-use std::thread;
+use std::thread::{self, JoinHandle};
use std::{
sync::{atomic::AtomicBool, mpsc::channel, Arc},
time::Instant,
};
-use anor_http::client::http_client;
-use anor_http::service::http_service;
-use anor_storage::storage::Storage;
use anor_api::{
client::api_client::{SocketClient, StorageApiClient},
service::api_service::{ApiService, StorageApi},
};
+use anor_http::client::http_client;
+use anor_http::service::http_service;
+use anor_storage::storage::Storage;
-use anor_utils::config;
+use anor_utils::config::{self, Config};
-fn main() {
- start_api_server();
-}
-
-fn start_api_server() {
- let launch_start = Instant::now();
+use tokio::signal::unix::{signal, SignalKind};
+#[tokio::main]
+async fn main() {
log4rs::init_file("log.yaml", Default::default()).unwrap();
-
log::info!("{} v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
+ // load the configuration
let config = config::load();
// open the data storage
let storage = Storage::open_with_config(config.clone());
- // prapare service parameters
- let api_service_config = config.clone();
- let api_service_shutdown = Arc::new(AtomicBool::new(false));
- let api_service_shutdown_clone = api_service_shutdown.clone();
- let (api_service_ready_sender, api_service_ready_receiver) = channel();
-
- // start the storage api service in a separate thread
- let api_service_handler = thread::spawn(move || {
- let service = StorageApi::with_config(storage, api_service_config);
- if let Err(err) = service.start(api_service_shutdown_clone, api_service_ready_sender) {
- log::error!("{}", err);
- panic!("{}", err);
- }
+ // hook for graceful shutdown
+ let server_shutdown = Arc::new(AtomicBool::new(false));
+ let config_cloned = config.clone();
+ let server_shutdown_cloned = server_shutdown.clone();
+ tokio::spawn(async move {
+ let mut sigint = signal(SignalKind::interrupt()).unwrap();
+ let mut sigterm = signal(SignalKind::terminate()).unwrap();
+ tokio::select! {
+ _ = sigint.recv() => {
+ if log::log_enabled!(log::Level::Debug) {
+ log::debug!("Recieved SIGINT");
+ }
+ }
+ _ = sigterm.recv() => {
+ if log::log_enabled!(log::Level::Debug) {
+ log::debug!("Recieved SIGTERM");
+ }
+ },
+ };
+ graceful_shutdown(server_shutdown_cloned, config_cloned);
});
- // wait for the readiness of api service
- if let Err(err) = api_service_ready_receiver.recv() {
- log::error!("{}", err);
- panic!("{}", err);
- }
-
- let launch_elapsed = Instant::elapsed(&launch_start);
- log::info!("Anor Storage API service started in {:?}", launch_elapsed);
+ // starting API service
+ let api_service = start_api_service(config.clone(), storage, server_shutdown.clone());
+ // api client tests
let mut api_client1 = StorageApiClient::with_config(config.clone());
api_client1.connect().expect("client connection error");
let keys = api_client1.keys();
log::debug!("{:?}", keys);
+ _ = api_client1.disconnect();
/*
let msg1 = String::from("Hi there1!");
@@ -78,22 +78,60 @@ fn start_api_server() {
client2.set_item(msg2).expect("set item error");
*/
- // shutdown the api server
- api_service_shutdown.store(true, Ordering::SeqCst);
+ api_service.join().unwrap();
+ log::info!("Anor Storage API service is shutdown successfully.");
+
+ log::info!("Anor Server is shutdown successfully.");
+}
+
+fn graceful_shutdown(server_shutdown: Arc, config: Arc) {
+ log::info!("Initializing the graceful shutdown process...");
+ server_shutdown.store(true, Ordering::SeqCst);
+ // a temporary solution to unblock socket listener
// make an empty connection to unblock listener and shutdown the api server
- let mut api_client_terminate = StorageApiClient::with_config(config.clone());
+ let mut api_client_terminate = StorageApiClient::with_config(config);
api_client_terminate
.connect()
.expect("client connection error");
+ _ = api_client_terminate.disconnect();
+}
- api_service_handler.join().unwrap();
+fn start_api_service(
+ config: Arc,
+ storage: Storage,
+ server_shutdown: Arc,
+) -> JoinHandle<()> {
+ log::info!("Starting Anor Storage API service...");
- log::info!("Anor Storage API service is shutdown successfully.");
+ let launch_start = Instant::now();
+
+ // prapare service parameters
+ let api_service_config = config.clone();
+ let (api_service_ready_sender, api_service_ready_receiver) = channel();
+
+ // start the storage api service in a separate thread
+ let api_service_handler = thread::spawn(move || {
+ let service = StorageApi::with_config(storage, api_service_config);
+ if let Err(err) = service.start(server_shutdown, api_service_ready_sender) {
+ log::error!("{}", err);
+ panic!("{}", err);
+ }
+ });
+
+ // wait for the readiness of api service
+ if let Err(err) = api_service_ready_receiver.recv() {
+ log::error!("{}", err);
+ panic!("{}", err);
+ }
+
+ let launch_elapsed = Instant::elapsed(&launch_start);
+ log::info!("Anor Storage API service started in {:?}", launch_elapsed);
+
+ api_service_handler
}
fn _start_http_server() {
-
let config = config::load();
// open the data storage
diff --git a/anor-storage/src/storage.rs b/anor-storage/src/storage.rs
index e350d95..359d253 100644
--- a/anor-storage/src/storage.rs
+++ b/anor-storage/src/storage.rs
@@ -12,7 +12,7 @@ use std::{
pub mod storage_codec;
pub mod storage_const;
pub mod storage_item;
-pub mod storage_location;
+pub mod storage_persistence;
pub mod storage_packet;
use storage_codec::*;
diff --git a/anor-storage/src/storage/storage_item.rs b/anor-storage/src/storage/storage_item.rs
index 81f841e..f4da515 100644
--- a/anor-storage/src/storage/storage_item.rs
+++ b/anor-storage/src/storage/storage_item.rs
@@ -1,4 +1,4 @@
-use super::{storage_codec::*, storage_location::*, storage_packet::*};
+use super::{storage_codec::*, storage_persistence::*, storage_packet::*};
use std::collections::HashMap;
use uuid::Uuid;
@@ -63,7 +63,7 @@ pub struct StorageItem {
/// `expires_on` - timestamp, defines expiry datetime
pub expires_on: Option,
- pub storage_locations: Vec,
+ pub persistence: StoragePersistence,
/// defines the number of required replications in the cluster
pub redundancy: u8,
@@ -77,7 +77,7 @@ impl StorageItem {
version: 0,
description: None,
item_type: ItemType::Custom,
- storage_locations: vec![StorageLocation::Memory],
+ persistence: StoragePersistence::Memory,
data,
tags: None,
metafields: None,
@@ -97,7 +97,7 @@ impl StorageItem {
version: 0,
description: None,
item_type: storage_type,
- storage_locations: vec![StorageLocation::Memory],
+ persistence: StoragePersistence::Memory,
data,
tags: None,
metafields: None,
diff --git a/anor-storage/src/storage/storage_location.rs b/anor-storage/src/storage/storage_location.rs
deleted file mode 100644
index fe58622..0000000
--- a/anor-storage/src/storage/storage_location.rs
+++ /dev/null
@@ -1,9 +0,0 @@
-/// Storage type
-#[derive(Debug, Clone, bincode::Encode, bincode::Decode)]
-pub enum StorageLocation {
- /// Store in memory
- Memory,
-
- /// Store in disk
- Disk,
-}
diff --git a/anor-storage/src/storage/storage_persistence.rs b/anor-storage/src/storage/storage_persistence.rs
new file mode 100644
index 0000000..9d34e24
--- /dev/null
+++ b/anor-storage/src/storage/storage_persistence.rs
@@ -0,0 +1,12 @@
+/// Persistence type
+#[derive(Debug, Clone, bincode::Encode, bincode::Decode)]
+pub enum StoragePersistence {
+ /// Persist only in memory
+ Memory = 0,
+
+ /// Persist only in disk
+ Disk = 1,
+
+ /// Persist both in memory and disk
+ Hybrid = 2
+}
diff --git a/docs/roadmap.md b/docs/roadmap.md
index d344cac..f9f12e2 100644
--- a/docs/roadmap.md
+++ b/docs/roadmap.md
@@ -20,7 +20,12 @@
- support for JWT
- support for Fine-Grained Access Control
-6. Scaling
+6. Message Queue
+ - publish
+ - subscribe
+ - support for delivery and destination options (persisted, non-persisted, at-least-once, at-most-once, exactly-once)
+
+7. Scaling
- consistent hashing
- cluster health check / heartbeats
- support for easy scaling
@@ -34,20 +39,20 @@
- server node will analyze the received config_id and may respond with updated configuration settings
- clients and nodes use weighted graphs to optimize node/peer selection and other network operations
-7. Caching
+8. Caching
- client-side and server-side caching support
- cache evictions policies: LRU/LFU
-8. File Storage Spaces
+9. File Storage Spaces
- support for folders
- support for path
-9. FFI Bindings
+10. FFI Bindings
- Go
- Ruby
- Python
-10. Remote Collections
+11. Remote Collections
- HashSet
- HashMap
- Vec
diff --git a/log.yaml b/log.yaml
new file mode 100644
index 0000000..2afd21d
--- /dev/null
+++ b/log.yaml
@@ -0,0 +1,20 @@
+appenders:
+ stdout:
+ kind: console
+ encoder:
+ pattern: "{d(%+)(utc)} [{f}:{L}] {h({l})} {m}{n}"
+root:
+ level: debug
+ appenders:
+ - stdout
+
+loggers:
+ anor_common::utils::config:
+ level: debug
+
+ anor::storage:
+ level: debug
+
+ anor_api:
+ level: debug
+