Skip to content

Commit

Permalink
Graceful shutdown implementation (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
sheroz authored Nov 7, 2023
1 parent 7391a98 commit 9994db1
Show file tree
Hide file tree
Showing 15 changed files with 275 additions and 70 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

The open-source data access and storage platform

<img src = "docs/img/anor-wb.png" width = "25%"/>

## Components

* [In-Memory Data Storage](/anor-storage)
Expand Down
8 changes: 4 additions & 4 deletions anor-api/src/client/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,19 @@ impl SocketClient for StorageApiClient {
}
*/

fn insert(&self, storage_item: StorageItem) {
fn insert(&self, _storage_item: StorageItem) {
todo!()
}

fn update(&mut self, key: &str, storage_item: StorageItem) -> std::io::Result<()> {
fn update(&mut self, _key: &str, _storage_item: StorageItem) -> std::io::Result<()> {
todo!()
}

fn get(&mut self, key: &str) -> std::io::Result<StorageItem> {
fn get(&mut self, _key: &str) -> std::io::Result<StorageItem> {
todo!()
}

fn remove(&self, key: &str) -> bool {
fn remove(&self, _key: &str) -> bool {
todo!()
}

Expand Down
6 changes: 3 additions & 3 deletions anor-api/src/service/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl ApiService for StorageApi {

fn start(
&self,
shutdown: Arc<AtomicBool>,
server_shutdown: Arc<AtomicBool>,
signal_ready_sender: Sender<()>,
) -> Result<(), String> {
assert!(self.config.api.is_some());
Expand All @@ -57,10 +57,10 @@ impl ApiService for StorageApi {

let pool = ThreadPool::new(2);

while !shutdown.load(Ordering::SeqCst) {
while !server_shutdown.load(Ordering::SeqCst) {
match listener.accept() {
Ok((stream, addr)) => {
let shutdown_clone = shutdown.clone();
let shutdown_clone = server_shutdown.clone();
pool.execute(move || {
handle_connection(stream, addr, shutdown_clone);
});
Expand Down
47 changes: 47 additions & 0 deletions anor-config.debug
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# anor configuration file

# storage settings
storage:
data_path: "/tmp/anor"

# api service settings
api:
name: anor1
description: "Anor API Service"
id: anor1@anor
access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42
listen_addresses: 127.0.0.1
listen_port: 9191
connections_max: 20
threads_max: 4
ram_max: 512M
disk_max: 2G
enabled: true

# http service settings
http:
description: "Anor HTTP Service"
listen_addresses: 127.0.0.1
listen_port: 8181
enabled: true

# pool of remote server nodes
remote:
nodes: 127.0.0.1:9191

# client settings
client:
name: test
id: test1@file_api
access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec

# redundancy settings
redundancy:
# strategy: normal - replicate item on 'redundancy_replica_min' nodes
# strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources
# strategy: paranoid - replicate item on all nodes if there are enough resources
strategy: normal
replica_min: 2
replica_max: 5

## end of configuration
47 changes: 47 additions & 0 deletions anor-config.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# anor configuration file

# storage settings
storage:
data_path: "${CARGO_MANIFEST_DIR}/target/tmp/anor"

# api service settings
api:
name: anor1
description: "Anor API Service"
id: anor1@anor
access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42
listen_addresses: 127.0.0.1
listen_port: 9191
connections_max: 20
threads_max: 4
ram_max: 512M
disk_max: 2G
enabled: true

# http service settings
http:
description: "Anor HTTP Service"
listen_addresses: 127.0.0.1
listen_port: 8181
enabled: true

# pool of remote server nodes
remote:
nodes: 127.0.0.1:9191

# client settings
client:
name: test
id: test1@file_api
access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec

# redundancy settings
redundancy:
# strategy: normal - replicate item on 'redundancy_replica_min' nodes
# strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources
# strategy: paranoid - replicate item on all nodes if there are enough resources
strategy: normal
replica_min: 2
replica_max: 5

## end of configuration
47 changes: 47 additions & 0 deletions anor-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# anor configuration file

# storage settings
storage:
data_path: "/var/anor"

# api service settings
api:
name: anor1
description: "Anor API Service"
id: anor1@anor
access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42
listen_addresses: 127.0.0.1
listen_port: 9191
connections_max: 20
threads_max: 4
ram_max: 512M
disk_max: 2G
enabled: true

# http service settings
http:
description: "Anor HTTP Service"
listen_addresses: 127.0.0.1
listen_port: 8181
enabled: true

# pool of remote server nodes
remote:
nodes: 127.0.0.1:9191

# client settings
client:
name: test
id: test1@file_api
access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec

# redundancy settings
redundancy:
# strategy: normal - replicate item on 'redundancy_replica_min' nodes
# strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources
# strategy: paranoid - replicate item on all nodes if there are enough resources
strategy: normal
replica_min: 2
replica_max: 5

## end of configuration
1 change: 1 addition & 0 deletions anor-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ categories = ["data-structures", "caching", "database", "filesystem"]
[dependencies]
log = "0.4"
log4rs = "1.2"
tokio = { version = "1", features = ["full"] }

anor-storage = { path = "../anor-storage"}
anor-api = { path = "../anor-api"}
Expand Down
5 changes: 0 additions & 5 deletions anor-server/src/lib.rs

This file was deleted.

116 changes: 77 additions & 39 deletions anor-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,65 +1,65 @@
use std::sync::atomic::Ordering;
use std::thread;
use std::thread::{self, JoinHandle};
use std::{
sync::{atomic::AtomicBool, mpsc::channel, Arc},
time::Instant,
};

use anor_http::client::http_client;
use anor_http::service::http_service;
use anor_storage::storage::Storage;
use anor_api::{
client::api_client::{SocketClient, StorageApiClient},
service::api_service::{ApiService, StorageApi},
};
use anor_http::client::http_client;
use anor_http::service::http_service;
use anor_storage::storage::Storage;

use anor_utils::config;
use anor_utils::config::{self, Config};

fn main() {
start_api_server();
}

fn start_api_server() {
let launch_start = Instant::now();
use tokio::signal::unix::{signal, SignalKind};

#[tokio::main]
async fn main() {
log4rs::init_file("log.yaml", Default::default()).unwrap();

log::info!("{} v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));

// load the configuration
let config = config::load();

// open the data storage
let storage = Storage::open_with_config(config.clone());

// prapare service parameters
let api_service_config = config.clone();
let api_service_shutdown = Arc::new(AtomicBool::new(false));
let api_service_shutdown_clone = api_service_shutdown.clone();
let (api_service_ready_sender, api_service_ready_receiver) = channel();

// start the storage api service in a separate thread
let api_service_handler = thread::spawn(move || {
let service = StorageApi::with_config(storage, api_service_config);
if let Err(err) = service.start(api_service_shutdown_clone, api_service_ready_sender) {
log::error!("{}", err);
panic!("{}", err);
}
// hook for graceful shutdown
let server_shutdown = Arc::new(AtomicBool::new(false));
let config_cloned = config.clone();
let server_shutdown_cloned = server_shutdown.clone();
tokio::spawn(async move {
let mut sigint = signal(SignalKind::interrupt()).unwrap();
let mut sigterm = signal(SignalKind::terminate()).unwrap();
tokio::select! {
_ = sigint.recv() => {
if log::log_enabled!(log::Level::Debug) {
log::debug!("Recieved SIGINT");
}
}
_ = sigterm.recv() => {
if log::log_enabled!(log::Level::Debug) {
log::debug!("Recieved SIGTERM");
}
},
};
graceful_shutdown(server_shutdown_cloned, config_cloned);
});

// wait for the readiness of api service
if let Err(err) = api_service_ready_receiver.recv() {
log::error!("{}", err);
panic!("{}", err);
}

let launch_elapsed = Instant::elapsed(&launch_start);
log::info!("Anor Storage API service started in {:?}", launch_elapsed);
// starting API service
let api_service = start_api_service(config.clone(), storage, server_shutdown.clone());

// api client tests
let mut api_client1 = StorageApiClient::with_config(config.clone());
api_client1.connect().expect("client connection error");

let keys = api_client1.keys();
log::debug!("{:?}", keys);
_ = api_client1.disconnect();

/*
let msg1 = String::from("Hi there1!");
Expand All @@ -78,22 +78,60 @@ fn start_api_server() {
client2.set_item(msg2).expect("set item error");
*/

// shutdown the api server
api_service_shutdown.store(true, Ordering::SeqCst);
api_service.join().unwrap();
log::info!("Anor Storage API service is shutdown successfully.");

log::info!("Anor Server is shutdown successfully.");
}

fn graceful_shutdown(server_shutdown: Arc<AtomicBool>, config: Arc<Config>) {
log::info!("Initializing the graceful shutdown process...");
server_shutdown.store(true, Ordering::SeqCst);

// a temporary solution to unblock socket listener
// make an empty connection to unblock listener and shutdown the api server
let mut api_client_terminate = StorageApiClient::with_config(config.clone());
let mut api_client_terminate = StorageApiClient::with_config(config);
api_client_terminate
.connect()
.expect("client connection error");
_ = api_client_terminate.disconnect();
}

api_service_handler.join().unwrap();
fn start_api_service(
config: Arc<Config>,
storage: Storage,
server_shutdown: Arc<AtomicBool>,
) -> JoinHandle<()> {
log::info!("Starting Anor Storage API service...");

log::info!("Anor Storage API service is shutdown successfully.");
let launch_start = Instant::now();

// prapare service parameters
let api_service_config = config.clone();
let (api_service_ready_sender, api_service_ready_receiver) = channel();

// start the storage api service in a separate thread
let api_service_handler = thread::spawn(move || {
let service = StorageApi::with_config(storage, api_service_config);
if let Err(err) = service.start(server_shutdown, api_service_ready_sender) {
log::error!("{}", err);
panic!("{}", err);
}
});

// wait for the readiness of api service
if let Err(err) = api_service_ready_receiver.recv() {
log::error!("{}", err);
panic!("{}", err);
}

let launch_elapsed = Instant::elapsed(&launch_start);
log::info!("Anor Storage API service started in {:?}", launch_elapsed);

api_service_handler
}

fn _start_http_server() {

let config = config::load();

// open the data storage
Expand Down
2 changes: 1 addition & 1 deletion anor-storage/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
pub mod storage_codec;
pub mod storage_const;
pub mod storage_item;
pub mod storage_location;
pub mod storage_persistence;
pub mod storage_packet;

use storage_codec::*;
Expand Down
Loading

0 comments on commit 9994db1

Please sign in to comment.