wasm rdbms #1212
base: main
Conversation
Vec<T>: RdbmsIntoValueAndType,
{
    fn merge_types(first: AnalysedType, second: AnalysedType) -> AnalysedType {
        if let (AnalysedType::Record(f), AnalysedType::Record(s)) = (first.clone(), second) {
what's the need of this merge_types?
postgres has support for custom (user-defined) types, where types are in general defined by data; this trait is used to create an AnalysedType from basic and user-defined types
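A miniature of what such a merge could look like, using a simplified stand-in for AnalysedType. The union semantics and all names here are assumptions for illustration, not code from the PR:

```rust
// Illustrative stand-in for golem's AnalysedType (the real enum has many more variants).
#[derive(Clone, Debug, PartialEq)]
enum SimpleType {
    Str,
    Int,
    Record(Vec<(String, SimpleType)>),
}

// Merge two record types by unioning their fields; any non-record input
// falls back to the first type. The union semantics is an assumption
// about the intent, not copied from the PR.
fn merge_types(first: SimpleType, second: SimpleType) -> SimpleType {
    if let (SimpleType::Record(mut f), SimpleType::Record(s)) = (first.clone(), second) {
        for (name, typ) in s {
            // only add fields that the first record does not already have
            if !f.iter().any(|(n, _)| n == &name) {
                f.push((name, typ));
            }
        }
        SimpleType::Record(f)
    } else {
        first
    }
}
```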
Nice work!
I have added an initial set of comments. (Did not review much the actual services::rdbms module.)
golem-test-framework/src/dsl/mod.rs
Outdated
    worker_id: impl Into<TargetWorkerId> + Send + Sync,
    function_name: &str,
    params: Vec<ValueAndType>,
) -> crate::Result<Result<TypeAnnotatedValue, Error>>;
We have introduced the ValueAndType type as a much more friendly alternative to the protobuf-generated TypeAnnotatedValue, and want to use that in code (and only use TAV on the gRPC APIs). This was not fully done in existing code, but new code should use ValueAndType.
(I did not reach in the review the tests that require this yet, but hopefully they could be even better if the result type is ValueAndType.)
ok, thanks, will take a look at that (when I added the first impl. of this API, I think it was not there)
I changed the impl.; now there is ValueAndType in the response
    Ok(params) => Ok(RdbmsRequest::<MysqlType>::new(pool_key, statement, params)),
    Err(error) => Err(RdbmsError::QueryParameterFailure(error)),
};
durability.persist(self, (), result).await
The input parameter should not be () but at least the statement, and possibly also the params. This way these are added to the oplog and are visible through oplog query, useful for debugging (you see what query you did, not just that you did a query).
I changed the impl.; now there is Option<RdbmsRequest<T>> in the input (similar to the query and execute functions)
    statement: String,
    params: Vec<DbValue>,
) -> anyhow::Result<Result<DbResult, Error>> {
    let worker_id = self.state.owned_worker_id.worker_id.clone();
Suggested change:
-let worker_id = self.state.owned_worker_id.worker_id.clone();
+let worker_id = self.state.owned_worker_id.worker_id();
    statement: String,
    params: Vec<DbValue>,
) -> anyhow::Result<Result<u64, Error>> {
    let worker_id = self.state.owned_worker_id.worker_id.clone();
Suggested change:
-let worker_id = self.state.owned_worker_id.worker_id.clone();
+let worker_id = self.state.owned_worker_id.worker_id();
    self,
    "rdbms::mysql::db-connection",
    "query-stream",
    DurableFunctionType::WriteRemoteBatched(Some(begin_oplog_idx)),
Marking every query as WriteRemote or WriteRemoteBatched, even if they are not changing the database, is a simplification that has negative side effects.
See the following PR description to understand what this is used for: #682
If even read queries are treated as writes, that interferes with the linked logic when there are interleaved http requests and database requests.
So it would be nice to be able to detect if a query is "read-only" and use the ReadRemote function type in that case.
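One way such a read-only check could be sketched is classifying the statement by its first keyword. This is a hypothetical helper, not part of the PR; real SQL has more cases (e.g. WITH-prefixed CTEs that write, or SELECT ... INTO), so it is only an approximation:

```rust
// Hypothetical helper: treat a SQL statement as read-only if its first
// keyword is one of a known safe set. Only a sketch; a production check
// would need to handle CTEs, comments, SELECT ... INTO, etc.
fn is_read_only(statement: &str) -> bool {
    let first = statement
        .split_whitespace()
        .next()
        .unwrap_or("")
        .to_ascii_uppercase();
    matches!(first.as_str(), "SELECT" | "SHOW" | "EXPLAIN" | "DESCRIBE")
}
```

The caller could then pick DurableFunctionType::ReadRemote instead of WriteRemoteBatched when this returns true.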
Also note that the above logic (of checking concurrent write effects) applies automatically when you use begin_durable_function/end_durable_function with a write-remote function type. This may be the correct thing, but it needs to be a conscious decision. But only marking side effects that are changing the DB as WriteRemote/WriteRemoteBatched definitely helps.
I was not sure what the best solution is in this case; as you mentioned, queries are in general read-only.
I think that all function invocations (in relation to a query stream) need to be handled, for durability purposes, like one batched operation (similar to a transaction).
If DurableFunctionType.ReadRemote were used in HostDbResultStream.get_next and HostDbResultStream.get_columns (for a stream which is not in a transaction):
for example, if the first 2 invocations of HostDbResultStream.get_next were processed/persisted and then the worker crashed, I am not sure it would be easy to continue with the next HostDbResultStream.get_next (considering that the first 2 chunks can be taken from state/oplog and the next needs to be read from the DB).
But I am not sure, maybe I do not understand something; please let me know, thank you very much.
I think you are right and maybe we are missing something (in the core). Because yes, the whole set of operations (opening the transaction/stream etc.) has to be retried on replay if it was interrupted - but on the other hand it does not necessarily have the "write remote" semantics, which means that we did something to the outside world that we cannot undo. So for example if it was just a SELECT, or a transaction that never got committed, then in the DB it's like nothing happened.
Anyway, I think this should not block this PR, but it is something I want to think more about.
self.observe_function_call("rdbms::mysql::db-connection", "begin-transaction");

let begin_oplog_index = self
    .begin_durable_function(&DurableFunctionType::WriteRemoteBatched(None))
begin_durable_function must be paired with an end_durable_function on all code paths, otherwise it is leaking. I think now it is only ended in case of some errors but not on the happy path. (I may be wrong, lots of lines to review.)
currently end_durable_function is invoked in HostDbTransaction.drop, but I will also add handling to HostDbTransaction.commit and HostDbTransaction.rollback, and will check other cases
let handle = resource.rep();
self.state
    .open_function_table
    .insert(handle, begin_oplog_index);
Similarly to the previous comment - this entry has to be removed on each code path when the transaction/stream is completed.
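The "remove on every code path" requirement is a natural fit for a drop guard. A hedged sketch with illustrative types (the table and handle types here are placeholders, not the PR's actual state structure):

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

// Hypothetical guard: inserts the handle into the open-function table on
// creation and removes it when dropped, so cleanup happens on every code
// path (happy path, early returns, errors).
struct OpenFunctionGuard {
    table: Rc<RefCell<HashMap<u32, u64>>>,
    handle: u32,
}

impl OpenFunctionGuard {
    fn open(table: Rc<RefCell<HashMap<u32, u64>>>, handle: u32, oplog_index: u64) -> Self {
        table.borrow_mut().insert(handle, oplog_index);
        OpenFunctionGuard { table, handle }
    }
}

impl Drop for OpenFunctionGuard {
    fn drop(&mut self) {
        // runs no matter how the scope is exited
        self.table.borrow_mut().remove(&self.handle);
    }
}
```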
}
}

pub trait RdbmsIntoValueAndType {
There should not be any need for this type class. We already have IntoValue, where you implement two functions, one that returns the value and another that returns the type. There is also IntoValueAndType, which is implemented automatically for T: IntoValue and gives you an into_value_and_type function just like the one here.
because postgres has custom (user-defined) types, which basically means recursion, I needed the possibility to create value and type together (ValueAndType).
As you also mentioned, there is IntoValueAndType, but as there is impl<T: IntoValue + Sized> IntoValueAndType for T
https://github.com/golemcloud/golem/blob/main/wasm-rpc/src/value_and_type.rs#L92-L96
if I wanted to directly implement IntoValueAndType for some types, it ended in some cases with
https://doc.rust-lang.org/error_codes/E0119.html
so I added RdbmsIntoValueAndType. Not sure if there is some different solution for the problem, but I will think about that.
The reason why there is a separate IntoValue and IntoValueAndType type class is that IntoValue has a static get_type method (not requiring self). That is useful in many cases and simple to implement for most types. However, there are some more dynamic cases where you don't know the type statically. For those, we are implementing IntoValueAndType directly (and not implementing IntoValue), because there you have access to self to produce both the value and type.
I think if you would only implement IntoValueAndType in these cases, it should work.
(For example, one such type is impl IntoValueAndType for SerializableInvokeRequest.)
Could you give it one more try? Would be nice to not have an extra trait for this.
I tried to figure out how to use IntoValueAndType one more time, but was not able to figure out a solution.
It is possible to implement IntoValueAndType for a specific type (similar to the mentioned SerializableInvokeRequest),
but if it needs to be inside another common type like Option, Vec or Result, for example Option<SerializableInvokeRequest>, it will not work:
|
1026 | Ok(payload.into_value_and_type())
| ^^^^^^^^^^^^^^^^^^^ method cannot be called on `Option<SerializableInvokeRequest>` due to unsatisfied trait bounds
|
::: /Users/coon/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/option.rs:572:1
|
572 | pub enum Option<T> {
| ------------------ doesn't satisfy `_: IntoValueAndType` or `_: IntoValue`
|
= note: the following trait bounds were not satisfied:
`std::option::Option<SerializableInvokeRequest>: IntoValue`
which is required by `std::option::Option<SerializableInvokeRequest>: IntoValueAndType`
`&std::option::Option<SerializableInvokeRequest>: IntoValue`
which is required by `&std::option::Option<SerializableInvokeRequest>: IntoValueAndType`
`&mut std::option::Option<SerializableInvokeRequest>: IntoValue`
which is required by `&mut std::option::Option<SerializableInvokeRequest>: IntoValueAndType`
If I want to add impl<T: IntoValueAndType> IntoValueAndType for Option<T>, it will end with compilation issues:
error[E0119]: conflicting implementations of trait `value_and_type::IntoValueAndType` for type `std::option::Option<_>`
--> wasm-rpc/src/value_and_type.rs:291:1
|
93 | impl<T: IntoValue + Sized> IntoValueAndType for T {
| ------------------------------------------------- first implementation here
...
291 | impl<T: IntoValueAndType> IntoValueAndType for Option<T> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ conflicting implementation for `std::option::Option<_>`
For more information about this error, try `rustc --explain E0119`.
Based on what I have seen,
https://std-dev-guide.rust-lang.org/policy/specialization.html
could be a solution for that problem, but this is an unstable feature.
Do you have some different ideas?
Yeah I see, and it is not possible to implement impl<T: IntoValueAndType> IntoValueAndType for Option<T> because you cannot figure out the type information in the None case. In your trait there is still a get-type method. This makes me realize I may not fully understand the root problem :)
If you can return an AnalysedType, you could just implement IntoValue. So I guess in some cases this base type is not the truth? So for these recursive types you just return something "random" if it's None?
So for these recursive types you just return something "random" if it's None
It is not a completely random type. The RdbmsIntoValueAndType implementation is the following:
pub trait RdbmsIntoValueAndType {
    fn into_value_and_type(self) -> ValueAndType;
    fn get_base_type() -> AnalysedType;
}

impl<T: RdbmsIntoValueAndType> RdbmsIntoValueAndType for Option<T> {
    fn into_value_and_type(self) -> ValueAndType {
        match self {
            Some(v) => {
                let v = v.into_value_and_type();
                ValueAndType::new(
                    Value::Option(Some(Box::new(v.value))),
                    analysed_type::option(v.typ),
                )
            }
            None => ValueAndType::new(Value::Option(None), Self::get_base_type()),
        }
    }

    fn get_base_type() -> AnalysedType {
        analysed_type::option(T::get_base_type())
    }
}
It is more like a combination of IntoValueAndType and IntoValue.
So in case of the postgres DbValue, where there are common and user-defined types, DbValue::get_base_type() returns the "common" schema only, but DbValue::into_value_and_type may produce a schema with additional custom user-defined types, if the specific DbValue holds data with a user-defined type.
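A self-contained miniature of that static/dynamic split, with placeholder types standing in for DbValue and AnalysedType (all names here are illustrative, not golem's actual API):

```rust
// Simplified stand-in for AnalysedType.
#[derive(Clone, Debug, PartialEq)]
enum Typ {
    Str,
    // Placeholder for a user-defined postgres type discovered from data.
    Custom(String),
    Option(Box<Typ>),
}

// Mirrors the shape of RdbmsIntoValueAndType: a per-value type plus a
// static "common" base type (types only; values omitted for brevity).
trait RdbmsLike {
    fn value_type(&self) -> Typ; // type derived from the actual value
    fn base_type() -> Typ;       // static "common" schema
}

// Hypothetical value of a user-defined enum type such as postgres's CREATE TYPE.
struct EnumValue {
    type_name: String,
}

impl RdbmsLike for EnumValue {
    fn value_type(&self) -> Typ {
        Typ::Custom(self.type_name.clone()) // only known once we have data
    }
    fn base_type() -> Typ {
        Typ::Str // statically we only know the common textual representation
    }
}

impl<T: RdbmsLike> RdbmsLike for Option<T> {
    fn value_type(&self) -> Typ {
        match self {
            Some(v) => Typ::Option(Box::new(v.value_type())),
            None => Self::base_type(), // fall back to the common schema
        }
    }
    fn base_type() -> Typ {
        Typ::Option(Box::new(T::base_type()))
    }
}
```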
Ok let's leave it like this for now
}

#[test]
async fn postgres_create_insert_select_test(
Maybe these tests are necessary, but I couldn't understand yet why. Could you please explain @justcoon?
those tests are testing conversion and type mappings, query execution and the transaction implementations
    $30, $31, $32, $33, $34, $35, $36, $37, $38::tsvector, $39::tsquery,
    $40, $41, $42
);
"#;
Were these required because of the type conversions that we have?
yes, to test conversions and type mappings
LGTM @justcoon nice one.
fixes: #1016
/claim #1016

rdbms host implementation for postgres and mysql, using the sqlx rust library, mysql (docker)

added invoke_and_await_typed functions to WorkerService (a typed response is possible to transform to json, which then makes test results comparison easier)

TODO

- query_stream implementation for transaction (it may require implementing a custom sqlx Executor, as the provided implementation is for mut Connections, and then there is an error like: error[E0515]: cannot return value referencing local variable transaction)
- SerializableError implementation for rdbms errors (initial impl. using SerializableError::Generic; should SerializableError::Rdbms { error: crate::services::rdbms::Error } be added? what is the preferred approach for SerializableError when adding new modules) - update: SerializableError::Rdbms was added
- ValueAndType, where AnalysedType is in general a WIT type definition: this is an issue in case of rdbms::postgres, which has recursive types; there is a question what can be done in this case, and whether IntoValueAndType can be used instead of RdbmsIntoValueAndType (RdbmsIntoValueAndType was added because of https://doc.rust-lang.org/error_codes/E0119.html)