Skip to content

Commit

Permalink
add missing pipeline item methods
Browse files Browse the repository at this point in the history
  • Loading branch information
victorteokw committed Feb 1, 2024
1 parent 511716d commit 981311a
Showing 1 changed file with 112 additions and 2 deletions.
114 changes: 112 additions & 2 deletions src/namespace/namespace.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use pyo3::{pyclass, pymethods, types::PyCFunction, IntoPy, Py, PyErr, PyObject, PyResult, Python};
use teo::prelude::{handler::Group as TeoHandlerGroup, model::Field as TeoField, model::Property as TeoProperty, model::Relation as TeoRelation, request, Enum as TeoEnum, Member as TeoEnumMember, Middleware, Model as TeoModel, Namespace as TeoNamespace, Next};
use teo::prelude::{handler::Group as TeoHandlerGroup, model::Field as TeoField, model::Property as TeoProperty, model::Relation as TeoRelation, pipeline::{self, item::validator::Validity}, request, Enum as TeoEnum, Member as TeoEnumMember, Middleware, Model as TeoModel, Namespace as TeoNamespace, Next, Value};

use crate::{utils::{check_callable::check_callable, await_coroutine_if_needed::await_coroutine_if_needed_value_with_locals}, object::{arguments::teo_args_to_py_args, value::teo_value_to_py_any}, model::{model::Model, field::field::Field, relation::relation::Relation, property::property::Property}, result::{IntoPyResultWithGil, IntoTeoPathResult, IntoTeoResult}, r#enum::{r#enum::Enum, member::member::EnumMember}, request::{Request, RequestCtx}, dynamic::py_ctx_object_from_teo_transaction_ctx, response::Response, handler::group::HandlerGroup};
use crate::{utils::{check_callable::check_callable, await_coroutine_if_needed::await_coroutine_if_needed_value_with_locals}, object::{arguments::teo_args_to_py_args, model::teo_model_object_to_py_any, py_any_to_teo_object, teo_object_to_py_any, value::{py_any_to_teo_value, teo_value_to_py_any}}, model::{model::Model, field::field::Field, relation::relation::Relation, property::property::Property}, result::{IntoPyResultWithGil, IntoTeoPathResult, IntoTeoResult}, r#enum::{r#enum::Enum, member::member::EnumMember}, request::{Request, RequestCtx}, dynamic::py_ctx_object_from_teo_transaction_ctx, response::Response, handler::group::HandlerGroup};

#[pyclass]
pub struct Namespace {
Expand Down Expand Up @@ -151,6 +151,116 @@ impl Namespace {
Ok(())
}

pub fn define_pipeline_item(&mut self, py: Python<'_>, name: &str, callback: PyObject) -> PyResult<()> {
check_callable(callback.as_ref(py))?;
let callback_owned = &*Box::leak(Box::new(Py::from(callback)));
let main_thread_locals = &*Box::leak(Box::new(pyo3_asyncio::tokio::get_current_locals(py)?));
self.teo_namespace.define_pipeline_item(name, move |args, ctx: pipeline::Ctx| async move {
let result = Python::with_gil(|py| {
let value = teo_object_to_py_any(py, ctx.value())?;
let args = teo_args_to_py_args(py, &args)?;
let object = teo_model_object_to_py_any(py, ctx.object())?;
let ctx = py_ctx_object_from_teo_transaction_ctx(py, ctx.transaction_ctx(), "")?;
let result = callback_owned.call1(py, (value, args, object, ctx))?;
Ok(result)
}).into_teo_result()?;
let awaited_result = await_coroutine_if_needed_value_with_locals(result, main_thread_locals).await.into_teo_result()?;
Python::with_gil(|py| {
let result = py_any_to_teo_object(py, awaited_result).into_teo_result()?;
Ok(result)
})
});
Ok(())
}

pub fn define_transform_pipeline_item(&mut self, py: Python<'_>, name: &str, callback: PyObject) -> PyResult<()> {
self.define_pipeline_item(py, name, callback)
}

pub fn define_validator_pipeline_item(&mut self, py: Python<'_>, name: &str, callback: PyObject) -> PyResult<()> {
check_callable(callback.as_ref(py))?;
let callback_owned = &*Box::leak(Box::new(Py::from(callback)));
let main_thread_locals = &*Box::leak(Box::new(pyo3_asyncio::tokio::get_current_locals(py)?));
self.teo_namespace.define_validator_pipeline_item(name, move |_: Value, args, ctx: pipeline::Ctx| async move {
let result = Python::with_gil(|py| {
let value = teo_object_to_py_any(py, ctx.value())?;
let args = teo_args_to_py_args(py, &args)?;
let object = teo_model_object_to_py_any(py, ctx.object())?;
let ctx = py_ctx_object_from_teo_transaction_ctx(py, ctx.transaction_ctx(), "")?;
let result = callback_owned.call1(py, (value, args, object, ctx))?;
Ok(result)
}).into_teo_result()?;
let awaited_result = await_coroutine_if_needed_value_with_locals(result, main_thread_locals).await.into_teo_result()?;
Python::with_gil(|py| {
let result = py_any_to_teo_value(py, awaited_result.as_ref(py)).into_teo_result()?;
Ok::<Validity, teo::prelude::Error>(match result {
Value::String(s) => {
Validity::Invalid(s.to_owned())
},
Value::Bool(b) => if b {
Validity::Valid
} else {
Validity::Invalid("value is invalid".to_owned())
},
_ => Validity::Valid
})
})
});
Ok(())
}

pub fn define_callback_pipeline_item(&mut self, py: Python<'_>, name: &str, callback: PyObject) -> PyResult<()> {
check_callable(callback.as_ref(py))?;
let callback_owned = &*Box::leak(Box::new(Py::from(callback)));
let main_thread_locals = &*Box::leak(Box::new(pyo3_asyncio::tokio::get_current_locals(py)?));
self.teo_namespace.define_callback_pipeline_item(name, move |args, ctx: pipeline::Ctx| async move {
let result = Python::with_gil(|py| {
let value = teo_object_to_py_any(py, ctx.value())?;
let args = teo_args_to_py_args(py, &args)?;
let object = teo_model_object_to_py_any(py, ctx.object())?;
let ctx = py_ctx_object_from_teo_transaction_ctx(py, ctx.transaction_ctx(), "")?;
let result = callback_owned.call1(py, (value, args, object, ctx))?;
Ok(result)
}).into_teo_result()?;
let _ = await_coroutine_if_needed_value_with_locals(result, main_thread_locals).await.into_teo_result()?;
Ok(())
});
Ok(())
}

pub fn define_compare_pipeline_item(&mut self, py: Python<'_>, name: &str, callback: PyObject) -> PyResult<()> {
check_callable(callback.as_ref(py))?;
let callback_owned = &*Box::leak(Box::new(Py::from(callback)));
let main_thread_locals = &*Box::leak(Box::new(pyo3_asyncio::tokio::get_current_locals(py)?));
self.teo_namespace.define_compare_pipeline_item(name, move |old: Value, new: Value, args, ctx: pipeline::Ctx| async move {
let result = Python::with_gil(|py| {
let value_old = teo_value_to_py_any(py, &old)?;
let value_new = teo_value_to_py_any(py, &new)?;
let args = teo_args_to_py_args(py, &args)?;
let object = teo_model_object_to_py_any(py, ctx.object())?;
let ctx = py_ctx_object_from_teo_transaction_ctx(py, ctx.transaction_ctx(), "")?;
let result = callback_owned.call1(py, (value_old, value_new, args, object, ctx))?;
Ok(result)
}).into_teo_result()?;
let awaited_result = await_coroutine_if_needed_value_with_locals(result, main_thread_locals).await.into_teo_result()?;
Python::with_gil(|py| {
let result = py_any_to_teo_value(py, awaited_result.into_ref(py)).into_teo_result()?;
Ok::<Validity, teo::prelude::Error>(match result {
Value::String(s) => {
Validity::Invalid(s.to_owned())
},
Value::Bool(b) => if b {
Validity::Valid
} else {
Validity::Invalid("value is invalid".to_owned())
},
_ => Validity::Valid
})
})
});
Ok(())
}

pub fn define_handler(&mut self, py: Python<'_>, name: String, callback: PyObject) -> PyResult<()> {
check_callable(callback.as_ref(py))?;
let main_thread_locals = &*Box::leak(Box::new(pyo3_asyncio::tokio::get_current_locals(py)?));
Expand Down

0 comments on commit 981311a

Please sign in to comment.