Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown implementation #6

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

The open-source data access and storage platform

<img src = "docs/img/anor-wb.png" width = "25%"/>

## Components

* [In-Memory Data Storage](/anor-storage)
Expand Down
8 changes: 4 additions & 4 deletions anor-api/src/client/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,19 @@ impl SocketClient for StorageApiClient {
}
*/

fn insert(&self, storage_item: StorageItem) {
fn insert(&self, _storage_item: StorageItem) {
todo!()
}

fn update(&mut self, key: &str, storage_item: StorageItem) -> std::io::Result<()> {
fn update(&mut self, _key: &str, _storage_item: StorageItem) -> std::io::Result<()> {
todo!()
}

fn get(&mut self, key: &str) -> std::io::Result<StorageItem> {
fn get(&mut self, _key: &str) -> std::io::Result<StorageItem> {
todo!()
}

fn remove(&self, key: &str) -> bool {
fn remove(&self, _key: &str) -> bool {
todo!()
}

Expand Down
6 changes: 3 additions & 3 deletions anor-api/src/service/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl ApiService for StorageApi {

fn start(
&self,
shutdown: Arc<AtomicBool>,
server_shutdown: Arc<AtomicBool>,
signal_ready_sender: Sender<()>,
) -> Result<(), String> {
assert!(self.config.api.is_some());
Expand All @@ -57,10 +57,10 @@ impl ApiService for StorageApi {

let pool = ThreadPool::new(2);

while !shutdown.load(Ordering::SeqCst) {
while !server_shutdown.load(Ordering::SeqCst) {
match listener.accept() {
Ok((stream, addr)) => {
let shutdown_clone = shutdown.clone();
let shutdown_clone = server_shutdown.clone();
pool.execute(move || {
handle_connection(stream, addr, shutdown_clone);
});
Expand Down
47 changes: 47 additions & 0 deletions anor-config.debug
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# anor configuration file

# storage settings
storage:
data_path: "/tmp/anor"

# api service settings
api:
name: anor1
description: "Anor API Service"
id: anor1@anor
access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42
listen_addresses: 127.0.0.1
listen_port: 9191
connections_max: 20
threads_max: 4
ram_max: 512M
disk_max: 2G
enabled: true

# http service settings
http:
description: "Anor HTTP Service"
listen_addresses: 127.0.0.1
listen_port: 8181
enabled: true

# pool of remote server nodes
remote:
nodes: 127.0.0.1:9191

# client settings
client:
name: test
id: test1@file_api
access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec

# redundancy settings
redundancy:
# strategy: normal - replicate item on 'redundancy_replica_min' nodes
# strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources
# strategy: paranoid - replicate item on all nodes if there are enough resources
strategy: normal
replica_min: 2
replica_max: 5

## end of configuration
47 changes: 47 additions & 0 deletions anor-config.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# anor configuration file

# storage settings
storage:
data_path: "${CARGO_MANIFEST_DIR}/target/tmp/anor"

# api service settings
api:
name: anor1
description: "Anor API Service"
id: anor1@anor
access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42
listen_addresses: 127.0.0.1
listen_port: 9191
connections_max: 20
threads_max: 4
ram_max: 512M
disk_max: 2G
enabled: true

# http service settings
http:
description: "Anor HTTP Service"
listen_addresses: 127.0.0.1
listen_port: 8181
enabled: true

# pool of remote server nodes
remote:
nodes: 127.0.0.1:9191

# client settings
client:
name: test
id: test1@file_api
access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec

# redundancy settings
redundancy:
# strategy: normal - replicate item on 'redundancy_replica_min' nodes
# strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources
# strategy: paranoid - replicate item on all nodes if there are enough resources
strategy: normal
replica_min: 2
replica_max: 5

## end of configuration
47 changes: 47 additions & 0 deletions anor-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# anor configuration file

# storage settings
storage:
data_path: "/var/anor"

# api service settings
api:
name: anor1
description: "Anor API Service"
id: anor1@anor
access_token: face0aa08c29eb27aa3e0ebb7fe9d9a678a9caecc1f7b886e35bc16b1c266f42
listen_addresses: 127.0.0.1
listen_port: 9191
connections_max: 20
threads_max: 4
ram_max: 512M
disk_max: 2G
enabled: true

# http service settings
http:
description: "Anor HTTP Service"
listen_addresses: 127.0.0.1
listen_port: 8181
enabled: true

# pool of remote server nodes
remote:
nodes: 127.0.0.1:9191

# client settings
client:
name: test
id: test1@file_api
access_token: bbc5f7280aa440648d2ca6023610956da401739283ec77593492aa385f256dec

# redundancy settings
redundancy:
# strategy: normal - replicate item on 'redundancy_replica_min' nodes
# strategy: maximum - replicate item on 'redundancy_replica_max' nodes if there are enough resources
# strategy: paranoid - replicate item on all nodes if there are enough resources
strategy: normal
replica_min: 2
replica_max: 5

## end of configuration
1 change: 1 addition & 0 deletions anor-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ categories = ["data-structures", "caching", "database", "filesystem"]
[dependencies]
log = "0.4"
log4rs = "1.2"
tokio = { version = "1", features = ["full"] }

anor-storage = { path = "../anor-storage"}
anor-api = { path = "../anor-api"}
Expand Down
5 changes: 0 additions & 5 deletions anor-server/src/lib.rs

This file was deleted.

116 changes: 77 additions & 39 deletions anor-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,65 +1,65 @@
use std::sync::atomic::Ordering;
use std::thread;
use std::thread::{self, JoinHandle};
use std::{
sync::{atomic::AtomicBool, mpsc::channel, Arc},
time::Instant,
};

use anor_http::client::http_client;
use anor_http::service::http_service;
use anor_storage::storage::Storage;
use anor_api::{
client::api_client::{SocketClient, StorageApiClient},
service::api_service::{ApiService, StorageApi},
};
use anor_http::client::http_client;
use anor_http::service::http_service;
use anor_storage::storage::Storage;

use anor_utils::config;
use anor_utils::config::{self, Config};

fn main() {
start_api_server();
}

fn start_api_server() {
let launch_start = Instant::now();
use tokio::signal::unix::{signal, SignalKind};

#[tokio::main]
async fn main() {
log4rs::init_file("log.yaml", Default::default()).unwrap();

log::info!("{} v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));

// load the configuration
let config = config::load();

// open the data storage
let storage = Storage::open_with_config(config.clone());

// prapare service parameters
let api_service_config = config.clone();
let api_service_shutdown = Arc::new(AtomicBool::new(false));
let api_service_shutdown_clone = api_service_shutdown.clone();
let (api_service_ready_sender, api_service_ready_receiver) = channel();

// start the storage api service in a separate thread
let api_service_handler = thread::spawn(move || {
let service = StorageApi::with_config(storage, api_service_config);
if let Err(err) = service.start(api_service_shutdown_clone, api_service_ready_sender) {
log::error!("{}", err);
panic!("{}", err);
}
// hook for graceful shutdown
let server_shutdown = Arc::new(AtomicBool::new(false));
let config_cloned = config.clone();
let server_shutdown_cloned = server_shutdown.clone();
tokio::spawn(async move {
let mut sigint = signal(SignalKind::interrupt()).unwrap();
let mut sigterm = signal(SignalKind::terminate()).unwrap();
tokio::select! {
_ = sigint.recv() => {
if log::log_enabled!(log::Level::Debug) {
log::debug!("Recieved SIGINT");
}
}
_ = sigterm.recv() => {
if log::log_enabled!(log::Level::Debug) {
log::debug!("Recieved SIGTERM");
}
},
};
graceful_shutdown(server_shutdown_cloned, config_cloned);
});

// wait for the readiness of api service
if let Err(err) = api_service_ready_receiver.recv() {
log::error!("{}", err);
panic!("{}", err);
}

let launch_elapsed = Instant::elapsed(&launch_start);
log::info!("Anor Storage API service started in {:?}", launch_elapsed);
// starting API service
let api_service = start_api_service(config.clone(), storage, server_shutdown.clone());

// api client tests
let mut api_client1 = StorageApiClient::with_config(config.clone());
api_client1.connect().expect("client connection error");

let keys = api_client1.keys();
log::debug!("{:?}", keys);
_ = api_client1.disconnect();

/*
let msg1 = String::from("Hi there1!");
Expand All @@ -78,22 +78,60 @@ fn start_api_server() {
client2.set_item(msg2).expect("set item error");
*/

// shutdown the api server
api_service_shutdown.store(true, Ordering::SeqCst);
api_service.join().unwrap();
log::info!("Anor Storage API service is shutdown successfully.");

log::info!("Anor Server is shutdown successfully.");
}

fn graceful_shutdown(server_shutdown: Arc<AtomicBool>, config: Arc<Config>) {
log::info!("Initializing the graceful shutdown process...");
server_shutdown.store(true, Ordering::SeqCst);

// a temporary solution to unblock socket listener
// make an empty connection to unblock listener and shutdown the api server
let mut api_client_terminate = StorageApiClient::with_config(config.clone());
let mut api_client_terminate = StorageApiClient::with_config(config);
api_client_terminate
.connect()
.expect("client connection error");
_ = api_client_terminate.disconnect();
}

api_service_handler.join().unwrap();
fn start_api_service(
config: Arc<Config>,
storage: Storage,
server_shutdown: Arc<AtomicBool>,
) -> JoinHandle<()> {
log::info!("Starting Anor Storage API service...");

log::info!("Anor Storage API service is shutdown successfully.");
let launch_start = Instant::now();

// prapare service parameters
let api_service_config = config.clone();
let (api_service_ready_sender, api_service_ready_receiver) = channel();

// start the storage api service in a separate thread
let api_service_handler = thread::spawn(move || {
let service = StorageApi::with_config(storage, api_service_config);
if let Err(err) = service.start(server_shutdown, api_service_ready_sender) {
log::error!("{}", err);
panic!("{}", err);
}
});

// wait for the readiness of api service
if let Err(err) = api_service_ready_receiver.recv() {
log::error!("{}", err);
panic!("{}", err);
}

let launch_elapsed = Instant::elapsed(&launch_start);
log::info!("Anor Storage API service started in {:?}", launch_elapsed);

api_service_handler
}

fn _start_http_server() {

let config = config::load();

// open the data storage
Expand Down
2 changes: 1 addition & 1 deletion anor-storage/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
pub mod storage_codec;
pub mod storage_const;
pub mod storage_item;
pub mod storage_location;
pub mod storage_persistence;
pub mod storage_packet;

use storage_codec::*;
Expand Down
Loading