diff --git a/README.md b/README.md index e5d96bf..c838c4b 100644 --- a/README.md +++ b/README.md @@ -25,8 +25,8 @@ Configuration is done through environment variables: ## Running in console #### Install ```shell -VERSION=0.0.1 && \ -PLATFORM=x86_64-unknown-linux-gnu && \ +VERSION=0.0.1 \ +PLATFORM=x86_64-unknown-linux-gnu \ curl -SLO https://github.com/subzerocloud/pg-amqp-bridge/releases/download/${VERSION}/pg-amqp-bridge-${VERSION}-${PLATFORM}.tar.gz && \ tar zxf pg-amqp-bridge-${VERSION}-${PLATFORM}.tar.gz && \ mv pg-amqp-bridge /usr/local/bin @@ -42,40 +42,13 @@ pg-amqp-bridge ## Running as docker container ```shell -docker run -d \ --e POSTGRESQL_URI="postgres://postgres@database" \ --e AMQP_URI="amqp://rabbitmq//" \ +docker run --rm -it --net=host \ +-e POSTGRESQL_URI="postgres://postgres@localhost" \ +-e AMQP_URI="amqp://localhost//" \ -e BRIDGE_CHANNELS="pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange" \ ---add-host database:DB_IP_ADDRESS \ ---add-host rabbitmq:RABBITMQ_IP_ADDRESS \ subzerocloud/pg-amqp-bridge ``` -## Running from source - -#### Install Rust - -```shell -curl https://sh.rustup.rs -sSf | sh -``` - -#### Run - -```shell -POSTGRESQL_URI="postgres://postgres@localhost" \ -AMQP_URI="amqp://localhost//" \ -BRIDGE_CHANNELS="pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange" \ -cargo run -``` - -#### Test - -**Note**: RabbitMQ and PostgreSQL need to be running on your localhost - -```shell -cargo test -``` - ## Sending messages **Note**: the bridge doesn't declare exchanges or queues, if they aren't previoulsy declared it will exit with an error. @@ -120,7 +93,6 @@ NOTIFY pgchannel3, 'quick.brown.fox|Topic message'; NOTIFY pgchannel3, 'lazy.#|Topic message'; ``` - ## Helper Functions To make sending messages a bit easier you can setup the following functions in your database @@ -168,9 +140,30 @@ after insert or update or delete on tablename for each row execute procedure rabbitmq.on_row_change(); ``` -## Author +## Running from source + +#### Install Rust + +```shell +curl https://sh.rustup.rs -sSf | sh +``` + +#### Run + +```shell +POSTGRESQL_URI="postgres://postgres@localhost" \ +AMQP_URI="amqp://localhost//" \ +BRIDGE_CHANNELS="pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange" \ +cargo run +``` + +#### Test + +**Note**: RabbitMQ and PostgreSQL need to be running on your localhost -Steve Chavez +```shell +cargo test +``` ## Contributing diff --git a/src/lib.rs b/src/lib.rs index ed09f1b..2af02cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ use postgres::*; use std::default::Default; use std::thread; use std::thread::JoinHandle; +use std::time::Duration; #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] enum Type { @@ -37,7 +38,24 @@ impl Bridge { let mut bindings = parse_bridge_channels(bridge_channels); let mut children = Vec::new(); - let mut session = Session::open_url(amqp_uri.as_str()).unwrap(); + println!("Attempting to connect to AMQP server.."); + let mut session = { + let mut s = Session::open_url(amqp_uri.as_str()); + // Retry with cyclic exponential backoff + let mut i = 1; + while let Err(e) = s { + println!("{:?}", e); + let time = Duration::from_secs(i); + println!("Retrying the AMQP connection in {:?} seconds..", time.as_secs()); + thread::sleep(time); + s = Session::open_url(amqp_uri.as_str()); + i *= 2; + if i > 32 { i = 1 }; + }; + s.unwrap() + }; + println!("Connection to AMQP server successful"); + let mut channel_id = 0; for binding in &mut bindings{ diff --git a/tests/main.rs b/tests/main.rs index 57bc849..2ae4237 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -15,11 +15,13 @@ use lapin::client::*; use lapin::channel::*; use std::env; use std::thread; +use std::time::Duration; use test::*; use bridge::Bridge; //Lapin doesn't support amqp://localhost// format. -const TEST_AMQP_URI: &str = "127.0.0.1:5672"; +const TEST_AMQP_HOST_PORT: &str = "127.0.0.1:5672"; +const TEST_AMQP_URI: &str = "amqp://localhost//"; const TEST_PG_URI: &str = "postgres://postgres@localhost"; const TEST_1_PG_CHANNEL: &str = "test_1_pgchannel"; @@ -42,7 +44,7 @@ const TEST_3_EXCHANGE: &str = "test_3_topic_exchange"; fn publishing_to_queue_works() { let mut core = Core::new().unwrap(); let handle = core.handle(); - let addr = TEST_AMQP_URI.parse().unwrap(); + let addr = TEST_AMQP_HOST_PORT.parse().unwrap(); let pg_conn = Connection::connect(TEST_PG_URI, TlsMode::None).unwrap(); @@ -71,7 +73,7 @@ fn publishing_to_queue_works() { fn publishing_to_direct_exchange_works() { let mut core = Core::new().unwrap(); let handle = core.handle(); - let addr = TEST_AMQP_URI.parse().unwrap(); + let addr = TEST_AMQP_HOST_PORT.parse().unwrap(); let pg_conn = Connection::connect(TEST_PG_URI, TlsMode::None).unwrap(); @@ -109,7 +111,7 @@ fn publishing_to_direct_exchange_works() { fn publishing_to_topic_exchange_works() { let mut core = Core::new().unwrap(); let handle = core.handle(); - let addr = TEST_AMQP_URI.parse().unwrap(); + let addr = TEST_AMQP_HOST_PORT.parse().unwrap(); let pg_conn = Connection::connect(TEST_PG_URI, TlsMode::None).unwrap(); @@ -152,7 +154,7 @@ fn publishing_to_topic_exchange_works() { fn setup(){ let mut core = Core::new().unwrap(); let handle = core.handle(); - let addr = TEST_AMQP_URI.parse().unwrap(); + let addr = TEST_AMQP_HOST_PORT.parse().unwrap(); let _ = core.run( TcpStream::connect(&addr, &handle) @@ -187,7 +189,7 @@ fn setup(){ fn teardown(){ let mut core = Core::new().unwrap(); let handle = core.handle(); - let addr = TEST_AMQP_URI.parse().unwrap(); + let addr = TEST_AMQP_HOST_PORT.parse().unwrap(); let _ = core.run( TcpStream::connect(&addr, &handle) @@ -229,11 +231,11 @@ fn main() { add_test(&mut tests, "publishing_to_direct_exchange_works".to_string(), publishing_to_direct_exchange_works); add_test(&mut tests, "publishing_to_topic_exchange_works".to_string(), publishing_to_topic_exchange_works); thread::spawn(move || - Bridge::new().start(&"amqp://localhost//".to_string(), - &TEST_PG_URI.to_string(), - &bridge_channels) + Bridge::new().start(&TEST_AMQP_URI.to_string(), + &TEST_PG_URI.to_string(), + &bridge_channels) ); - thread::sleep_ms(2000); + thread::sleep(Duration::from_secs(2)); test::test_main(&args, tests); teardown(); }