use crate::err::PyResult;
use crate::ffi_ptr_ext::FfiPtrExt;
use crate::py_result_ext::PyResultExt;
use crate::types::{any::PyAny, PyNone};
use crate::{ffi, Bound, BoundObject, IntoPyObject};
#[cfg(any(PyPy, GraalPy, Py_LIMITED_API))]
use crate::type_object::PyTypeCheck;
use super::PyWeakrefMethods;
/// Represents a Python `weakref.ReferenceType` object (what `weakref.ref(...)` produces).
///
/// This is a `#[repr(transparent)]` wrapper around [`PyAny`]. Instances are obtained as
/// `Bound<'py, PyWeakrefReference>`, for example from [`PyWeakrefReference::new`].
#[repr(transparent)]
pub struct PyWeakrefReference(PyAny);
// Builds that are not PyPy, GraalPy or limited-API describe the type through the concrete
// `ffi::PyWeakReference` struct and the interpreter's static `ffi::_PyWeakref_RefType` object.
#[cfg(not(any(PyPy, GraalPy, Py_LIMITED_API)))]
pyobject_subclassable_native_type!(PyWeakrefReference, crate::ffi::PyWeakReference);
// On this configuration the type is reported as belonging to the `weakref` module and the type
// check is delegated to `ffi::PyWeakref_CheckRefExact`.
#[cfg(not(any(PyPy, GraalPy, Py_LIMITED_API)))]
pyobject_native_type!(
PyWeakrefReference,
ffi::PyWeakReference,
pyobject_native_static_type_object!(ffi::_PyWeakref_RefType),
#module=Some("weakref"),
#checkfunction=ffi::PyWeakref_CheckRefExact
);
// On PyPy, GraalPy and limited-API builds neither the struct layout nor the static type object
// is referenced here; only `pyobject_native_type_named!` is invoked, and the type check comes
// from the hand-written `PyTypeCheck` impl that follows.
#[cfg(any(PyPy, GraalPy, Py_LIMITED_API))]
pyobject_native_type_named!(PyWeakrefReference);
// Runtime type check for the configurations (PyPy, GraalPy, limited API) in which the static
// weakref type object is not used.
#[cfg(any(PyPy, GraalPy, Py_LIMITED_API))]
impl PyTypeCheck for PyWeakrefReference {
    const NAME: &'static str = "weakref.ReferenceType";

    /// Returns `true` when `PyWeakref_CheckRef` reports a positive result for `object`.
    fn type_check(object: &Bound<'_, PyAny>) -> bool {
        let raw = object.as_ptr();
        // SAFETY: `raw` is taken from a `Bound`, which is presumed to hold a valid, non-null
        // object pointer for the duration of this call.
        let verdict = unsafe { ffi::PyWeakref_CheckRef(raw) };
        verdict > 0
    }
}
impl PyWeakrefReference {
#[cfg_attr(
not(all(feature = "macros", not(all(Py_LIMITED_API, not(Py_3_9))))),
doc = "```rust,ignore"
)]
#[cfg_attr(
all(feature = "macros", not(all(Py_LIMITED_API, not(Py_3_9)))),
doc = "```rust"
)]
pub fn new<'py>(object: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyWeakrefReference>> {
unsafe {
Bound::from_owned_ptr_or_err(
object.py(),
ffi::PyWeakref_NewRef(object.as_ptr(), ffi::Py_None()),
)
.downcast_into_unchecked()
}
}
#[deprecated(since = "0.23.0", note = "renamed to `PyWeakrefReference::new`")]
#[inline]
pub fn new_bound<'py>(object: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyWeakrefReference>> {
Self::new(object)
}
#[cfg_attr(
not(all(feature = "macros", not(all(Py_LIMITED_API, not(Py_3_9))))),
doc = "```rust,ignore"
)]
#[cfg_attr(
all(feature = "macros", not(all(Py_LIMITED_API, not(Py_3_9)))),
doc = "```rust"
)]
pub fn new_with<'py, C>(
object: &Bound<'py, PyAny>,
callback: C,
) -> PyResult<Bound<'py, PyWeakrefReference>>
where
C: IntoPyObject<'py>,
{
fn inner<'py>(
object: &Bound<'py, PyAny>,
callback: Bound<'py, PyAny>,
) -> PyResult<Bound<'py, PyWeakrefReference>> {
unsafe {
Bound::from_owned_ptr_or_err(
object.py(),
ffi::PyWeakref_NewRef(object.as_ptr(), callback.as_ptr()),
)
.downcast_into_unchecked()
}
}
let py = object.py();
inner(
object,
callback
.into_pyobject(py)
.map(BoundObject::into_any)
.map(BoundObject::into_bound)
.map_err(Into::into)?,
)
}
#[deprecated(since = "0.23.0", note = "renamed to `PyWeakrefReference::new_with`")]
#[allow(deprecated)]
#[inline]
pub fn new_bound_with<'py, C>(
object: &Bound<'py, PyAny>,
callback: C,
) -> PyResult<Bound<'py, PyWeakrefReference>>
where
C: crate::ToPyObject,
{
Self::new_with(object, callback.to_object(object.py()))
}
}
impl<'py> PyWeakrefMethods<'py> for Bound<'py, PyWeakrefReference> {
    /// Returns the object this weak reference points to, or Python's `None` object when
    /// `PyWeakref_GetRef` reports status `0`.
    ///
    /// # Panics
    ///
    /// Panics if `PyWeakref_GetRef` returns a negative status.
    fn get_object(&self) -> Bound<'py, PyAny> {
        let py = self.py();
        let mut referent: *mut ffi::PyObject = std::ptr::null_mut();
        // SAFETY: `self` is a `Bound` weak reference, so its pointer is valid for the call, and
        // `referent` is a writable out-slot that lives for the duration of the call.
        let status = unsafe { ffi::compat::PyWeakref_GetRef(self.as_ptr(), &mut referent) };

        if status < 0 {
            panic!("The 'weakref.ReferenceType' instance should be valid (non-null and actually a weakref reference)");
        }
        if status == 0 {
            return PyNone::get(py).to_owned().into_any();
        }
        // SAFETY: a positive status means, per the CPython C API documentation, that
        // `referent` now holds a new strong reference whose ownership we take over.
        unsafe { referent.assume_owned_unchecked(py) }
    }
}
#[cfg(test)]
mod tests {
use crate::types::any::{PyAny, PyAnyMethods};
use crate::types::weakref::{PyWeakrefMethods, PyWeakrefReference};
use crate::{Bound, PyResult, Python};
// Expected string form of the weakref's type object / `__class__`, as compared in the tests
// below. Only defined when not building against the limited API; every use is gated the same way.
#[cfg(all(not(Py_LIMITED_API), Py_3_10))]
const CLASS_NAME: &str = "<class 'weakref.ReferenceType'>";
// Spelling expected when `Py_3_10` is not set.
#[cfg(all(not(Py_LIMITED_API), not(Py_3_10)))]
const CLASS_NAME: &str = "<class 'weakref'>";
/// Asserts that `repr(reference)` has the expected `<weakref at 0x…; …>` shape.
///
/// With `object == Some((target, class))` the second half must read `to '<class>' at 0x…` and
/// contain the address of `target`; with `None` it must be exactly `dead>`.
fn check_repr(
    reference: &Bound<'_, PyWeakrefReference>,
    object: Option<(&Bound<'_, PyAny>, &str)>,
) -> PyResult<()> {
    // `{:x?}` rendering of a value with its first two bytes cut off (presumably the `0x`
    // prefix of a pointer), so it can be searched for inside the repr text.
    fn hex_digits<P: std::fmt::Debug>(pointer: P) -> String {
        format!("{:x?}", pointer).split_at(2).1.to_owned()
    }

    let rendered = reference.repr()?.to_string();
    let (head, tail) = rendered.split_once("; ").unwrap();

    // First half: the weakref's own address.
    let (head_text, head_addr) = head.split_once("0x").unwrap();
    assert_eq!(head_text, "<weakref at ");
    assert!(head_addr
        .to_lowercase()
        .contains(hex_digits(reference.as_ptr()).as_str()));

    // Second half: either the referent description or the "dead" marker.
    if let Some((object, class)) = object {
        let (tail_text, tail_addr) = tail.split_once("0x").unwrap();
        assert!(tail_text.starts_with("to '"));
        assert!(tail_text.contains(class));
        assert!(tail_text.ends_with("' at "));
        assert!(tail_addr
            .to_lowercase()
            .contains(hex_digits(object.as_ptr()).as_str()));
    } else {
        assert_eq!(tail, "dead>")
    }

    Ok(())
}
mod python_class {
use super::*;
use crate::ffi;
use crate::{py_result_ext::PyResultExt, types::PyType};
/// Defines a trivial Python class `A` by running its source, then evaluates `A` and returns it
/// downcast to a `PyType`.
fn get_type(py: Python<'_>) -> PyResult<Bound<'_, PyType>> {
    let definition = ffi::c_str!("class A:\n pass\n");
    let lookup = ffi::c_str!("A");

    py.run(definition, None, None)?;
    let class = py.eval(lookup, None, None);
    class.downcast_into::<PyType>()
}
/// Checks identity, type name, repr, `__callback__` and call behaviour of a weakref to a
/// Python-defined instance, both while the referent is alive and after it is dropped.
#[test]
fn test_weakref_reference_behavior() -> PyResult<()> {
    Python::with_gil(|py| {
        let class = get_type(py)?;
        let instance = class.call0()?;
        let weak = PyWeakrefReference::new(&instance)?;

        // `__callback__` must read as `None`; a failed attribute lookup counts as a failure.
        let callback_is_none = || {
            weak.getattr("__callback__")
                .map(|callback| callback.is_none())
                .unwrap_or(false)
        };

        // Referent alive.
        assert!(!weak.is(&instance));
        assert!(weak.get_object().is(&instance));
        #[cfg(not(Py_LIMITED_API))]
        {
            assert_eq!(weak.get_type().to_string(), CLASS_NAME);
            assert_eq!(weak.getattr("__class__")?.to_string(), CLASS_NAME);
            check_repr(&weak, Some((instance.as_any(), "A")))?;
        }
        assert!(callback_is_none());
        assert!(weak.call0()?.is(&instance));

        drop(instance);

        // Referent gone.
        assert!(weak.get_object().is_none());
        #[cfg(not(Py_LIMITED_API))]
        assert_eq!(weak.getattr("__class__")?.to_string(), CLASS_NAME);
        check_repr(&weak, None)?;
        assert!(callback_is_none());
        assert!(weak.call0()?.is_none());

        Ok(())
    })
}
/// `upgrade_as::<PyAny>` yields the referent while it is alive and `None` after it is dropped.
#[test]
fn test_weakref_upgrade_as() -> PyResult<()> {
    Python::with_gil(|py| {
        let class = get_type(py)?;
        let instance = class.call0()?;
        let weak = PyWeakrefReference::new(&instance)?;

        {
            let outcome = weak.upgrade_as::<PyAny>();
            assert!(outcome.is_ok());
            let upgraded = outcome.unwrap();
            assert!(upgraded.is_some());
            // The strong reference is consumed by the `match`, so it is released before
            // `instance` is dropped below.
            let identical = match upgraded {
                Some(strong) => {
                    strong.as_ptr() == instance.as_ptr() && strong.is_exact_instance(&class)
                }
                None => false,
            };
            assert!(identical);
        }

        drop(instance);

        {
            let outcome = weak.upgrade_as::<PyAny>();
            assert!(outcome.is_ok());
            let upgraded = outcome.unwrap();
            assert!(upgraded.is_none());
        }

        Ok(())
    })
}
/// `upgrade_as_unchecked::<PyAny>` yields the referent while it is alive and `None` afterwards.
#[test]
fn test_weakref_upgrade_as_unchecked() -> PyResult<()> {
    Python::with_gil(|py| {
        let class = get_type(py)?;
        let instance = class.call0()?;
        let weak = PyWeakrefReference::new(&instance)?;

        {
            let upgraded = unsafe { weak.upgrade_as_unchecked::<PyAny>() };
            assert!(upgraded.is_some());
            // Consuming the option here releases the strong reference before the drop below.
            let identical = match upgraded {
                Some(strong) => {
                    strong.as_ptr() == instance.as_ptr() && strong.is_exact_instance(&class)
                }
                None => false,
            };
            assert!(identical);
        }

        drop(instance);

        {
            let upgraded = unsafe { weak.upgrade_as_unchecked::<PyAny>() };
            assert!(upgraded.is_none());
        }

        Ok(())
    })
}
/// `upgrade` and calling the weakref agree: the referent while alive, nothing once dropped.
#[test]
fn test_weakref_upgrade() -> PyResult<()> {
    Python::with_gil(|py| {
        let class = get_type(py)?;
        let instance = class.call0()?;
        let weak = PyWeakrefReference::new(&instance)?;

        // Every strong reference produced below is a statement-local temporary, so none of
        // them outlives the `drop(instance)` that follows.
        assert!(weak.call0()?.is(&instance));
        assert!(weak.upgrade().is_some());
        assert!(matches!(weak.upgrade(), Some(strong) if strong.is(&instance)));

        drop(instance);

        assert!(weak.call0()?.is_none());
        assert!(weak.upgrade().is_none());

        Ok(())
    })
}
/// `get_object` mirrors calling the weakref: the referent while alive, Python `None` afterwards.
#[test]
fn test_weakref_get_object() -> PyResult<()> {
    Python::with_gil(|py| {
        let class = get_type(py)?;
        let instance = class.call0()?;
        let weak = PyWeakrefReference::new(&instance)?;

        // Scoped so the temporary strong references are released before the drop below.
        {
            let via_call = weak.call0()?;
            assert!(via_call.is(&instance));
        }
        {
            let via_get = weak.get_object();
            assert!(via_get.is(&instance));
        }

        drop(instance);

        {
            let via_call = weak.call0()?;
            let via_get = weak.get_object();
            assert!(via_call.is(&via_get));
        }
        assert!(weak.call0()?.is_none());
        assert!(weak.get_object().is_none());

        Ok(())
    })
}
}
#[cfg(all(feature = "macros", not(all(Py_LIMITED_API, not(Py_3_9)))))]
mod pyo3_pyclass {
use super::*;
use crate::{pyclass, Py};
// Minimal field-less `#[pyclass]` declared with the `weakref` option; the tests in this module
// use it as a Rust-defined referent for `PyWeakrefReference`.
#[pyclass(weakref, crate = "crate")]
struct WeakrefablePyClass {}
/// Checks identity, type name, repr, `__callback__` and call behaviour of a weakref to a
/// `WeakrefablePyClass` instance, both while the referent is alive and after it is dropped.
#[test]
fn test_weakref_reference_behavior() -> PyResult<()> {
    Python::with_gil(|py| {
        let instance: Bound<'_, WeakrefablePyClass> = Bound::new(py, WeakrefablePyClass {})?;
        let weak = PyWeakrefReference::new(&instance)?;

        // `__callback__` must read as `None`; a failed attribute lookup counts as a failure.
        let callback_is_none = || {
            weak.getattr("__callback__")
                .map(|callback| callback.is_none())
                .unwrap_or(false)
        };

        // Referent alive.
        assert!(!weak.is(&instance));
        assert!(weak.get_object().is(&instance));
        #[cfg(not(Py_LIMITED_API))]
        {
            assert_eq!(weak.get_type().to_string(), CLASS_NAME);
            assert_eq!(weak.getattr("__class__")?.to_string(), CLASS_NAME);
            check_repr(&weak, Some((instance.as_any(), "WeakrefablePyClass")))?;
        }
        assert!(callback_is_none());
        assert!(weak.call0()?.is(&instance));

        drop(instance);

        // Referent gone.
        assert!(weak.get_object().is_none());
        #[cfg(not(Py_LIMITED_API))]
        assert_eq!(weak.getattr("__class__")?.to_string(), CLASS_NAME);
        check_repr(&weak, None)?;
        assert!(callback_is_none());
        assert!(weak.call0()?.is_none());

        Ok(())
    })
}
/// `upgrade_as::<WeakrefablePyClass>` yields the referent while alive and `None` afterwards.
#[test]
fn test_weakref_upgrade_as() -> PyResult<()> {
    Python::with_gil(|py| {
        let instance = Py::new(py, WeakrefablePyClass {})?;
        let weak = PyWeakrefReference::new(instance.bind(py))?;

        {
            let outcome = weak.upgrade_as::<WeakrefablePyClass>();
            assert!(outcome.is_ok());
            let upgraded = outcome.unwrap();
            assert!(upgraded.is_some());
            // Consuming the option releases the strong reference before the drop below.
            let identical = match upgraded {
                Some(strong) => strong.as_ptr() == instance.as_ptr(),
                None => false,
            };
            assert!(identical);
        }

        drop(instance);

        {
            let outcome = weak.upgrade_as::<WeakrefablePyClass>();
            assert!(outcome.is_ok());
            let upgraded = outcome.unwrap();
            assert!(upgraded.is_none());
        }

        Ok(())
    })
}
/// `upgrade_as_unchecked::<WeakrefablePyClass>` yields the referent while alive and `None`
/// afterwards.
#[test]
fn test_weakref_upgrade_as_unchecked() -> PyResult<()> {
    Python::with_gil(|py| {
        let instance = Py::new(py, WeakrefablePyClass {})?;
        let weak = PyWeakrefReference::new(instance.bind(py))?;

        {
            let upgraded = unsafe { weak.upgrade_as_unchecked::<WeakrefablePyClass>() };
            assert!(upgraded.is_some());
            // Consuming the option releases the strong reference before the drop below.
            let identical = match upgraded {
                Some(strong) => strong.as_ptr() == instance.as_ptr(),
                None => false,
            };
            assert!(identical);
        }

        drop(instance);

        {
            let upgraded = unsafe { weak.upgrade_as_unchecked::<WeakrefablePyClass>() };
            assert!(upgraded.is_none());
        }

        Ok(())
    })
}
/// `upgrade` and calling the weakref agree: the referent while alive, nothing once dropped.
#[test]
fn test_weakref_upgrade() -> PyResult<()> {
    Python::with_gil(|py| {
        let instance = Py::new(py, WeakrefablePyClass {})?;
        let weak = PyWeakrefReference::new(instance.bind(py))?;

        // Every strong reference produced below is a statement-local temporary, so none of
        // them outlives the `drop(instance)` that follows.
        assert!(weak.call0()?.is(&instance));
        assert!(weak.upgrade().is_some());
        assert!(matches!(weak.upgrade(), Some(strong) if strong.is(&instance)));

        drop(instance);

        assert!(weak.call0()?.is_none());
        assert!(weak.upgrade().is_none());

        Ok(())
    })
}
/// `get_object` mirrors calling the weakref: the referent while alive, Python `None` afterwards.
#[test]
fn test_weakref_get_object() -> PyResult<()> {
    Python::with_gil(|py| {
        let instance = Py::new(py, WeakrefablePyClass {})?;
        let weak = PyWeakrefReference::new(instance.bind(py))?;

        // Scoped so the temporary strong references are released before the drop below.
        {
            let via_call = weak.call0()?;
            assert!(via_call.is(&instance));
        }
        {
            let via_get = weak.get_object();
            assert!(via_get.is(&instance));
        }

        drop(instance);

        {
            let via_call = weak.call0()?;
            let via_get = weak.get_object();
            assert!(via_call.is(&via_get));
        }
        assert!(weak.call0()?.is_none());
        assert!(weak.get_object().is_none());

        Ok(())
    })
}
}
}