Skip to content
This repository has been archived by the owner on Apr 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1 from steve-chavez/master
Browse files Browse the repository at this point in the history
Add amqp connection retrying at startup and update README
  • Loading branch information
Steve Chávez committed Jun 23, 2017
2 parents 8353a17 + 0bb6c66 commit a5b0e57
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 46 deletions.
63 changes: 28 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ Configuration is done through environment variables:
## Running in console
#### Install
```shell
VERSION=0.0.1 && \
PLATFORM=x86_64-unknown-linux-gnu && \
VERSION=0.0.1 \
PLATFORM=x86_64-unknown-linux-gnu \
curl -SLO https://github.com/subzerocloud/pg-amqp-bridge/releases/download/${VERSION}/pg-amqp-bridge-${VERSION}-${PLATFORM}.tar.gz && \
tar zxf pg-amqp-bridge-${VERSION}-${PLATFORM}.tar.gz && \
mv pg-amqp-bridge /usr/local/bin
Expand All @@ -42,40 +42,13 @@ pg-amqp-bridge
## Running as docker container

```shell
docker run -d \
-e POSTGRESQL_URI="postgres://postgres@database" \
-e AMQP_URI="amqp://rabbitmq//" \
docker run --rm -it --net=host \
-e POSTGRESQL_URI="postgres://postgres@localhost" \
-e AMQP_URI="amqp://localhost//" \
-e BRIDGE_CHANNELS="pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange" \
--add-host database:DB_IP_ADDRESS \
--add-host rabbitmq:RABBITMQ_IP_ADDRESS \
subzerocloud/pg-amqp-bridge
```

## Running from source

#### Install Rust

```shell
curl https://sh.rustup.rs -sSf | sh
```

#### Run

```shell
POSTGRESQL_URI="postgres://postgres@localhost" \
AMQP_URI="amqp://localhost//" \
BRIDGE_CHANNELS="pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange" \
cargo run
```

#### Test

**Note**: RabbitMQ and PostgreSQL need to be running on your localhost

```shell
cargo test
```

## Sending messages
**Note**: the bridge doesn't declare exchanges or queues, if they aren't previoulsy declared it will exit with an error.

Expand Down Expand Up @@ -120,7 +93,6 @@ NOTIFY pgchannel3, 'quick.brown.fox|Topic message';
NOTIFY pgchannel3, 'lazy.#|Topic message';
```


## Helper Functions

To make sending messages a bit easier you can setup the following functions in your database
Expand Down Expand Up @@ -168,9 +140,30 @@ after insert or update or delete on tablename
for each row execute procedure rabbitmq.on_row_change();
```

## Author
## Running from source

#### Install Rust

```shell
curl https://sh.rustup.rs -sSf | sh
```

#### Run

```shell
POSTGRESQL_URI="postgres://postgres@localhost" \
AMQP_URI="amqp://localhost//" \
BRIDGE_CHANNELS="pgchannel1:task_queue,pgchannel2:direct_exchange,pgchannel3:topic_exchange" \
cargo run
```

#### Test

**Note**: RabbitMQ and PostgreSQL need to be running on your localhost

Steve Chavez
```shell
cargo test
```

## Contributing

Expand Down
20 changes: 19 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use postgres::*;
use std::default::Default;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
enum Type {
Expand Down Expand Up @@ -37,7 +38,24 @@ impl Bridge {
let mut bindings = parse_bridge_channels(bridge_channels);
let mut children = Vec::new();

let mut session = Session::open_url(amqp_uri.as_str()).unwrap();
println!("Attempting to connect to AMQP server..");
let mut session = {
let mut s = Session::open_url(amqp_uri.as_str());
// Retry with cyclic exponential backoff
let mut i = 1;
while let Err(e) = s {
println!("{:?}", e);
let time = Duration::from_secs(i);
println!("Retrying the AMQP connection in {:?} seconds..", time.as_secs());
thread::sleep(time);
s = Session::open_url(amqp_uri.as_str());
i *= 2;
if i > 32 { i = 1 };
};
s.unwrap()
};
println!("Connection to AMQP server successful");

let mut channel_id = 0;

for binding in &mut bindings{
Expand Down
22 changes: 12 additions & 10 deletions tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ use lapin::client::*;
use lapin::channel::*;
use std::env;
use std::thread;
use std::time::Duration;
use test::*;
use bridge::Bridge;

//Lapin doesn't support amqp://localhost// format.
const TEST_AMQP_URI: &str = "127.0.0.1:5672";
const TEST_AMQP_HOST_PORT: &str = "127.0.0.1:5672";
const TEST_AMQP_URI: &str = "amqp://localhost//";
const TEST_PG_URI: &str = "postgres://postgres@localhost";

const TEST_1_PG_CHANNEL: &str = "test_1_pgchannel";
Expand All @@ -42,7 +44,7 @@ const TEST_3_EXCHANGE: &str = "test_3_topic_exchange";
fn publishing_to_queue_works() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let addr = TEST_AMQP_URI.parse().unwrap();
let addr = TEST_AMQP_HOST_PORT.parse().unwrap();

let pg_conn = Connection::connect(TEST_PG_URI, TlsMode::None).unwrap();

Expand Down Expand Up @@ -71,7 +73,7 @@ fn publishing_to_queue_works() {
fn publishing_to_direct_exchange_works() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let addr = TEST_AMQP_URI.parse().unwrap();
let addr = TEST_AMQP_HOST_PORT.parse().unwrap();

let pg_conn = Connection::connect(TEST_PG_URI, TlsMode::None).unwrap();

Expand Down Expand Up @@ -109,7 +111,7 @@ fn publishing_to_direct_exchange_works() {
fn publishing_to_topic_exchange_works() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let addr = TEST_AMQP_URI.parse().unwrap();
let addr = TEST_AMQP_HOST_PORT.parse().unwrap();

let pg_conn = Connection::connect(TEST_PG_URI, TlsMode::None).unwrap();

Expand Down Expand Up @@ -152,7 +154,7 @@ fn publishing_to_topic_exchange_works() {
fn setup(){
let mut core = Core::new().unwrap();
let handle = core.handle();
let addr = TEST_AMQP_URI.parse().unwrap();
let addr = TEST_AMQP_HOST_PORT.parse().unwrap();

let _ = core.run(
TcpStream::connect(&addr, &handle)
Expand Down Expand Up @@ -187,7 +189,7 @@ fn setup(){
fn teardown(){
let mut core = Core::new().unwrap();
let handle = core.handle();
let addr = TEST_AMQP_URI.parse().unwrap();
let addr = TEST_AMQP_HOST_PORT.parse().unwrap();

let _ = core.run(
TcpStream::connect(&addr, &handle)
Expand Down Expand Up @@ -229,11 +231,11 @@ fn main() {
add_test(&mut tests, "publishing_to_direct_exchange_works".to_string(), publishing_to_direct_exchange_works);
add_test(&mut tests, "publishing_to_topic_exchange_works".to_string(), publishing_to_topic_exchange_works);
thread::spawn(move ||
Bridge::new().start(&"amqp://localhost//".to_string(),
&TEST_PG_URI.to_string(),
&bridge_channels)
Bridge::new().start(&TEST_AMQP_URI.to_string(),
&TEST_PG_URI.to_string(),
&bridge_channels)
);
thread::sleep_ms(2000);
thread::sleep(Duration::from_secs(2));
test::test_main(&args, tests);
teardown();
}
Expand Down

0 comments on commit a5b0e57

Please sign in to comment.