Skip to content

Commit

Permalink
sql: use semaphore to limit access to the connection pool
Browse files Browse the repository at this point in the history
This ensures that if multiple connections are returned
to the pool at the same time, waiters get them in the order
they were placed in the queue.
  • Loading branch information
link2xt committed Feb 19, 2023
1 parent 609fc67 commit 10066b2
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- deltachat-rpc-server: do not block stdin while processing the request. #4041
deltachat-rpc-server now reads the next request as soon as previous request handler is spawned.
- enable `auto_vacuum` on all SQL connections #2955
- use semaphore for connection pool #4061

### API-Changes

Expand Down
2 changes: 1 addition & 1 deletion src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl Sql {
pub(crate) async fn get_conn(&self) -> Result<PooledConnection> {
let lock = self.pool.read().await;
let pool = lock.as_ref().context("no SQL connection")?;
let conn = pool.get().await;
let conn = pool.get().await?;

Ok(conn)
}
Expand Down
39 changes: 21 additions & 18 deletions src/sql/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak};

use anyhow::{Context, Result};
use crossbeam_queue::ArrayQueue;
use rusqlite::Connection;
use tokio::sync::Notify;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Inner connection pool.
#[derive(Debug)]
struct InnerPool {
/// Available connections.
connections: ArrayQueue<Connection>,

/// Notifies about added connections.
///
/// Used to wait for available connection when the pool is empty.
notify: Notify,
/// Counts the number of available connections.
semaphore: Arc<Semaphore>,
}

impl InnerPool {
Expand All @@ -25,7 +24,6 @@ impl InnerPool {
/// The connection could be new or returned back.
fn put(&self, connection: Connection) {
self.connections.force_push(connection);
self.notify.notify_one();
}
}

Expand All @@ -36,6 +34,9 @@ pub struct PooledConnection {

/// Only `None` right after moving the connection back to the pool.
conn: Option<Connection>,

/// Semaphore permit, dropped after returning the connection to the pool.
_permit: OwnedSemaphorePermit,
}

impl Drop for PooledConnection {
Expand Down Expand Up @@ -75,7 +76,7 @@ impl Pool {
pub fn new(connections: Vec<Connection>) -> Self {
let inner = Arc::new(InnerPool {
connections: ArrayQueue::new(connections.len()),
notify: Notify::new(),
semaphore: Arc::new(Semaphore::new(connections.len())),
});
for connection in connections {
inner.connections.force_push(connection);
Expand All @@ -84,16 +85,18 @@ impl Pool {
}

/// Retrieves a connection from the pool.
pub async fn get(&self) -> PooledConnection {
loop {
if let Some(conn) = self.inner.connections.pop() {
return PooledConnection {
pool: Arc::downgrade(&self.inner),
conn: Some(conn),
};
}

self.inner.notify.notified().await;
}
pub async fn get(&self) -> Result<PooledConnection> {
let permit = self.inner.semaphore.clone().acquire_owned().await?;
let conn = self
.inner
.connections
.pop()
.context("got a permit when there are no connections in the pool")?;
let conn = PooledConnection {
pool: Arc::downgrade(&self.inner),
conn: Some(conn),
_permit: permit,
};
Ok(conn)
}
}

0 comments on commit 10066b2

Please sign in to comment.