File: test_tokio_multi_thread_uvloop.rs

package info (click to toggle)
rust-pyo3-async-runtimes 0.25.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 448 kB
  • sloc: makefile: 2
file content (46 lines) | stat: -rw-r--r-- 1,486 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#[cfg(not(target_os = "windows"))]
fn main() -> pyo3::PyResult<()> {
    use pyo3::{prelude::*, types::PyType};

    pyo3::prepare_freethreaded_python();

    Python::with_gil(|py| {
        // uvloop not supported on the free-threaded build yet
        // https://github.com/MagicStack/uvloop/issues/642
        let sysconfig = py.import("sysconfig")?;
        let is_freethreaded = sysconfig.call_method1("get_config_var", ("Py_GIL_DISABLED",))?;
        if is_freethreaded.is_truthy()? {
            return Ok(());
        }

        let uvloop = py.import("uvloop")?;
        uvloop.call_method0("install")?;

        // store a reference for the assertion
        let uvloop = PyObject::from(uvloop);

        pyo3_async_runtimes::tokio::run(py, async move {
            // verify that we are on a uvloop.Loop
            Python::with_gil(|py| -> PyResult<()> {
                assert!(
                    pyo3_async_runtimes::tokio::get_current_loop(py)?.is_instance(
                        uvloop
                            .bind(py)
                            .getattr("Loop")?
                            .downcast::<PyType>()
                            .unwrap()
                    )?
                );
                Ok(())
            })?;

            tokio::time::sleep(std::time::Duration::from_secs(1)).await;

            println!("test test_tokio_multi_thread_uvloop ... ok");
            Ok(())
        })
    })
}

#[cfg(target_os = "windows")]
fn main() {}