File: test_race_condition_regression.rs

package info (click to toggle)
rust-pyo3-async-runtimes 0.25.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 448 kB
  • sloc: makefile: 2
file content (68 lines) | stat: -rw-r--r-- 2,082 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use std::ffi::CString;

use pyo3::{prelude::*, wrap_pyfunction};

// Python-callable helper: returns an awaitable that finishes once `secs` seconds have
// elapsed on the async-std runtime.
//
// `secs` is extracted as an `f64`; a failed extraction is returned to the caller as a
// `PyErr` before any task is created. Plain `//` comments are used on purpose so that no
// docstring is attached to the exported Python function.
#[pyfunction]
fn sleep<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
    let seconds: f64 = secs.extract()?;

    pyo3_async_runtimes::async_std::future_into_py(py, async move {
        // The duration is built inside the future, i.e. only once the task is polled.
        let pause = std::time::Duration::from_secs_f64(seconds);
        async_std::task::sleep(pause).await;
        Ok(())
    })
}

/// Python driver script for the cancellation race regression test.
///
/// `main(rust_sleeper)` performs 1000 rounds. Each round creates a fresh event loop in debug
/// mode, calls `rust_sleeper(0.1)`, waits a random delay drawn from `[0.099, 0.101]`, and then
/// cancels the returned future. Any call to the loop's exception handler is recorded as the
/// race having fired, and `main` then raises `Exception("Race condition triggered")`.
///
/// The flag is inspected after the loop has been drained and closed. The cleanup steps run the
/// loop again, so a handler call that happens during cleanup is reported in the same round
/// (including the final one) rather than being deferred to the next round or lost.
const RACE_CONDITION_REGRESSION_TEST: &str = r#"
import asyncio
import random

async def trigger_race_condition(rust_sleeper, delay):
    future = asyncio.wrap_future(rust_sleeper(0.1))
    await asyncio.sleep(delay)
    future.cancel()

def main(rust_sleeper):
    race_condition_triggered = False

    def custom_exception_handler(loop, context):
        nonlocal race_condition_triggered
        race_condition_triggered = True

    for _ in range(1000):
        delay = random.uniform(0.099, 0.101)
        loop = asyncio.new_event_loop()
        loop.set_debug(True)

        try:
            loop.set_exception_handler(custom_exception_handler)
            loop.run_until_complete(trigger_race_condition(rust_sleeper, delay))
        finally:
            loop.run_until_complete(loop.shutdown_asyncgens())
            if hasattr(loop, 'shutdown_default_executor'):
                loop.run_until_complete(loop.shutdown_default_executor())
            loop.close()

        # Checked only after cleanup: the cleanup passes above run the loop again, so
        # callbacks that were still queued may invoke the exception handler there.
        if race_condition_triggered:
            raise Exception("Race condition triggered")
"#;

/// Entry point of the regression test.
///
/// Initializes the embedded interpreter, registers the Rust `sleep` function on a
/// `rust_sleeper` module, compiles the embedded Python script into its own module, and calls
/// the script's `main` with the Rust-backed `sleep` callable. Any Python error raised along
/// the way is returned to the caller.
fn main() -> pyo3::PyResult<()> {
    pyo3::prepare_freethreaded_python();

    Python::with_gil(|py| -> PyResult<()> {
        // Module that exposes the Rust-backed `sleep` to Python.
        let rust_module = PyModule::new(py, "rust_sleeper")?;
        rust_module.add_wrapped(wrap_pyfunction!(sleep))?;

        // `from_code` takes C strings; all three inputs are fixed strings in this file.
        let code = CString::new(RACE_CONDITION_REGRESSION_TEST).unwrap();
        let file_name = CString::new("race_condition_regression_test.py").unwrap();
        let module_name = CString::new("race_condition_regression_test").unwrap();
        let script = PyModule::from_code(py, &code, &file_name, &module_name)?;

        let rust_sleep = rust_module.getattr("sleep")?;
        script.call_method1("main", (rust_sleep,))?;
        Ok(())
    })
}